This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1ffec7c48ce branch-3.1: [Fix](MS)Handle missing endpoint in DLF-only
Metastore scenario with region fallback #54300 (#54522)
1ffec7c48ce is described below
commit 1ffec7c48ce4776779cef12cd46d417ee803d44d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 12:05:01 2025 +0800
branch-3.1: [Fix](MS)Handle missing endpoint in DLF-only Metastore scenario
with region fallback #54300 (#54522)
Cherry-picked from #54300
Co-authored-by: Calvin Kirs <[email protected]>
---
.../org/apache/doris/common/util/PathUtils.java | 92 ++++++++++++++
.../doris/datasource/hive/HMSTransaction.java | 13 +-
.../hive/event/MetastoreEventsProcessor.java | 17 ++-
.../doris/datasource/iceberg/dlf/DLFCatalog.java | 3 +-
.../datasource/paimon/source/PaimonScanNode.java | 2 +-
.../metastore/AliyunDLFBaseProperties.java | 28 ++++-
.../metastore/HMSAliyunDLFMetaStoreProperties.java | 6 +
.../property/metastore/HMSBaseProperties.java | 8 +-
.../IcebergAliyunDLFMetaStoreProperties.java | 5 -
.../IcebergFileSystemMetaStoreProperties.java | 14 ++-
.../metastore/IcebergHMSMetaStoreProperties.java | 5 +
.../storage/AbstractS3CompatibleProperties.java | 28 ++++-
.../datasource/property/storage/COSProperties.java | 7 +-
.../datasource/property/storage/OBSProperties.java | 3 +
.../datasource/property/storage/OSSProperties.java | 61 +++++++++-
.../org/apache/doris/planner/HiveTableSink.java | 19 +--
.../tablefunction/IcebergTableValuedFunction.java | 2 +-
.../apache/doris/common/util/PathUtilsTest.java | 86 ++++++++++++++
.../metastore/AliyunDLFBasePropertiesTest.java | 105 ++++++++++++++++
.../metastore/AliyunDLFPropertiesTest.java | 132 ---------------------
.../property/storage/OSSPropertiesTest.java | 20 +++-
.../apache/doris/planner/HiveTableSinkTest.java | 32 ++++-
22 files changed, 501 insertions(+), 187 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PathUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PathUtils.java
new file mode 100644
index 00000000000..dd4ca93e2ca
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PathUtils.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+public class PathUtils {
+ /**
+ * Compares two URI strings for equality with special handling for the
"s3" scheme.
+ * <p>
+ * The comparison rules are:
+ * - If both URIs have the same scheme, compare the full URI strings
(case-insensitive).
+ * - If the schemes differ, but one of them is "s3", then compare only the
authority and path parts,
+ * ignoring the scheme.
+ * - Otherwise, consider the URIs as not equal.
+ * <p>
+ * This is useful in scenarios where "s3" URIs should be treated as
equivalent to other schemes
+ * if the host and path match, ignoring scheme differences.
+ *
+ * @param p1 the first URI string to compare
+ * @param p2 the second URI string to compare
+ * @return true if the URIs are considered equal under the above rules,
false otherwise
+ */
+ public static boolean equalsIgnoreSchemeIfOneIsS3(String p1, String p2) {
+ if (p1 == null || p2 == null) {
+ return p1 == null && p2 == null;
+ }
+
+ try {
+ URI uri1 = new URI(p1);
+ URI uri2 = new URI(p2);
+
+ String scheme1 = uri1.getScheme();
+ String scheme2 = uri2.getScheme();
+
+ // If schemes are equal, compare the full URI strings ignoring case
+ if (scheme1 != null && scheme1.equalsIgnoreCase(scheme2)) {
+ return p1.equalsIgnoreCase(p2);
+ }
+
+ // If schemes differ but one is "s3", compare only authority and
path ignoring scheme
+ if ("s3".equalsIgnoreCase(scheme1) ||
"s3".equalsIgnoreCase(scheme2)) {
+ String auth1 = normalize(uri1.getAuthority());
+ String auth2 = normalize(uri2.getAuthority());
+ String path1 = normalize(uri1.getPath());
+ String path2 = normalize(uri2.getPath());
+
+ return Objects.equals(auth1, auth2) && Objects.equals(path1,
path2);
+ }
+
+ // Otherwise, URIs are not equal
+ return false;
+
+ } catch (URISyntaxException e) {
+ // If URI parsing fails, fallback to simple case-insensitive
string comparison
+ return p1.equalsIgnoreCase(p2);
+ }
+ }
+
+ /**
+ * Normalizes a URI component by converting null to empty string and
+ * removing trailing slashes.
+ *
+ * @param s the string to normalize
+ * @return normalized string without trailing slashes
+ */
+ private static String normalize(String s) {
+ if (s == null) {
+ return "";
+ }
+ // Remove trailing slashes for consistent comparison
+ String trimmed = s.replaceAll("/+$", "");
+ return trimmed.isEmpty() ? "" : trimmed;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index dd29204bccc..c151b36fcd3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -25,6 +25,7 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.PathUtils;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.FileSystem;
@@ -1178,7 +1179,16 @@ public class HMSTransaction implements Transaction {
Table table = tableAndMore.getTable();
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
- if (!targetPath.equals(writePath)) {
+ // Determine if a rename operation is required for the output file.
+ // In the BE (Backend) implementation, all object storage systems
(e.g., AWS S3, MinIO, OSS, COS)
+ // are unified under the "s3" URI scheme, even if the actual
underlying storage uses a different protocol.
+ // The method PathUtils.equalsIgnoreSchemeIfOneIsS3(...) compares
two paths by ignoring the scheme
+ // if one of them uses the "s3" scheme, and only checks whether
the bucket name and object key match.
+ // This prevents unnecessary rename operations when the scheme
differs (e.g., "s3://" vs. "oss://")
+ // but the actual storage location is identical. If the paths
differ after ignoring the scheme,
+ // a rename operation will be performed.
+ boolean needRename =
!PathUtils.equalsIgnoreSchemeIfOneIsS3(targetPath, writePath);
+ if (needRename) {
wrapperAsyncRenameWithProfileSummary(
fileSystemExecutor,
asyncFileSystemTaskFutures,
@@ -1613,7 +1623,6 @@ public class HMSTransaction implements Transaction {
private void s3Commit(Executor fileSystemExecutor,
List<CompletableFuture<?>> asyncFileSystemTaskFutures,
AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate
hivePartitionUpdate, String path) {
-
List<TS3MPUPendingUpload> s3MpuPendingUploads =
hivePartitionUpdate.getS3MpuPendingUploads();
if (isMockedPartitionUpdate) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
index a152a21e1bf..3a437e9d69a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -115,9 +115,20 @@ public class MetastoreEventsProcessor extends MasterDaemon
{
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalog instanceof HMSExternalCatalog) {
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog)
catalog;
- if
(!hmsExternalCatalog.getHmsProperties().isHmsEventsIncrementalSyncEnabled()) {
+ try {
+ // Check if HMS incremental events synchronization is
enabled.
+ // In the past, this value was a constant and always
available.
+ // Now it is retrieved from HmsProperties, which requires
initialization.
+ // In some scenarios, essential HMS parameters may be
missing.
+ // If so, isHmsEventsIncrementalSyncEnabled() may throw
IllegalArgumentException.
+ if
(!hmsExternalCatalog.getHmsProperties().isHmsEventsIncrementalSyncEnabled()) {
+ continue;
+ }
+ } catch (IllegalArgumentException e) {
+ //ignore
continue;
}
+
try {
List<NotificationEvent> events =
getNextHMSEvents(hmsExternalCatalog);
if (!events.isEmpty()) {
@@ -218,7 +229,7 @@ public class MetastoreEventsProcessor extends MasterDaemon {
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
batchSize, null);
LOG.info("CatalogName = {}, lastSyncedEventId = {}, currentEventId
= {},"
+ "batchSize = {}, getEventsSize = {}",
hmsExternalCatalog.getName(), lastSyncedEventId,
- currentEventId, batchSize,
notificationEventResponse.getEvents().size());
+ currentEventId, batchSize,
notificationEventResponse.getEvents().size());
return notificationEventResponse;
} catch (MetastoreNotificationFetchException e) {
@@ -237,7 +248,7 @@ public class MetastoreEventsProcessor extends MasterDaemon {
}
private NotificationEventResponse
getNextEventResponseForSlave(HMSExternalCatalog hmsExternalCatalog)
- throws Exception {
+ throws Exception {
long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog);
long masterLastSyncedEventId =
getMasterLastSyncedEventId(hmsExternalCatalog);
// do nothing if masterLastSyncedEventId has not been synced
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
index c47ff7248d1..e51292feff2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
@@ -21,7 +21,6 @@ import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog;
import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool;
-import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
@@ -63,7 +62,7 @@ public class DLFCatalog extends HiveCompatibleCatalog {
properties.get(S3Properties.Env.TOKEN)));
}
String region = properties.getOrDefault(OssProperties.REGION,
properties.get(S3Properties.Env.REGION));
- boolean isUsePathStyle =
properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
+ boolean isUsePathStyle = properties.getOrDefault("use_path_style",
"false")
.equalsIgnoreCase("true");
// s3 file io just supports s3-like endpoint
String s3Endpoint = endpoint.replace(region, "s3." + region);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 0db54afc1a9..7eca8bba25e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -225,7 +225,7 @@ public class PaimonScanNode extends FileQueryScanNode {
fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
// The hadoop conf should be same with
// PaimonExternalCatalog.createCatalog()#getConfiguration()
-
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
+
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getBackendStorageProperties());
Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
if (optDeletionFile.isPresent()) {
DeletionFile deletionFile = optDeletionFile.get();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFBaseProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFBaseProperties.java
index 543edce45f6..ca78929f405 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFBaseProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFBaseProperties.java
@@ -20,8 +20,11 @@ package org.apache.doris.datasource.property.metastore;
import org.apache.doris.datasource.property.ConnectorPropertiesUtils;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.ParamRules;
+import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import com.aliyun.datalake.metastore.common.DataLakeConfig;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@@ -48,11 +51,12 @@ public class AliyunDLFBaseProperties {
description = "The region of the Aliyun DLF.")
protected String dlfEndpoint = "";
- @ConnectorProperty(names = {"dlf.uid", "dlf.catalog.uid"},
+ @ConnectorProperty(names = {"dlf.catalog.uid", "dlf.uid"},
description = "The uid of the Aliyun DLF.")
protected String dlfUid = "";
- @ConnectorProperty(names = {"dlf.catalog.id"},
+ @ConnectorProperty(names = {"dlf.catalog.id", "dlf.catalog_id"},
+ required = false,
description = "The catalog id of the Aliyun DLF. If not set, it
will be the same as dlf.uid.")
protected String dlfCatalogId = "";
@@ -61,7 +65,7 @@ public class AliyunDLFBaseProperties {
description = "Enable public access to Aliyun DLF.")
protected String dlfAccessPublic = "false";
- @ConnectorProperty(names = {DataLakeConfig.CATALOG_PROXY_MODE},
+ @ConnectorProperty(names = {DataLakeConfig.CATALOG_PROXY_MODE,
"dlf.proxy.mode"},
required = false,
description = "The proxy mode of the Aliyun DLF. Default is
DLF_ONLY.")
protected String dlfProxyMode = "DLF_ONLY";
@@ -69,20 +73,32 @@ public class AliyunDLFBaseProperties {
public static AliyunDLFBaseProperties of(Map<String, String> properties) {
AliyunDLFBaseProperties propertiesObj = new AliyunDLFBaseProperties();
ConnectorPropertiesUtils.bindConnectorProperties(propertiesObj,
properties);
+ propertiesObj.checkAndInit();
return propertiesObj;
}
-
private ParamRules buildRules() {
return new ParamRules()
.require(dlfAccessKey, "dlf.access_key is required")
- .require(dlfSecretKey, "dlf.secret_key is required")
- .require(dlfEndpoint, "dlf.endpoint is required");
+ .require(dlfSecretKey, "dlf.secret_key is required");
}
public void checkAndInit() {
buildRules().validate();
+ if (StringUtils.isBlank(dlfEndpoint) &&
StringUtils.isNotBlank(dlfRegion)) {
+ if (BooleanUtils.toBoolean(dlfAccessPublic)) {
+ dlfEndpoint = "dlf." + dlfRegion + ".aliyuncs.com";
+ } else {
+ dlfEndpoint = "dlf-vpc." + dlfRegion + ".aliyuncs.com";
+ }
+ }
+ if (StringUtils.isBlank(dlfEndpoint)) {
+ throw new StoragePropertiesException("dlf.endpoint is required.");
+ }
+ if (StringUtils.isBlank(dlfCatalogId)) {
+ this.dlfCatalogId = dlfUid;
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSAliyunDLFMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSAliyunDLFMetaStoreProperties.java
index 2a7dc6a12d2..aabf57bbeba 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSAliyunDLFMetaStoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSAliyunDLFMetaStoreProperties.java
@@ -17,6 +17,8 @@
package org.apache.doris.datasource.property.metastore;
+import org.apache.doris.datasource.property.storage.OSSProperties;
+
import com.aliyun.datalake.metastore.common.DataLakeConfig;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,6 +28,8 @@ public class HMSAliyunDLFMetaStoreProperties extends
AbstractHMSProperties {
private AliyunDLFBaseProperties baseProperties;
+ private OSSProperties ossProperties;
+
public HMSAliyunDLFMetaStoreProperties(Map<String, String> origProps) {
super(Type.DLF, origProps);
}
@@ -33,6 +37,7 @@ public class HMSAliyunDLFMetaStoreProperties extends
AbstractHMSProperties {
@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
+ ossProperties = OSSProperties.of(origProps);
baseProperties = AliyunDLFBaseProperties.of(origProps);
initHiveConf();
}
@@ -41,6 +46,7 @@ public class HMSAliyunDLFMetaStoreProperties extends
AbstractHMSProperties {
// @see com.aliyun.datalake.metastore.hive.common.utils.ConfigUtils
// todo support other parameters
hiveConf = new HiveConf();
+ hiveConf.addResource(ossProperties.hadoopStorageConfig);
hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_ID,
baseProperties.dlfAccessKey);
hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET,
baseProperties.dlfSecretKey);
hiveConf.set(DataLakeConfig.CATALOG_ENDPOINT,
baseProperties.dlfEndpoint);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSBaseProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSBaseProperties.java
index dcfbed77cf3..c4a7661ad51 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSBaseProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSBaseProperties.java
@@ -51,7 +51,7 @@ public class HMSBaseProperties {
description = "The conf resources of the hive metastore.")
private String hiveConfResourcesConfig = "";
- @ConnectorProperty(names = {"hive.metastore.service.principal"},
+ @ConnectorProperty(names = {"hive.metastore.service.principal",
"hive.metastore.kerberos.principal"},
required = false,
description = "The service principal of the hive metastore.")
private String hiveMetastoreServicePrincipal = "";
@@ -134,7 +134,11 @@ public class HMSBaseProperties {
* strongly recommended.
*/
private void initHadoopAuthenticator() {
+ if (StringUtils.isNotBlank(hiveMetastoreServicePrincipal)) {
+ hiveConf.set("hive.metastore.kerberos.principal",
hiveMetastoreServicePrincipal);
+ }
if (this.hiveMetastoreAuthenticationType.equalsIgnoreCase("kerberos"))
{
+ hiveConf.set("hadoop.security.authentication", "kerberos");
KerberosAuthenticationConfig authenticationConfig = new
KerberosAuthenticationConfig(
this.hiveMetastoreClientPrincipal,
this.hiveMetastoreClientKeytab, hiveConf);
this.hmsAuthenticator =
HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
@@ -170,10 +174,10 @@ public class HMSBaseProperties {
this.hiveConf = loadHiveConfFromFile(hiveConfResourcesConfig);
initUserHiveConfig(origProps);
userOverriddenHiveConfig.forEach(hiveConf::set);
- initHadoopAuthenticator();
hiveConf.set("hive.metastore.uris", hiveMetastoreUri);
HiveConf.setVar(hiveConf,
HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT,
String.valueOf(Config.hive_metastore_client_timeout_second));
+ initHadoopAuthenticator();
}
private void initUserHiveConfig(Map<String, String> origProps) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java
index 1a4baab1da7..c1d6124cfd0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java
@@ -46,11 +46,6 @@ public class IcebergAliyunDLFMetaStoreProperties extends
AbstractIcebergProperti
return IcebergExternalCatalog.ICEBERG_DLF;
}
-
- private void initconf() {
-
- }
-
@Override
public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
index fd66d0b25ec..995b76c2162 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
@@ -17,10 +17,12 @@
package org.apache.doris.datasource.property.metastore;
+import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.property.storage.HdfsProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
@@ -48,7 +50,16 @@ public class IcebergFileSystemMetaStoreProperties extends
AbstractIcebergPropert
HadoopCatalog catalog = new HadoopCatalog();
catalog.setConf(configuration);
- catalog.initialize(catalogName, catalogProps);
+ try {
+ this.executionAuthenticator.execute(() -> {
+ catalog.initialize(catalogName, catalogProps);
+ return null;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to initialize iceberg
filesystem catalog: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
+ }
+
return catalog;
}
@@ -70,6 +81,7 @@ public class IcebergFileSystemMetaStoreProperties extends
AbstractIcebergPropert
if (hdfsProps.isKerberos()) {
props.put(CatalogProperties.FILE_IO_IMPL,
"org.apache.doris.datasource.iceberg.fileio.DelegateFileIO");
+ this.executionAuthenticator = new
HadoopExecutionAuthenticator(hdfsProps.getHadoopAuthenticator());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
index 2d2179495b3..ee589ba76fc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
@@ -72,6 +72,11 @@ public class IcebergHMSMetaStoreProperties extends
AbstractIcebergProperties {
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(conf);
+ storagePropertiesList.forEach(sp -> {
+ for (Map.Entry<String, String> entry :
sp.getHadoopStorageConfig()) {
+ catalogProps.put(entry.getKey(), entry.getValue());
+ }
+ });
try {
this.executionAuthenticator.execute(() ->
hiveCatalog.initialize(catalogName, catalogProps));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 4726fd53ca0..f57bc87b6e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -176,10 +176,7 @@ public abstract class AbstractS3CompatibleProperties
extends StorageProperties i
@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
- setEndpointIfNotSet();
- if (!isValidEndpoint(getEndpoint())) {
- throw new IllegalArgumentException("Invalid endpoint format: " +
getEndpoint());
- }
+ checkEndpoint();
checkRequiredProperties();
initRegionIfNecessary();
if (StringUtils.isBlank(getRegion())) {
@@ -187,6 +184,29 @@ public abstract class AbstractS3CompatibleProperties
extends StorageProperties i
}
}
+ /**
+ * Checks and validates the configured endpoint.
+ * <p>
+ * All object storage implementations must have an explicitly set endpoint.
+ * However, for compatibility with legacy behavior—especially when using
DLF
+ * as the catalog—some logic may derive the endpoint based on the region.
+ * <p>
+ * To support such cases, this method is exposed as {@code protected} to
allow
+ * subclasses to override it with custom logic if necessary.
+ * <p>
+ * That said, we strongly recommend users to explicitly configure both
+ * {@code endpoint} and {@code region} to ensure predictable behavior
+ * across all storage backends.
+ *
+ * @throws IllegalArgumentException if the endpoint format is invalid
+ */
+ protected void checkEndpoint() {
+ setEndpointIfNotSet();
+ if (!isValidEndpoint(getEndpoint())) {
+ throw new IllegalArgumentException("Invalid endpoint format: " +
getEndpoint());
+ }
+ }
+
private void initRegionIfNecessary() {
if (StringUtils.isNotBlank(getRegion())) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index 6fea7cf18ec..0e0f37e5566 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -130,7 +130,10 @@ public class COSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
super.initializeHadoopStorageConfig();
- hadoopStorageConfig.set("fs.cos.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.cosn.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.cos.impl",
"org.apache.hadoop.fs.CosFileSystem");
+ hadoopStorageConfig.set("fs.cosn.impl",
"org.apache.hadoop.fs.CosFileSystem");
+ hadoopStorageConfig.set("fs.cosn.bucket.region", region);
+ hadoopStorageConfig.set("fs.cosn.userinfo.secretId", accessKey);
+ hadoopStorageConfig.set("fs.cosn.userinfo.secretKey", secretKey);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 513727416ef..25f1bde5822 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -136,5 +136,8 @@ public class OBSProperties extends
AbstractS3CompatibleProperties {
public void initializeHadoopStorageConfig() {
super.initializeHadoopStorageConfig();
hadoopStorageConfig.set("fs.obs.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.obs.access.key", accessKey);
+ hadoopStorageConfig.set("fs.obs.secret.key", secretKey);
+ hadoopStorageConfig.set("fs.obs.endpoint", endpoint);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 7b27a1ea160..9031f4e4ef9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -17,17 +17,20 @@
package org.apache.doris.datasource.property.storage;
+import org.apache.doris.datasource.property.ConnectorPropertiesUtils;
import org.apache.doris.datasource.property.ConnectorProperty;
import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -66,6 +69,11 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
description = "The region of OSS.")
protected String region;
+ @ConnectorProperty(names = {"dlf.access.public",
"dlf.catalog.accessPublic"},
+ required = false,
+ description = "Enable public access to Aliyun DLF.")
+ protected String dlfAccessPublic = "false";
+
/**
* Pattern to extract the region from an Alibaba Cloud OSS endpoint.
* <p>
@@ -87,10 +95,20 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
Pattern.compile("(?:https?://)?([a-z]{2}-[a-z0-9-]+)\\.oss-dls\\.aliyuncs\\.com"),
Pattern.compile("^(?:https?://)?dlf(?:-vpc)?\\.([a-z0-9-]+)\\.aliyuncs\\.com(?:/.*)?$"));
+ private static final List<String> URI_KEYWORDS = Arrays.asList("uri",
"warehouse");
+
protected OSSProperties(Map<String, String> origProps) {
super(Type.OSS, origProps);
}
+ public static OSSProperties of(Map<String, String> properties) {
+ OSSProperties propertiesObj = new OSSProperties(properties);
+ ConnectorPropertiesUtils.bindConnectorProperties(propertiesObj,
properties);
+ propertiesObj.initNormalizeAndCheckProps();
+ propertiesObj.initializeHadoopStorageConfig();
+ return propertiesObj;
+ }
+
protected static boolean guessIsMe(Map<String, String> origProps) {
String value = Stream.of("oss.endpoint", "s3.endpoint",
"AWS_ENDPOINT", "endpoint", "ENDPOINT",
"dlf.endpoint", "dlf.catalog.endpoint")
@@ -98,12 +116,16 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
- if (!Strings.isNullOrEmpty(value)) {
+ if (StringUtils.isNotBlank(value)) {
return (value.contains("aliyuncs.com"));
}
+
Optional<String> uriValue = origProps.entrySet().stream()
- .filter(e -> e.getKey().equalsIgnoreCase("uri"))
+ .filter(e -> URI_KEYWORDS.stream()
+ .anyMatch(key -> key.equalsIgnoreCase(e.getKey())))
.map(Map.Entry::getValue)
+ .filter(Objects::nonNull)
+ .filter(OSSProperties::isKnownObjectStorage)
.findFirst();
return
uriValue.filter(OSSProperties::isKnownObjectStorage).isPresent();
}
@@ -112,6 +134,9 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
if (value == null) {
return false;
}
+ if (value.startsWith("oss://")) {
+ return true;
+ }
if (!value.contains("aliyuncs.com")) {
return false;
}
@@ -121,12 +146,33 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
return isAliyunOss || isAmazonS3 || isDls;
}
+ @Override
+ protected void checkEndpoint() {
+ if (StringUtils.isBlank(this.endpoint) &&
StringUtils.isNotBlank(this.region)) {
+ Optional<String> uriValueOpt = origProps.entrySet().stream()
+ .filter(e -> URI_KEYWORDS.stream()
+ .anyMatch(key -> key.equalsIgnoreCase(e.getKey())))
+ .map(Map.Entry::getValue)
+ .filter(Objects::nonNull)
+ .filter(OSSProperties::isKnownObjectStorage)
+ .findFirst();
+ if (uriValueOpt.isPresent()) {
+ String uri = uriValueOpt.get();
+ // If the URI does not start with http(s), derive endpoint
from region
+ // (http(s) URIs are handled by separate logic elsewhere)
+ if (!uri.startsWith("http://") && !uri.startsWith("https://"))
{
+ this.endpoint = getOssEndpoint(region,
BooleanUtils.toBoolean(dlfAccessPublic));
+ }
+ }
+ }
+ super.checkEndpoint();
+ }
+
@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
if (endpoint.contains("dlf") || endpoint.contains("oss-dls")) {
- String publicAccess =
origProps.getOrDefault("dlf.catalog.accessPublic", "false");
- this.endpoint = getOssEndpoint(region,
Boolean.parseBoolean(publicAccess));
+ this.endpoint = getOssEndpoint(region,
BooleanUtils.toBoolean(dlfAccessPublic));
}
// Check if credentials are provided properly - either both or neither
if (StringUtils.isNotBlank(accessKey) &&
StringUtils.isNotBlank(secretKey)) {
@@ -172,6 +218,9 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
super.initializeHadoopStorageConfig();
- hadoopStorageConfig.set("fs.oss.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.oss.impl",
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+ hadoopStorageConfig.set("fs.oss.accessKeyId", accessKey);
+ hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
+ hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 0a696f64340..a1957fbc906 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -22,7 +22,6 @@ package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
@@ -125,23 +124,17 @@ public class HiveTableSink extends
BaseExternalTableDataSink {
setSerDeProperties(tSink);
THiveLocationParams locationParams = new THiveLocationParams();
- LocationPath locationPath = null;
- try {
- locationPath = LocationPath.of(sd.getLocation(),
targetTable.getStoragePropertiesMap(), false);
- } catch (UserException e) {
- throw new RuntimeException(e);
- }
- String location = locationPath.getPath().toString();
- String storageLocation = locationPath.toStorageLocation().toString();
+ LocationPath locationPath = LocationPath.of(sd.getLocation(),
targetTable.getStoragePropertiesMap());
+ String location = sd.getLocation();
TFileType fileType = locationPath.getTFileTypeForBE();
if (fileType == TFileType.FILE_S3) {
- locationParams.setWritePath(storageLocation);
- locationParams.setOriginalWritePath(location);
- locationParams.setTargetPath(location);
+ locationParams.setWritePath(locationPath.getNormalizedLocation());
+
locationParams.setOriginalWritePath(locationPath.getNormalizedLocation());
+ locationParams.setTargetPath(locationPath.getNormalizedLocation());
if (insertCtx.isPresent()) {
HiveInsertCommandContext context = (HiveInsertCommandContext)
insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
- context.setWritePath(location);
+ context.setWritePath(locationPath.getNormalizedLocation());
context.setFileType(fileType);
}
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index ad11e296082..d9ccc392084 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -109,7 +109,7 @@ public class IcebergTableValuedFunction extends
MetadataTableValuedFunction {
throw new AnalysisException("Catalog " + icebergTableName.getCtl()
+ " is not an external catalog");
}
ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
- hadoopProps =
externalCatalog.getCatalogProperty().getHadoopProperties();
+ hadoopProps =
externalCatalog.getCatalogProperty().getBackendStorageProperties();
preExecutionAuthenticator =
externalCatalog.getExecutionAuthenticator();
TableIf dorisTable =
externalCatalog.getDbOrAnalysisException(icebergTableName.getDb())
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/PathUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/PathUtilsTest.java
new file mode 100644
index 00000000000..86a007e9b4c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/PathUtilsTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PathUtilsTest {
+
+ @Test
+ public void testEqualsIgnoreScheme_sameHostAndPath() {
+ Assertions.assertTrue(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "s3://my-bucket/data/file.txt",
+ "cos://my-bucket/data/file.txt"
+ ));
+
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "oss://bucket/path/",
+ "obs://bucket/path"
+ ));
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "s3://bucket/abc/path/",
+ "obs://bucket/path"
+ ));
+
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "hdfs://namenode/user/hadoop",
+ "file:///user/hadoop"
+ ));
+ }
+
+ @Test
+ public void testEqualsIgnoreScheme_differentHost() {
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "s3://bucket-a/data/file.txt",
+ "cos://bucket-b/data/file.txt"
+ ));
+
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "hdfs://namenode1/path",
+ "hdfs://namenode2/path"
+ ));
+ }
+
+ @Test
+ public void testEqualsIgnoreScheme_trailingSlash() {
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "oss://bucket/data/",
+ "oss://bucket/data"
+ ));
+
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "hdfs://namenode/user/hadoop/",
+ "hdfs://namenode/user/hadoop"
+ ));
+ }
+
+ @Test
+ public void testEqualsIgnoreScheme_invalidURI() {
+ // Special characters that break URI parsing
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "s3://bucket/data file.txt",
+ "cos://bucket/data file.txt"
+ ));
+
+ Assertions.assertFalse(PathUtils.equalsIgnoreSchemeIfOneIsS3(
+ "s3://bucket/data file.txt",
+ "cos://bucket/other file.txt"
+ ));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AliyunDLFBasePropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AliyunDLFBasePropertiesTest.java
new file mode 100644
index 00000000000..8e6324b0e3f
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AliyunDLFBasePropertiesTest.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AliyunDLFBasePropertiesTest {
+
+ @Test
+ void testAutoGenerateEndpointWithPublicAccess() {
+ Map<String, String> props = new HashMap<>();
+ props.put("dlf.access_key", "ak");
+ props.put("dlf.secret_key", "sk");
+ props.put("dlf.region", "cn-hangzhou");
+ props.put("dlf.access.public", "true");
+
+ AliyunDLFBaseProperties dlfProps = AliyunDLFBaseProperties.of(props);
+ Assertions.assertEquals("dlf.cn-hangzhou.aliyuncs.com",
dlfProps.dlfEndpoint);
+ }
+
+ @Test
+ void testAutoGenerateEndpointWithVpcAccess() {
+ Map<String, String> props = new HashMap<>();
+ props.put("dlf.access_key", "ak");
+ props.put("dlf.secret_key", "sk");
+ props.put("dlf.region", "cn-hangzhou");
+ props.put("dlf.access.public", "false");
+
+ AliyunDLFBaseProperties dlfProps = AliyunDLFBaseProperties.of(props);
+ Assertions.assertEquals("dlf-vpc.cn-hangzhou.aliyuncs.com",
dlfProps.dlfEndpoint);
+ }
+
+ @Test
+ void testExplicitEndpointOverridesAutoGeneration() {
+ Map<String, String> props = new HashMap<>();
+ props.put("dlf.access_key", "ak");
+ props.put("dlf.secret_key", "sk");
+ props.put("dlf.region", "cn-beijing");
+ props.put("dlf.endpoint", "custom.endpoint.com");
+
+ AliyunDLFBaseProperties dlfProps = AliyunDLFBaseProperties.of(props);
+ Assertions.assertEquals("custom.endpoint.com", dlfProps.dlfEndpoint);
+ }
+
+ @Test
+ void testMissingEndpointAndRegionThrowsException() {
+ Map<String, String> props = new HashMap<>();
+ props.put("dlf.access_key", "ak");
+ props.put("dlf.secret_key", "sk");
+
+ StoragePropertiesException ex = Assertions.assertThrows(
+ StoragePropertiesException.class,
+ () -> AliyunDLFBaseProperties.of(props)
+ );
+ Assertions.assertEquals("dlf.endpoint is required.", ex.getMessage());
+ }
+
+ @Test
+ void testMissingAccessKeyThrowsException() {
+ Map<String, String> props = new HashMap<>();
+ props.put("dlf.secret_key", "sk");
+ props.put("dlf.endpoint", "custom.endpoint.com");
+
+ Exception ex = Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> AliyunDLFBaseProperties.of(props)
+ );
+ Assertions.assertTrue(ex.getMessage().contains("dlf.access_key is
required"));
+ }
+
+ @Test
+ void testMissingSecretKeyThrowsException() {
+ Map<String, String> props = new HashMap<>();
+ props.put("dlf.access_key", "ak");
+ props.put("dlf.endpoint", "custom.endpoint.com");
+
+ Exception ex = Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> AliyunDLFBaseProperties.of(props)
+ );
+ Assertions.assertTrue(ex.getMessage().contains("dlf.secret_key is
required"));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AliyunDLFPropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AliyunDLFPropertiesTest.java
deleted file mode 100644
index 3aa7d55b560..00000000000
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AliyunDLFPropertiesTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.property.metastore;
-
-import org.apache.doris.common.UserException;
-
-import org.apache.paimon.options.Options;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Disabled("wait metastore integration")
-public class AliyunDLFPropertiesTest {
- private static Map<String, String> baseProps;
-
- @BeforeAll
- public static void init() {
- baseProps = new HashMap<>();
- baseProps.put("paimon.catalog.type", "DLF");
- baseProps.put("dlf.access_key", "my-access-key");
- baseProps.put("dlf.secret_key", "my-secret-key");
- baseProps.put("dlf.region", "cn-hangzhou");
- baseProps.put("dlf.uid", "uid123");
- baseProps.put("dlf.access.public", "true");
- baseProps.put("dlf.extra.config", "extraValue");
- baseProps.put("not.dlf.key", "ignoreMe");
- }
-
- @Test
- public void testConstructor_shouldCaptureOnlyDlfPrefixedProps() throws
UserException {
- AliyunDLFProperties props = (AliyunDLFProperties)
MetastoreProperties.create(baseProps);
- Map<String, String> others = props.getOtherDlfProps();
- Assertions.assertTrue(others.containsKey("dlf.extra.config"));
- Assertions.assertFalse(others.containsKey("not.dlf.key"));
- }
-
- @Test
- public void testToPaimonOptions_withExplicitEndpoint() throws
UserException {
- baseProps.put("dlf.endpoint", "explicit.endpoint.aliyun.com");
-
- AliyunDLFProperties props = (AliyunDLFProperties)
MetastoreProperties.create(baseProps);
- Options options = new Options();
- props.toPaimonOptions(options);
-
- Assertions.assertEquals("explicit.endpoint.aliyun.com",
options.get("dlf.catalog.endpoint"));
- Assertions.assertEquals("my-access-key",
options.get("dlf.catalog.accessKeyId"));
- Assertions.assertEquals("my-secret-key",
options.get("dlf.catalog.accessKeySecret"));
- Assertions.assertEquals("cn-hangzhou",
options.get("dlf.catalog.region"));
- Assertions.assertEquals("uid123", options.get("dlf.catalog.uid"));
- Assertions.assertEquals("true",
options.get("dlf.catalog.accessPublic"));
- Assertions.assertEquals("DLF_ONLY",
options.get("dlf.catalog.proxyMode"));
- Assertions.assertEquals("false",
options.get("dlf.catalog.createDefaultDBIfNotExist"));
-
- // extra config
- Assertions.assertEquals("extraValue", options.get("dlf.extra.config"));
- }
-
- @Test
- public void testToPaimonOptions_publicAccess() throws UserException {
- baseProps.remove("dlf.endpoint");
- baseProps.put("dlf.access.public", "TrUe"); // 测试大小写
-
- AliyunDLFProperties props = (AliyunDLFProperties)
MetastoreProperties.create(baseProps);
-
- Options options = new Options();
- props.toPaimonOptions(options);
-
- Assertions.assertEquals("dlf.cn-hangzhou.aliyuncs.com",
options.get("dlf.catalog.endpoint"));
- }
-
- @Test
- public void testToPaimonOptions_privateVpcAccess() throws UserException {
- baseProps.remove("dlf.endpoint");
- baseProps.put("dlf.access.public", "true");
-
- AliyunDLFProperties props = (AliyunDLFProperties)
MetastoreProperties.create(baseProps);
- Options options = new Options();
- props.toPaimonOptions(options);
-
- Assertions.assertEquals("dlf.cn-hangzhou.aliyuncs.com",
options.get("dlf.catalog.endpoint"));
- }
-
- @Test
- public void testToPaimonOptions_defaultVpcWhenPublicMissing() throws
UserException {
- baseProps.remove("dlf.endpoint");
- baseProps.put("dlf.access.public", "false");
-
- AliyunDLFProperties props = (AliyunDLFProperties)
MetastoreProperties.create(baseProps);
-
- Options options = new Options();
- props.toPaimonOptions(options);
-
- Assertions.assertEquals("dlf-vpc.cn-hangzhou.aliyuncs.com",
options.get("dlf.catalog.endpoint"));
- }
-
- @Test
- public void testToPaimonOptions_emptyConstructor() throws UserException {
- AliyunDLFProperties props = (AliyunDLFProperties)
MetastoreProperties.create(baseProps);
-
-
- Options options = new Options();
- props.toPaimonOptions(options);
- // 检查关键字段存在
- Assertions.assertEquals("DLF_ONLY",
options.get("dlf.catalog.proxyMode"));
- Assertions.assertEquals("false",
options.get("dlf.catalog.createDefaultDBIfNotExist"));
- }
-
- @Test
- public void testGetResourceConfigPropName() {
- AliyunDLFProperties props = new AliyunDLFProperties(baseProps);
- Assertions.assertEquals("dlf.resource_config",
props.getResourceConfigPropName());
- }
-}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
index 0df7c5d6be9..ede08f399fc 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
@@ -145,7 +145,7 @@ public class OSSPropertiesTest {
origProps.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
origProps.put("oss.secret_key", "myOSSSecretKey");
Assertions.assertThrows(StoragePropertiesException.class, () ->
StorageProperties.createPrimary(origProps),
- "Please set access_key and secret_key or omit both for
anonymous access to public bucket.");
+ "Please set access_key and secret_key or omit both for
anonymous access to public bucket.");
}
@Test
@@ -154,6 +154,22 @@ public class OSSPropertiesTest {
origProps.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
origProps.put("oss.access_key", "myOSSAccessKey");
Assertions.assertThrows(StoragePropertiesException.class, () ->
StorageProperties.createPrimary(origProps),
- "Please set access_key and secret_key or omit both for
anonymous access to public bucket.");
+ "Please set access_key and secret_key or omit both for
anonymous access to public bucket.");
+ }
+
+ @Test
+ public void testNotEndpoint() throws UserException {
+ Map<String, String> origProps = new HashMap<>();
+ origProps.put("uri", "oss://examplebucket-1250000000/test/file.txt");
+ origProps.put("oss.access_key", "myOSSAccessKey");
+ origProps.put("oss.secret_key", "myOSSSecretKey");
+ origProps.put("oss.region", "cn-hangzhou");
+ Assertions.assertEquals("oss-cn-hangzhou-internal.aliyuncs.com",
+ ((OSSProperties)
StorageProperties.createPrimary(origProps)).getEndpoint());
+ origProps.put("dlf.access.public", "true");
+ Assertions.assertEquals("oss-cn-hangzhou.aliyuncs.com",
+ ((OSSProperties)
StorageProperties.createPrimary(origProps)).getEndpoint());
+ origProps.put("uri",
"https://doris-regression-hk.oss-cn-hangzhou-internal.aliyuncs.com/regression/datalake/pipeline_data/data_page_v2_gzip.parquet");
+ Assertions.assertEquals("oss-cn-hangzhou-internal.aliyuncs.com",
((OSSProperties) StorageProperties.createPrimary(origProps)).getEndpoint());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index ea2ff562bb4..c8bc82e67a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -19,13 +19,15 @@ package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.common.util.PathUtils;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
+import org.apache.doris.datasource.property.storage.StorageProperties;
import mockit.Mock;
import mockit.MockUp;
@@ -40,13 +42,16 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class HiveTableSinkTest {
@Test
- public void testBindDataSink() throws AnalysisException {
+ public void testBindDataSink() throws UserException {
new MockUp<ThriftHMSCachedClient>() {
@Mock
@@ -67,7 +72,26 @@ public class HiveTableSinkTest {
};
}
};
+ Map<String, String> storageProperties = new HashMap<>();
+ storageProperties.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+ storageProperties.put("oss.access_key", "access_key");
+ storageProperties.put("oss.secret_key", "secret_key");
+ storageProperties.put("s3.endpoint", "s3.north-1.amazonaws.com");
+ storageProperties.put("s3.access_key", "access_key");
+ storageProperties.put("s3.secret_key", "secret_key");
+ storageProperties.put("cos.endpoint", "cos.cn-hangzhou.myqcloud.com");
+ storageProperties.put("cos.access_key", "access_key");
+ storageProperties.put("cos.secret_key", "secret_key");
+ List<StorageProperties> storagePropertiesList =
StorageProperties.createAll(storageProperties);
+ Map<StorageProperties.Type, StorageProperties> storagePropertiesMap =
storagePropertiesList.stream()
+ .collect(Collectors.toMap(StorageProperties::getType,
Function.identity()));
+ new MockUp<HMSExternalTable>() {
+ @Mock
+ public Map<StorageProperties.Type, StorageProperties>
getStoragePropertiesMap() {
+ return storagePropertiesMap;
+ }
+ };
new MockUp<HMSExternalCatalog>() {
@Mock
public HMSCachedClient getClient() {
@@ -93,9 +117,7 @@ public class HiveTableSinkTest {
HMSExternalTable tbl = new HMSExternalTable(10001, "hive_tbl1",
"hive_db1", hmsExternalCatalog, db);
HiveTableSink hiveTableSink = new HiveTableSink(tbl);
hiveTableSink.bindDataSink(Optional.empty());
-
-
Assert.assertEquals(hiveTableSink.tDataSink.hive_table_sink.location.original_write_path,
location);
-
Assert.assertEquals(hiveTableSink.tDataSink.hive_table_sink.location.target_path,
location);
+
Assert.assertTrue(PathUtils.equalsIgnoreSchemeIfOneIsS3(hiveTableSink.tDataSink.hive_table_sink.location.write_path,
location));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]