This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1be903b0bdd branch-3.1: [Feat](params-refactor): Refactor Paimon
Metastore parameter integration (#54114) (#54262)
1be903b0bdd is described below
commit 1be903b0bddcd57c777e490150eebbd26dfde09e
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Aug 4 14:14:39 2025 +0800
branch-3.1: [Feat](params-refactor): Refactor Paimon Metastore parameter
integration (#54114) (#54262)
#54114
---
.../java/org/apache/doris/catalog/Resource.java | 16 +-
.../org/apache/doris/datasource/CatalogIf.java | 7 +-
.../org/apache/doris/datasource/CatalogLog.java | 1 +
.../org/apache/doris/datasource/CatalogMgr.java | 17 --
.../apache/doris/datasource/CatalogProperty.java | 173 +++++++++++--------
.../apache/doris/datasource/ExternalCatalog.java | 5 -
.../doris/datasource/hive/HMSExternalTable.java | 10 +-
.../doris/datasource/hive/source/HiveScanNode.java | 4 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 4 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 9 -
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 2 +-
.../paimon/PaimonDLFExternalCatalog.java | 26 ---
.../datasource/paimon/PaimonExternalCatalog.java | 154 ++++++-----------
.../paimon/PaimonExternalCatalogFactory.java | 6 +-
.../paimon/PaimonFileExternalCatalog.java | 65 -------
.../paimon/PaimonHMSExternalCatalog.java | 36 ----
.../datasource/paimon/source/PaimonScanNode.java | 16 +-
.../metastore/AbstractPaimonProperties.java | 188 +++++++++++++++++++++
.../property/metastore/MetastoreProperties.java | 2 +
.../PaimonAliyunDLFMetaStoreProperties.java | 123 ++++++++++++++
.../PaimonFileSystemMetaStoreProperties.java | 76 +++++++++
.../metastore/PaimonHMSMetaStoreProperties.java | 117 +++++++++++++
.../metastore/PaimonPropertiesFactory.java | 37 ++++
.../storage/AbstractS3CompatibleProperties.java | 20 ++-
.../datasource/property/storage/COSProperties.java | 16 +-
.../property/storage/MinioProperties.java | 15 --
.../datasource/property/storage/OBSProperties.java | 11 +-
.../datasource/property/storage/OSSProperties.java | 13 +-
.../datasource/property/storage/S3Properties.java | 53 ++----
.../trees/plans/logical/LogicalHudiScan.java | 2 +-
.../org/apache/doris/planner/HiveTableSink.java | 2 +-
31 files changed, 751 insertions(+), 475 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index a56c15a8f1e..31219e2a1df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -25,7 +25,6 @@ import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
-import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -286,20 +285,7 @@ public abstract class Resource implements Writable,
GsonPostProcessable {
private void notifyUpdate(Map<String, String> properties) {
references.entrySet().stream().collect(Collectors.groupingBy(Entry::getValue)).forEach((type,
refs) -> {
if (type == ReferenceType.CATALOG) {
- for (Map.Entry<String, ReferenceType> ref : refs) {
- String catalogName =
ref.getKey().split(REFERENCE_SPLIT)[0];
- CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
- if (catalog == null) {
- LOG.warn("Can't find the reference catalog {} for
resource {}", catalogName, name);
- continue;
- }
- if (!name.equals(catalog.getResource())) {
- LOG.warn("Failed to update catalog {} for different
resource "
- + "names(resource={}, catalog.resource={})",
catalogName, name, catalog.getResource());
- continue;
- }
- catalog.notifyPropertiesUpdated(properties);
- }
+ // No longer support resource in Catalog.
}
});
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index 50250b62e16..646510e01d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -35,7 +35,6 @@ import
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo
import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -88,10 +87,6 @@ public interface CatalogIf<T extends DatabaseIf> {
Map<String, String> getProperties();
- default String getResource() {
- return null;
- }
-
default void notifyPropertiesUpdated(Map<String, String> updatedProps) {
if (this instanceof ExternalCatalog) {
((ExternalCatalog) this).resetToUninitialized(false);
@@ -174,7 +169,7 @@ public interface CatalogIf<T extends DatabaseIf> {
CatalogLog log = new CatalogLog();
log.setCatalogId(getId());
log.setCatalogName(getName());
- log.setResource(Strings.nullToEmpty(getResource()));
+ log.setResource("");
log.setComment(getComment());
log.setProps(getProperties());
return log;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
index d862a3ef44c..08a08d49d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
@@ -56,6 +56,7 @@ public class CatalogLog implements Writable {
@SerializedName(value = "invalidCache")
private boolean invalidCache;
+ @Deprecated
@SerializedName(value = "resource")
private String resource;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index b3e73f68d08..7ee865576b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -28,8 +28,6 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.Resource;
-import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
@@ -126,12 +124,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
((ExternalCatalog) catalog).resetToUninitialized(false);
}
- if (!Strings.isNullOrEmpty(catalog.getResource())) {
- Resource resource =
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
- if (resource != null) {
- resource.addReference(catalog.getName(),
ReferenceType.CATALOG);
- }
- }
}
private CatalogIf removeCatalog(long catalogId) {
@@ -144,12 +136,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
ConnectContext.get().removeLastDBOfCatalog(catalog.getName());
}
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
- if (!Strings.isNullOrEmpty(catalog.getResource())) {
- Resource catalogResource =
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
- if (catalogResource != null) {
- catalogResource.removeReference(catalog.getName(),
ReferenceType.CATALOG);
- }
- }
Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
}
return catalog;
@@ -417,9 +403,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED,
ConnectContext.get().getQualifiedUser(),
catalog.getName());
}
- if (!Strings.isNullOrEmpty(catalog.getResource())) {
- rows.add(Arrays.asList("resource", catalog.getResource()));
- }
Map<String, String> sortedMap =
getCatalogPropertiesWithPrintable(catalog);
sortedMap.forEach((k, v) -> rows.add(Arrays.asList(k, v)));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
index cbb5543f4b1..ffba2489ae3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
@@ -17,8 +17,6 @@
package org.apache.doris.datasource;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Resource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -27,7 +25,6 @@ import
org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.persist.gson.GsonUtils;
-import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
@@ -41,6 +38,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* CatalogProperty to store the properties for catalog.
@@ -49,116 +47,103 @@ import java.util.function.Function;
public class CatalogProperty implements Writable {
private static final Logger LOG =
LogManager.getLogger(CatalogProperty.class);
+ @Deprecated
@SerializedName(value = "resource")
private String resource;
+
@SerializedName(value = "properties")
private Map<String, String> properties;
+ // Lazy-loaded storage properties map, using volatile to ensure visibility
private volatile Map<StorageProperties.Type, StorageProperties>
storagePropertiesMap;
- private MetastoreProperties metastoreProperties;
+ // Lazy-loaded metastore properties, using volatile to ensure visibility
+ private volatile MetastoreProperties metastoreProperties;
+
+ // Lazy-loaded backend storage properties, using volatile to ensure
visibility
+ private volatile Map<String, String> backendStorageProperties;
- private volatile Resource catalogResource = null;
+ // Lazy-loaded Hadoop properties, using volatile to ensure visibility
+ private volatile Map<String, String> hadoopProperties;
public CatalogProperty(String resource, Map<String, String> properties) {
- this.resource = Strings.nullToEmpty(resource);
+ this.resource = resource; // Keep but not used
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newConcurrentMap();
}
}
- private Resource catalogResource() {
- if (!Strings.isNullOrEmpty(resource) && catalogResource == null) {
- synchronized (this) {
- if (catalogResource == null) {
- catalogResource =
Env.getCurrentEnv().getResourceMgr().getResource(resource);
- }
- }
- }
- return catalogResource;
- }
-
public String getOrDefault(String key, String defaultVal) {
- String val = properties.get(key);
- if (val == null) {
- Resource res = catalogResource();
- if (res != null) {
- val = res.getCopiedProperties().getOrDefault(key, defaultVal);
- } else {
- val = defaultVal;
- }
- }
- return val;
+ return properties.getOrDefault(key, defaultVal);
}
public Map<String, String> getProperties() {
- Map<String, String> mergedProperties = Maps.newHashMap();
- if (!Strings.isNullOrEmpty(resource)) {
- Resource res = catalogResource();
- if (res != null) {
- mergedProperties = res.getCopiedProperties();
- }
- }
- mergedProperties.putAll(properties);
- return mergedProperties;
- }
-
- public String getResource() {
- return resource;
+ return Maps.newHashMap(properties);
}
public void modifyCatalogProps(Map<String, String> props) {
- properties.putAll(PropertyConverter.convertToMetaProperties(props));
- this.storagePropertiesMap = null;
- }
-
- private void reInitCatalogStorageProperties() {
- List<StorageProperties> storageProperties;
- try {
- storageProperties = StorageProperties.createAll(getProperties());
- this.storagePropertiesMap = (storageProperties.stream()
-
.collect(java.util.stream.Collectors.toMap(StorageProperties::getType,
Function.identity())));
- } catch (UserException e) {
- throw new RuntimeException(e);
+ synchronized (this) {
+
properties.putAll(PropertyConverter.convertToMetaProperties(props));
+ resetAllCaches();
}
-
}
public void rollBackCatalogProps(Map<String, String> props) {
- properties.clear();
- properties = new HashMap<>(props);
- this.storagePropertiesMap = null;
- }
-
-
- public Map<String, String> getHadoopProperties() {
- Map<String, String> hadoopProperties = getProperties();
-
hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
- return hadoopProperties;
+ synchronized (this) {
+ properties = new HashMap<>(props);
+ resetAllCaches();
+ }
}
public void addProperty(String key, String val) {
- this.properties.put(key, val);
- this.storagePropertiesMap = null; // reset storage properties map
+ synchronized (this) {
+ this.properties.put(key, val);
+ resetAllCaches();
+ }
}
public void deleteProperty(String key) {
- this.properties.remove(key);
+ synchronized (this) {
+ this.properties.remove(key);
+ resetAllCaches();
+ }
+ }
+
+ /**
+ * Unified cache reset method to ensure all caches are properly cleared
+ */
+ private void resetAllCaches() {
this.storagePropertiesMap = null;
+ this.metastoreProperties = null;
+ this.backendStorageProperties = null;
+ this.hadoopProperties = null;
}
+ /**
+ * Get storage properties map with lazy loading, using double-check
locking to ensure thread safety
+ */
public Map<StorageProperties.Type, StorageProperties>
getStoragePropertiesMap() {
if (storagePropertiesMap == null) {
synchronized (this) {
if (storagePropertiesMap == null) {
- reInitCatalogStorageProperties();
+ try {
+ List<StorageProperties> storageProperties =
StorageProperties.createAll(getProperties());
+ this.storagePropertiesMap = storageProperties.stream()
+
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
+ } catch (UserException e) {
+ LOG.warn("Failed to initialize catalog storage
properties", e);
+ throw new RuntimeException("Failed to initialize
storage properties for catalog", e);
+ }
}
}
}
return storagePropertiesMap;
}
+ /**
+ * Get metastore properties with lazy loading, using double-check locking
to ensure thread safety
+ */
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -173,13 +158,59 @@ public class CatalogProperty implements Writable {
if (MapUtils.isEmpty(getProperties())) {
return null;
}
+
if (metastoreProperties == null) {
- try {
- metastoreProperties =
MetastoreProperties.create(getProperties());
- } catch (UserException e) {
- throw new RuntimeException(e);
+ synchronized (this) {
+ if (metastoreProperties == null) {
+ try {
+ metastoreProperties =
MetastoreProperties.create(getProperties());
+ } catch (UserException e) {
+ LOG.warn("Failed to create metastore properties", e);
+ throw new RuntimeException("Failed to create metastore
properties", e);
+ }
+ }
}
}
return metastoreProperties;
}
+
+ /**
+ * Get backend storage properties with lazy loading, using double-check
locking to ensure thread safety
+ */
+ public Map<String, String> getBackendStorageProperties() {
+ if (backendStorageProperties == null) {
+ synchronized (this) {
+ if (backendStorageProperties == null) {
+ Map<String, String> result = new HashMap<>();
+ Map<StorageProperties.Type, StorageProperties> storageMap
= getStoragePropertiesMap();
+
+ for (StorageProperties sp : storageMap.values()) {
+ Map<String, String> backendProps =
sp.getBackendConfigProperties();
+ if (backendProps != null) {
+ result.putAll(backendProps);
+ }
+ }
+
+ this.backendStorageProperties = result;
+ }
+ }
+ }
+ return backendStorageProperties;
+ }
+
+ /**
+ * Get Hadoop properties with lazy loading, using double-check locking to
ensure thread safety
+ */
+ public Map<String, String> getHadoopProperties() {
+ if (hadoopProperties == null) {
+ synchronized (this) {
+ if (hadoopProperties == null) {
+ Map<String, String> result = getProperties();
+
result.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
+ this.hadoopProperties = result;
+ }
+ }
+ }
+ return hadoopProperties;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index e7e4eb67504..d5312fe5e3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -680,11 +680,6 @@ public abstract class ExternalCatalog
+ ", table id: " + tableId);
}
- @Override
- public String getResource() {
- return catalogProperty.getResource();
- }
-
@Nullable
@Override
public ExternalDatabase<? extends ExternalTable> getDbNullable(String
dbName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index fabd7448555..c0aeb6da888 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -575,14 +575,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return catalog.getCatalogProperty().getStoragePropertiesMap();
}
- public Map<String, String> getHadoopProperties() {
- return getStoragePropertiesMap().values().stream()
- .flatMap(m ->
m.getBackendConfigProperties().entrySet().stream())
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- Map.Entry::getValue,
- (v1, v2) -> v2));
-
+ public Map<String, String> getBackendStorageProperties() {
+ return catalog.getCatalogProperty().getBackendStorageProperties();
}
public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String>
columns) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 552bbd9156b..1b259dfb283 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -478,8 +478,8 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
- protected Map<String, String> getLocationProperties() throws UserException
{
- return hmsTable.getHadoopProperties();
+ protected Map<String, String> getLocationProperties() {
+ return hmsTable.getBackendStorageProperties();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 23df0415762..948e1f8c5e2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -230,12 +230,12 @@ public class HudiScanNode extends HiveScanNode {
}
@Override
- protected Map<String, String> getLocationProperties() throws UserException
{
+ protected Map<String, String> getLocationProperties() {
if (incrementalRead) {
return incrementalRelation.getHoodieParams();
} else {
// HudiJniScanner uses hadoop client to read data.
- return hmsTable.getHadoopProperties();
+ return hmsTable.getBackendStorageProperties();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 2d87889d50e..b1178363b9b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -23,18 +23,14 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
-import org.apache.doris.datasource.property.PropertyConverter;
import
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.transaction.TransactionManagerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
import org.apache.iceberg.catalog.Catalog;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
public abstract class IcebergExternalCatalog extends ExternalCatalog {
@@ -142,11 +138,6 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
}
}
- protected void initS3Param(Configuration conf) {
- Map<String, String> properties = catalogProperty.getHadoopProperties();
- conf.set(Constants.AWS_CREDENTIALS_PROVIDER,
PropertyConverter.getAWSCredentialsProviders(properties));
- }
-
@Override
public boolean viewExists(String dbName, String viewName) {
return metadataOps.viewExists(dbName, viewName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index 85e3b29e1a3..e8e92e60ac1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -361,7 +361,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
jdbcTable.setDriverClass(this.getDriverClass());
jdbcTable.setDriverUrl(this.getDriverUrl());
jdbcTable.setCheckSum(this.getCheckSum());
- jdbcTable.setResourceName(this.getResource());
+ jdbcTable.setResourceName("");
jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
index 00363c1f799..fba59545922 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
@@ -17,38 +17,12 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.Map;
public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
- private static final Logger LOG =
LogManager.getLogger(PaimonDLFExternalCatalog.class);
public PaimonDLFExternalCatalog(long catalogId, String name, String
resource,
Map<String, String> props, String comment)
{
super(catalogId, name, resource, props, comment);
}
-
- @Override
- protected void initLocalObjectsImpl() {
- super.initLocalObjectsImpl();
- catalogType = PAIMON_DLF;
- catalog = createCatalog();
- }
-
- @Override
- protected void setPaimonCatalogOptions(Map<String, String> properties,
Map<String, String> options) {
- options.put(PaimonProperties.PAIMON_CATALOG_TYPE,
PaimonProperties.PAIMON_HMS_CATALOG);
- options.put(PaimonProperties.PAIMON_METASTORE_CLIENT,
ProxyMetaStoreClient.class.getName());
- options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
- properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
- options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
- properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
- properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 93252d61d58..b8decdc032d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -18,38 +18,28 @@
package org.apache.doris.datasource.paimon;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SessionContext;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public abstract class PaimonExternalCatalog extends ExternalCatalog {
+public class PaimonExternalCatalog extends ExternalCatalog {
private static final Logger LOG =
LogManager.getLogger(PaimonExternalCatalog.class);
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
public static final String PAIMON_FILESYSTEM = "filesystem";
@@ -57,35 +47,32 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public static final String PAIMON_DLF = "dlf";
protected String catalogType;
protected Catalog catalog;
- protected AuthenticationConfig authConf;
- protected HadoopAuthenticator hadoopAuthenticator;
- private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
- PaimonProperties.WAREHOUSE
- );
+ private AbstractPaimonProperties paimonProperties;
- public PaimonExternalCatalog(long catalogId, String name, String resource,
- Map<String, String> props, String comment) {
+ public PaimonExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props,
+ String comment) {
super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
- props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@Override
protected void initLocalObjectsImpl() {
- Configuration conf =
DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
- for (Map.Entry<String, String> propEntry :
this.catalogProperty.getHadoopProperties().entrySet()) {
- conf.set(propEntry.getKey(), propEntry.getValue());
+ try {
+ paimonProperties = (AbstractPaimonProperties)
MetastoreProperties.create(catalogProperty.getProperties());
+ } catch (UserException e) {
+ throw new IllegalArgumentException("Failed to create Paimon
properties from catalog properties,exception: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
- authConf = AuthenticationConfig.getKerberosConfig(conf);
- hadoopAuthenticator =
HadoopAuthenticator.getHadoopAuthenticator(authConf);
+ catalogType = paimonProperties.getPaimonCatalogType();
+ catalog = createCatalog();
initPreExecutionAuthenticator();
}
@Override
protected synchronized void initPreExecutionAuthenticator() {
if (executionAuthenticator == null) {
- executionAuthenticator = new
HadoopExecutionAuthenticator(hadoopAuthenticator);
+ executionAuthenticator =
paimonProperties.getExecutionAuthenticator();
}
}
@@ -96,8 +83,8 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
protected List<String> listDatabaseNames() {
try {
- return hadoopAuthenticator.doAs(() -> new
ArrayList<>(catalog.listDatabases()));
- } catch (IOException e) {
+ return executionAuthenticator.execute(() -> new
ArrayList<>(catalog.listDatabases()));
+ } catch (Exception e) {
throw new RuntimeException("Failed to list databases names,
catalog name: " + getName(), e);
}
}
@@ -106,7 +93,7 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> {
+ return executionAuthenticator.execute(() -> {
try {
catalog.getTable(Identifier.create(dbName, tblName));
return true;
@@ -115,8 +102,9 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
});
- } catch (IOException e) {
- throw new RuntimeException("Failed to check table existence,
catalog name: " + getName(), e);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check table existence,
catalog name: " + getName()
+ + "error message is:" +
ExceptionUtils.getRootCauseMessage(e), e);
}
}
@@ -124,7 +112,7 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> {
+ return executionAuthenticator.execute(() -> {
List<String> tableNames = null;
try {
tableNames = catalog.listTables(dbName);
@@ -133,7 +121,7 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
return tableNames;
});
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to list table names, catalog
name: " + getName(), e);
}
}
@@ -141,32 +129,31 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public org.apache.paimon.table.Table getPaimonTable(NameMapping
nameMapping) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> catalog.getTable(
- Identifier.create(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName())));
+ return executionAuthenticator.execute(() ->
catalog.getTable(Identifier.create(nameMapping
+ .getRemoteDbName(), nameMapping.getRemoteTblName())));
} catch (Exception e) {
- throw new RuntimeException("Failed to get Paimon table:" +
getName() + "."
- + nameMapping.getLocalDbName() + "." +
nameMapping.getLocalTblName() + ", because "
- + e.getMessage(), e);
+ throw new RuntimeException("Failed to get Paimon table:" +
getName() + "." + nameMapping.getLocalDbName()
+ + "." + nameMapping.getLocalTblName() + ", because " +
ExceptionUtils.getRootCauseMessage(e), e);
}
}
public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> {
+ return executionAuthenticator.execute(() -> {
List<Partition> partitions = new ArrayList<>();
try {
- partitions = catalog.listPartitions(
- Identifier.create(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName()));
+ partitions =
catalog.listPartitions(Identifier.create(nameMapping.getRemoteDbName(),
+ nameMapping.getRemoteTblName()));
} catch (Catalog.TableNotExistException e) {
LOG.warn("TableNotExistException", e);
}
return partitions;
});
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to get Paimon table
partitions:" + getName() + "."
- + nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName()
- + ", because " + e.getMessage(), e);
+ + nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName() + ", because "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
}
@@ -174,79 +161,44 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
return getPaimonSystemTable(nameMapping, null, queryType);
}
- public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping, String branch,
- String queryType) {
+ public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping,
+ String branch,
String queryType) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> catalog.getTable(new
Identifier(nameMapping.getRemoteDbName(),
+ return executionAuthenticator.execute(() -> catalog.getTable(new
Identifier(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName(), branch, queryType)));
} catch (Exception e) {
throw new RuntimeException("Failed to get Paimon system table:" +
getName() + "."
+ nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName() + "$" + queryType
- + ", because " + e.getMessage(), e);
+ + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
}
}
- protected String getPaimonCatalogType(String catalogType) {
- if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
- return PaimonProperties.PAIMON_HMS_CATALOG;
- } else {
- return PaimonProperties.PAIMON_FILESYSTEM_CATALOG;
- }
- }
protected Catalog createCatalog() {
try {
- return hadoopAuthenticator.doAs(() -> {
- Options options = new Options();
- Map<String, String> paimonOptionsMap = getPaimonOptionsMap();
- for (Map.Entry<String, String> kv :
paimonOptionsMap.entrySet()) {
- options.set(kv.getKey(), kv.getValue());
- }
- CatalogContext context = CatalogContext.create(options,
getConfiguration());
- return createCatalogImpl(context);
- });
- } catch (IOException e) {
- throw new RuntimeException("Failed to create catalog, catalog
name: " + getName(), e);
+ return paimonProperties.initializeCatalog(getName(), new
ArrayList<>(catalogProperty
+ .getStoragePropertiesMap().values()));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create catalog, catalog
name: " + getName() + ", exception: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
}
- protected Catalog createCatalogImpl(CatalogContext context) {
- return CatalogFactory.createCatalog(context);
- }
-
public Map<String, String> getPaimonOptionsMap() {
- Map<String, String> properties = catalogProperty.getHadoopProperties();
- Map<String, String> options = Maps.newHashMap();
- options.put(PaimonProperties.WAREHOUSE,
properties.get(PaimonProperties.WAREHOUSE));
- setPaimonCatalogOptions(properties, options);
- setPaimonExtraOptions(properties, options);
- return options;
- }
-
- protected abstract void setPaimonCatalogOptions(Map<String, String>
properties, Map<String, String> options);
-
- protected void setPaimonExtraOptions(Map<String, String> properties,
Map<String, String> options) {
- for (Map.Entry<String, String> kv : properties.entrySet()) {
- if (kv.getKey().startsWith(PaimonProperties.PAIMON_PREFIX)) {
-
options.put(kv.getKey().substring(PaimonProperties.PAIMON_PREFIX.length()),
kv.getValue());
- }
- }
-
- // hive version.
- // This property is used for both FE and BE, so it has no "paimon."
prefix.
- // We need to handle it separately.
- if (properties.containsKey(HMSProperties.HIVE_VERSION)) {
- options.put(HMSProperties.HIVE_VERSION,
properties.get(HMSProperties.HIVE_VERSION));
- }
+ makeSureInitialized();
+ return paimonProperties.getCatalogOptionsMap();
}
@Override
public void checkProperties() throws DdlException {
- super.checkProperties();
- for (String requiredProperty : REQUIRED_PROPERTIES) {
- if
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
- throw new DdlException("Required property '" +
requiredProperty + "' is missing");
+ if (null != paimonProperties) {
+ try {
+ this.paimonProperties = (AbstractPaimonProperties)
MetastoreProperties
+ .create(catalogProperty.getProperties());
+ } catch (UserException e) {
+ throw new DdlException("Failed to create Paimon properties
from catalog properties, exception: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
index 53e790d8c9e..b904ac169ff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.commons.lang3.StringUtils;
@@ -36,12 +35,9 @@ public class PaimonExternalCatalogFactory {
metastoreType = metastoreType.toLowerCase();
switch (metastoreType) {
case PaimonExternalCatalog.PAIMON_HMS:
- return new PaimonHMSExternalCatalog(catalogId, name, resource,
props, comment);
case PaimonExternalCatalog.PAIMON_FILESYSTEM:
- return new PaimonFileExternalCatalog(catalogId, name,
resource, props, comment);
case PaimonExternalCatalog.PAIMON_DLF:
- props.put(HMSProperties.HIVE_METASTORE_TYPE,
HMSProperties.DLF_TYPE);
- return new PaimonDLFExternalCatalog(catalogId, name, resource,
props, comment);
+ return new PaimonExternalCatalog(catalogId, name, resource,
props, comment);
default:
throw new DdlException("Unknown " +
PaimonExternalCatalog.PAIMON_CATALOG_TYPE
+ " value: " + metastoreType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
index e74f3deeaf5..c2c3e139567 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
@@ -17,77 +17,12 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.Map;
-
public class PaimonFileExternalCatalog extends PaimonExternalCatalog {
- private static final Logger LOG =
LogManager.getLogger(PaimonFileExternalCatalog.class);
public PaimonFileExternalCatalog(long catalogId, String name, String
resource,
Map<String, String> props, String comment) {
super(catalogId, name, resource, props, comment);
}
-
- @Override
- protected void initLocalObjectsImpl() {
- super.initLocalObjectsImpl();
- catalogType = PAIMON_FILESYSTEM;
- catalog = createCatalog();
- }
-
- @Override
- protected void setPaimonCatalogOptions(Map<String, String> properties,
Map<String, String> options) {
- if (properties.containsKey(PaimonProperties.PAIMON_S3_ENDPOINT)) {
- options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
- properties.get(PaimonProperties.PAIMON_S3_ENDPOINT));
- options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
- properties.get(PaimonProperties.PAIMON_S3_ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
- properties.get(PaimonProperties.PAIMON_S3_SECRET_KEY));
- } else if
(properties.containsKey(PaimonProperties.PAIMON_OSS_ENDPOINT)) {
- boolean hdfsEnabled = Boolean.parseBoolean(properties.getOrDefault(
- OssProperties.OSS_HDFS_ENABLED, "false"));
- if
(!LocationPath.isHdfsOnOssEndpoint(properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT))
- && !hdfsEnabled) {
- options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
- properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
- options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
-
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
-
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
- }
- } else if (properties.containsKey(CosProperties.ENDPOINT)) {
- options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
- properties.get(CosProperties.ENDPOINT));
- options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
- properties.get(CosProperties.ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
- properties.get(CosProperties.SECRET_KEY));
- options.put(PaimonProperties.WAREHOUSE,
- options.get(PaimonProperties.WAREHOUSE).replace("cosn://",
"s3://"));
- } else if (properties.containsKey(ObsProperties.ENDPOINT)) {
- options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
- properties.get(ObsProperties.ENDPOINT));
- options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
- properties.get(ObsProperties.ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
- properties.get(ObsProperties.SECRET_KEY));
- options.put(PaimonProperties.WAREHOUSE,
- options.get(PaimonProperties.WAREHOUSE).replace("obs://",
"s3://"));
- }
-
- if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
- options.put(PaimonProperties.S3_PATH_STYLE,
properties.get(PropertyConverter.USE_PATH_STYLE));
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
index fb27bb56696..ae5cf7073dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -17,48 +17,12 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
import java.util.Map;
public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
- private static final Logger LOG =
LogManager.getLogger(PaimonHMSExternalCatalog.class);
- private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
- HMSProperties.HIVE_METASTORE_URIS
- );
public PaimonHMSExternalCatalog(long catalogId, String name, String
resource,
Map<String, String> props, String comment) {
super(catalogId, name, resource, props, comment);
}
-
- @Override
- protected void initLocalObjectsImpl() {
- super.initLocalObjectsImpl();
- catalogType = PAIMON_HMS;
- catalog = createCatalog();
- }
-
- @Override
- protected void setPaimonCatalogOptions(Map<String, String> properties,
Map<String, String> options) {
- options.put(PaimonProperties.PAIMON_CATALOG_TYPE,
getPaimonCatalogType(catalogType));
- options.put(PaimonProperties.HIVE_METASTORE_URIS,
properties.get(HMSProperties.HIVE_METASTORE_URIS));
- }
-
- @Override
- public void checkProperties() throws DdlException {
- super.checkProperties();
- for (String requiredProperty : REQUIRED_PROPERTIES) {
- if
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
- throw new DdlException("Required property '" +
requiredProperty + "' is missing");
- }
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 08227a1fc73..0db54afc1a9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -139,9 +139,9 @@ public class PaimonScanNode extends FileQueryScanNode {
protected ConcurrentHashMap<Long, Boolean> currentQuerySchema = new
ConcurrentHashMap<>();
public PaimonScanNode(PlanNodeId id,
- TupleDescriptor desc,
- boolean needCheckColumnPriv,
- SessionVariable sv) {
+ TupleDescriptor desc,
+ boolean needCheckColumnPriv,
+ SessionVariable sv) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv, sv);
}
@@ -420,14 +420,8 @@ public class PaimonScanNode extends FileQueryScanNode {
}
@Override
- public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
- HashMap<String, String> map = new
HashMap<>(source.getCatalog().getProperties());
-
source.getCatalog().getCatalogProperty().getHadoopProperties().forEach((k, v)
-> {
- if (!map.containsKey(k)) {
- map.put(k, v);
- }
- });
- return map;
+ protected Map<String, String> getLocationProperties() {
+ return
source.getCatalog().getCatalogProperty().getBackendStorageProperties();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
new file mode 100644
index 00000000000..b5e12a4970c
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractPaimonProperties extends MetastoreProperties {
+ @ConnectorProperty(
+ names = {"warehouse"},
+ description = "The location of the Paimon warehouse. This is where
the tables will be stored."
+ )
+ protected String warehouse;
+
+ @Getter
+ protected ExecutionAuthenticator executionAuthenticator = new
ExecutionAuthenticator() {
+ };
+
+ @Getter
+ protected Options catalogOptions;
+
+ private final AtomicReference<Map<String, String>> catalogOptionsMapRef =
new AtomicReference<>();
+
+ public abstract String getPaimonCatalogType();
+
+ private static final String USER_PROPERTY_PREFIX = "paimon.";
+
+ protected AbstractPaimonProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ public abstract Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList);
+
+ /**
+ * Adapt S3 storage properties for Apache Paimon's S3 file system.
+ *
+ * <p>Paimon's S3 file system does not follow the standard
Hadoop-compatible
+ * configuration keys (like fs.s3a.access.key). Instead, it expects
specific
+ * keys such as "s3.access.key", "s3.secret.key", etc.
+ *
+ * <p>Therefore, we explicitly map our internal S3 configuration (usually
designed
+ * for HDFS-compatible systems) to Paimon's expected format.
+ *
+ * <p>See: org.apache.paimon.s3.S3Loader
+ *
+ * @param storagePropertiesList the list of configured storage backends
+ */
+ protected void appendS3PropertiesIsNeeded(List<StorageProperties>
storagePropertiesList) {
+
+ S3Properties s3Properties = (S3Properties)
storagePropertiesList.stream()
+ .filter(storageProperties -> storageProperties.getType() ==
StorageProperties.Type.S3)
+ .findFirst()
+ .orElse(null);
+ if (s3Properties != null) {
+ catalogOptions.set("s3.access.key", s3Properties.getSecretKey());
+ catalogOptions.set("s3.secret.key", s3Properties.getAccessKey());
+ catalogOptions.set("s3.endpoint", s3Properties.getEndpoint());
+ catalogOptions.set("s3.region", s3Properties.getRegion());
+ }
+
+ }
+
+ protected void appendCatalogOptions(List<StorageProperties>
storagePropertiesList) {
+ if (StringUtils.isNotBlank(warehouse)) {
+ catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
+ }
+ catalogOptions.set(CatalogOptions.METASTORE.key(), getMetastoreType());
+ origProps.forEach((k, v) -> {
+ if (k.toLowerCase().startsWith(USER_PROPERTY_PREFIX)) {
+ String newKey = k.substring(USER_PROPERTY_PREFIX.length());
+ if (StringUtils.isNotBlank(newKey)) {
+ catalogOptions.set(newKey, v);
+ }
+ }
+ });
+ appendS3PropertiesIsNeeded(storagePropertiesList);
+ }
+
+ /**
+ * Build catalog options including common and subclass-specific ones.
+ */
+ public void buildCatalogOptions(List<StorageProperties>
storagePropertiesList) {
+ catalogOptions = new Options();
+ appendCatalogOptions(storagePropertiesList);
+ appendCustomCatalogOptions();
+ }
+
+ public Map<String, String> getCatalogOptionsMap() {
+ // Return the cached map if already initialized
+ Map<String, String> existing = catalogOptionsMapRef.get();
+ if (existing != null) {
+ return existing;
+ }
+
+ // Check that the catalog options source is available
+ if (catalogOptions == null) {
+ throw new IllegalStateException("Catalog options have not been
initialized. Call"
+ + " buildCatalogOptions first.");
+ }
+
+ // Construct the map manually using the provided keys
+ Map<String, String> computed = new HashMap<>();
+ for (String key : catalogOptions.keySet()) {
+ computed.put(key, catalogOptions.get(key));
+ }
+
+ // Attempt to set the constructed map atomically; only one thread wins
+ if (catalogOptionsMapRef.compareAndSet(null, computed)) {
+ return computed;
+ } else {
+ // Another thread already initialized it; return the existing one
+ return catalogOptionsMapRef.get();
+ }
+ }
+
+
+ /**
+ * Hook method for subclasses to append metastore-specific or custom
catalog options.
+ *
+ * <p>This method is invoked after common catalog options (e.g., warehouse
path,
+ * metastore type, user-defined keys, and S3 compatibility mappings) have
been
+ * added to the {@link org.apache.paimon.options.Options} instance.
+ *
+ * <p>Subclasses should override this method to inject additional
configuration
+ * required for their specific metastore or environment. For example:
+ *
+ * <ul>
+ * <li>DLF-based catalog may require a custom metastore client
class.</li>
+ * <li>HMS-based catalog may include URI and client pool parameters.</li>
+ * <li>Other environments may inject authentication, endpoint, or
caching options.</li>
+ * </ul>
+ *
+ * <p>If the subclass does not require any special options beyond the
common ones,
+ * it can safely leave this method empty.
+ */
+ protected abstract void appendCustomCatalogOptions();
+
+ /**
+ * Returns the metastore type identifier used by the Paimon catalog
factory.
+ *
+ * <p>This identifier must match one of the known metastore types
supported by
+ * Apache Paimon. Internally, the value returned here is used to configure
the
+ * `metastore` option in {@code Options}, which determines the specific
+ * {@link org.apache.paimon.catalog.CatalogFactory} implementation to be
used
+ * when instantiating the catalog.
+ *
+ * <p>You can find valid identifiers by reviewing implementations of the
+ * {@link org.apache.paimon.catalog.CatalogFactory} interface. Each
implementation
+ * declares its identifier via a static {@code IDENTIFIER} field or
equivalent constant.
+ *
+ * <p>Examples:
+ * <ul>
+ * <li>{@code "filesystem"} - for {@link
org.apache.paimon.catalog.FileSystemCatalogFactory}</li>
+ * <li>{@code "hive"} - for {@link
org.apache.paimon.hive.HiveCatalogFactory}</li>
+ * </ul>
+ *
+ * @return the metastore type identifier string
+ */
+ protected abstract String getMetastoreType();
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
index 3e5e2273b08..48cba3c0bcc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
@@ -46,6 +46,7 @@ public class MetastoreProperties extends ConnectionProperties
{
public enum Type {
HMS("hms"),
ICEBERG("iceberg"),
+ PAIMON("paimon"),
GLUE("glue"),
DLF("dlf"),
DATAPROC("dataproc"),
@@ -83,6 +84,7 @@ public class MetastoreProperties extends ConnectionProperties
{
//subclasses should be registered here
register(Type.HMS, new HMSPropertiesFactory());
register(Type.ICEBERG, new IcebergPropertiesFactory());
+ register(Type.PAIMON, new PaimonPropertiesFactory());
}
public static void register(Type type, MetastorePropertiesFactory factory)
{
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
new file mode 100644
index 00000000000..2f65dbad9d8
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.OSSProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import com.aliyun.datalake.metastore.common.DataLakeConfig;
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PaimonAliyunDLFMetaStoreProperties
+ *
+ * <p>This class provides configuration support for using Apache Paimon with
+ * Aliyun Data Lake Formation (DLF) as the metastore. Although DLF is not an
+ * officially supported metastore type in Paimon, this implementation adapts
+ * DLF by treating it as a Hive Metastore (HMS) underneath, enabling
+ * interoperability with Paimon's HiveCatalog.
+ *
+ * <p>Key Characteristics:
+ * <ul>
+ * <li>Internally uses HiveCatalog with custom HiveConf configured for
Aliyun DLF.</li>
+ * <li>Relies on {@link ProxyMetaStoreClient} to bridge DLF
compatibility.</li>
+ * <li>Requires Aliyun OSS as the storage backend. Other storage types are
not
+ * currently verified for compatibility.</li>
+ * </ul>
+ *
+ * <p>Note: This is an internal extension and not an officially supported
Paimon
+ * metastore type. Future compatibility should be validated when upgrading
Paimon
+ * or changing storage backends.
+ *
+ * @see org.apache.paimon.hive.HiveCatalog
+ * @see org.apache.paimon.catalog.CatalogFactory
+ * @see ProxyMetaStoreClient
+ */
+public class PaimonAliyunDLFMetaStoreProperties extends
AbstractPaimonProperties {
+
+ private AliyunDLFBaseProperties baseProperties;
+
+ protected PaimonAliyunDLFMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public void initNormalizeAndCheckProps() {
+ super.initNormalizeAndCheckProps();
+ baseProperties = AliyunDLFBaseProperties.of(origProps);
+ }
+
+ private HiveConf buildHiveConf() {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_ID,
baseProperties.dlfAccessKey);
+ hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET,
baseProperties.dlfSecretKey);
+ hiveConf.set(DataLakeConfig.CATALOG_ENDPOINT,
baseProperties.dlfEndpoint);
+ hiveConf.set(DataLakeConfig.CATALOG_REGION_ID,
baseProperties.dlfRegion);
+ hiveConf.set(DataLakeConfig.CATALOG_SECURITY_TOKEN,
baseProperties.dlfSessionToken);
+ hiveConf.set(DataLakeConfig.CATALOG_USER_ID, baseProperties.dlfUid);
+ hiveConf.set(DataLakeConfig.CATALOG_ID, baseProperties.dlfCatalogId);
+ hiveConf.set(DataLakeConfig.CATALOG_PROXY_MODE,
baseProperties.dlfProxyMode);
+ return hiveConf;
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ HiveConf hiveConf = buildHiveConf();
+ buildCatalogOptions(storagePropertiesList);
+ StorageProperties ossProps = storagePropertiesList.stream()
+ .filter(sp -> sp.getType() == StorageProperties.Type.OSS)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Paimon DLF
metastore requires OSS storage properties."));
+
+ if (!(ossProps instanceof OSSProperties)) {
+ throw new IllegalStateException("Expected OSSProperties type.");
+ }
+ OSSProperties ossProperties = (OSSProperties) ossProps;
+ for (Map.Entry<String, String> entry :
ossProperties.getHadoopStorageConfig()) {
+ catalogOptions.set(entry.getKey(), entry.getValue());
+ }
+ hiveConf.addResource(ossProperties.getHadoopStorageConfig());
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
hiveConf);
+ return CatalogFactory.createCatalog(catalogContext);
+ }
+
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ catalogOptions.set("metastore.client.class",
ProxyMetaStoreClient.class.getName());
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ return HiveCatalogOptions.IDENTIFIER;
+ }
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_DLF;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
new file mode 100644
index 00000000000..adfa53eab94
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalogFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class PaimonFileSystemMetaStoreProperties extends
AbstractPaimonProperties {
+ protected PaimonFileSystemMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ buildCatalogOptions(storagePropertiesList);
+ Configuration conf = new Configuration(false);
+ storagePropertiesList.forEach(storageProperties -> {
+ for (Map.Entry<String, String> entry :
storageProperties.getHadoopStorageConfig()) {
+ catalogOptions.set(entry.getKey(), entry.getValue());
+ }
+ conf.addResource(storageProperties.getHadoopStorageConfig());
+ if
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+ this.executionAuthenticator = new
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+ .getHadoopAuthenticator());
+ }
+ });
+
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
conf);
+ try {
+ return this.executionAuthenticator.execute(() ->
CatalogFactory.createCatalog(catalogContext));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ //nothing need to do
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ return FileSystemCatalogFactory.IDENTIFIER;
+ }
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_FILESYSTEM;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
new file mode 100644
index 00000000000..7c2d2ee79a3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class PaimonHMSMetaStoreProperties extends AbstractPaimonProperties {
+
+ private HMSBaseProperties hmsBaseProperties;
+
+ private static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY =
"client-pool-cache.eviction-interval-ms";
+
+ private static final String LOCATION_IN_PROPERTIES_KEY =
"location-in-properties";
+
+ @ConnectorProperty(
+ names = {CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY},
+ required = false,
+ description = "Setting the client's pool cache eviction
interval(ms).")
+ private long clientPoolCacheEvictionIntervalMs =
TimeUnit.MINUTES.toMillis(5L);
+
+ @ConnectorProperty(
+ names = {LOCATION_IN_PROPERTIES_KEY},
+ required = false,
+ description = "Setting whether to use the location in the
properties.")
+ private boolean locationInProperties = false;
+
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_DLF;
+ }
+
+ protected PaimonHMSMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public void initNormalizeAndCheckProps() {
+ super.initNormalizeAndCheckProps();
+ hmsBaseProperties = HMSBaseProperties.of(origProps);
+ hmsBaseProperties.initAndCheckParams();
+ this.executionAuthenticator = new
HadoopExecutionAuthenticator(hmsBaseProperties.getHmsAuthenticator());
+ }
+
+
+ /**
+ * Builds the Hadoop Configuration by adding hive-site.xml and
storage-specific configs.
+ */
+ private Configuration buildHiveConfiguration(List<StorageProperties>
storagePropertiesList) {
+ Configuration conf = hmsBaseProperties.getHiveConf();
+
+ for (StorageProperties sp : storagePropertiesList) {
+ if (sp.getHadoopStorageConfig() != null) {
+ conf.addResource(sp.getHadoopStorageConfig());
+ }
+ }
+ return conf;
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ Configuration conf = buildHiveConfiguration(storagePropertiesList);
+ buildCatalogOptions(storagePropertiesList);
+ for (Map.Entry<String, String> entry : conf) {
+ catalogOptions.set(entry.getKey(), entry.getValue());
+ }
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
conf);
+ try {
+ return executionAuthenticator.execute(() ->
CatalogFactory.createCatalog(catalogContext));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create Paimon catalog with
HMS metastore", e);
+ }
+
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ //See org.apache.paimon.hive.HiveCatalogFactory
+ return HiveCatalogOptions.IDENTIFIER;
+ }
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ catalogOptions.set(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY,
+ String.valueOf(clientPoolCacheEvictionIntervalMs));
+ catalogOptions.set(LOCATION_IN_PROPERTIES_KEY,
String.valueOf(locationInProperties));
+ catalogOptions.set("uri", hmsBaseProperties.getHiveMetastoreUri());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
new file mode 100644
index 00000000000..12dac3bb739
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import java.util.Map;
+
+public class PaimonPropertiesFactory extends
AbstractMetastorePropertiesFactory {
+
+ private static final String KEY = "paimon.catalog.type";
+ private static final String DEFAULT_TYPE = "filesystem";
+
+ public PaimonPropertiesFactory() {
+ register("dlf", PaimonAliyunDLFMetaStoreProperties::new);
+ register("filesystem", PaimonFileSystemMetaStoreProperties::new);
+ register("hms", PaimonHMSMetaStoreProperties::new);
+ }
+
+ @Override
+ public MetastoreProperties create(Map<String, String> props) {
+ return createInternal(props, KEY, DEFAULT_TYPE);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 8cec9d6a63d..b36e4309701 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -257,8 +258,23 @@ public abstract class AbstractS3CompatibleProperties
extends StorageProperties i
@Override
public void initializeHadoopStorageConfig() {
- throw new UnsupportedOperationException("Hadoop storage config
initialization is not"
- + " supported for S3 compatible storage.");
+ hadoopStorageConfig = new Configuration();
+ // Compatibility note: Due to historical reasons, even when the
underlying
+ // storage is OSS, OBS, etc., users may still configure the schema as
"s3://".
+ // To ensure backward compatibility, we append S3-related properties
by default.
+ appendS3HdfsProperties(hadoopStorageConfig);
+ }
+
+ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
+ hadoopStorageConfig.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.s3a.endpoint", getEndpoint());
+ hadoopStorageConfig.set("fs.s3a.access.key", getAccessKey());
+ hadoopStorageConfig.set("fs.s3a.secret.key", getSecretKey());
+ hadoopStorageConfig.set("fs.s3a.connection.maximum",
getMaxConnections());
+ hadoopStorageConfig.set("fs.s3a.connection.request.timeout",
getRequestTimeoutS());
+ hadoopStorageConfig.set("fs.s3a.connection.timeout",
getConnectionTimeoutS());
+ hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index a2ae545f048..6fea7cf18ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -130,17 +129,8 @@ public class COSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.cos.impl",
"org.apache.hadoop.fs.CosFileSystem");
- hadoopStorageConfig.set("fs.cosn.impl",
"org.apache.hadoop.fs.CosFileSystem");
- hadoopStorageConfig.set("fs.AbstractFileSystem.cos.impl",
"org.apache.hadoop.fs.Cos");
- hadoopStorageConfig.set("fs.cos.secretId", accessKey);
- hadoopStorageConfig.set("fs.cos.secretKey", secretKey);
- hadoopStorageConfig.set("fs.cos.region", region);
- hadoopStorageConfig.set("fs.cos.endpoint", endpoint);
- hadoopStorageConfig.set("fs.cos.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.cos.connection.request.timeout",
requestTimeoutS);
- hadoopStorageConfig.set("fs.cos.connection.maximum", maxConnections);
- hadoopStorageConfig.set("fs.cos.use.path.style", usePathStyle);
+ super.initializeHadoopStorageConfig();
+ hadoopStorageConfig.set("fs.cos.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.cosn.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
index 0f5349e1452..75f9fc85881 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import java.util.Map;
import java.util.Set;
@@ -95,18 +94,4 @@ public class MinioProperties extends
AbstractS3CompatibleProperties {
protected Set<Pattern> endpointPatterns() {
return
ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$"));
}
-
- @Override
- public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
- hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
- hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
- hadoopStorageConfig.set("fs.s3a.connection.maximum", maxConnections);
- hadoopStorageConfig.set("fs.s3a.connection.request.timeout",
requestTimeoutS);
- hadoopStorageConfig.set("fs.s3a.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 051dd9a927c..513727416ef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -135,13 +134,7 @@ public class OBSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.obs.impl",
"com.obs.services.hadoop.fs.OBSFileSystem");
- hadoopStorageConfig.set("fs.obs.access.key", accessKey);
- hadoopStorageConfig.set("fs.obs.secret.key", secretKey);
- hadoopStorageConfig.set("fs.obs.endpoint", endpoint);
- hadoopStorageConfig.set("fs.obs.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.obs.request.timeout", requestTimeoutS);
- hadoopStorageConfig.set("fs.obs.connection.max", maxConnections);
+ super.initializeHadoopStorageConfig();
+ hadoopStorageConfig.set("fs.obs.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 1e34311f9e9..7b27a1ea160 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -172,15 +171,7 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.oss.impl",
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
- hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
- hadoopStorageConfig.set("fs.oss.region", region);
- hadoopStorageConfig.set("fs.oss.accessKeyId", accessKey);
- hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
- hadoopStorageConfig.set("fs.oss.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.oss.connection.max", maxConnections);
- hadoopStorageConfig.set("fs.oss.connection.request.timeout",
requestTimeoutS);
- hadoopStorageConfig.set("fs.oss.use.path.style.access", usePathStyle);
+ super.initializeHadoopStorageConfig();
+ hadoopStorageConfig.set("fs.oss.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index e664edc6188..19a9195ec2f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -23,11 +23,9 @@ import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesE
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
@@ -39,8 +37,6 @@ import
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsPr
import software.amazon.awssdk.services.sts.StsClient;
import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
-import java.lang.reflect.Field;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -219,34 +215,6 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
return ENDPOINT_PATTERN;
}
- private static List<Field> getIdentifyFields() {
- List<Field> fields = Lists.newArrayList();
- try {
- //todo AliyunDlfProperties should in OSS storage type.
- fields.add(S3Properties.class.getDeclaredField("s3AccessKey"));
- // fixme Add it when MS done
-
//fields.add(AliyunDLFProperties.class.getDeclaredField("dlfAccessKey"));
-
//fields.add(AWSGlueProperties.class.getDeclaredField("glueAccessKey"));
- return fields;
- } catch (NoSuchFieldException e) {
- // should not happen
- throw new RuntimeException("Failed to get field: " +
e.getMessage(), e);
- }
- }
-
- /*
- public void toPaimonOSSFileIOProperties(Options options) {
- options.set("fs.oss.endpoint", s3Endpoint);
- options.set("fs.oss.accessKeyId", s3AccessKey);
- options.set("fs.oss.accessKeySecret", s3SecretKey);
- }
-
- public void toPaimonS3FileIOProperties(Options options) {
- options.set("s3.endpoint", s3Endpoint);
- options.set("s3.access-key", s3AccessKey);
- options.set("s3.secret-key", s3SecretKey);
- }*/
-
@Override
public Map<String, String> getBackendConfigProperties() {
Map<String, String> backendProperties =
generateBackendS3Configuration(s3ConnectionMaximum,
@@ -297,18 +265,17 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
InstanceProfileCredentialsProvider.create());
}
-
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
- hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
- hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
- hadoopStorageConfig.set("fs.s3a.connection.maximum",
s3ConnectionMaximum);
- hadoopStorageConfig.set("fs.s3a.connection.request.timeout",
s3ConnectionRequestTimeoutS);
- hadoopStorageConfig.set("fs.s3a.connection.timeout",
s3ConnectionTimeoutS);
- hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
+ super.initializeHadoopStorageConfig();
+ //Set assumed_roles
+ //@See
https://hadoop.apache.org/docs/r3.4.1/hadoop-aws/tools/hadoop-aws/assumed_roles.html
+ if (StringUtils.isNotBlank(s3ExternalId) &&
StringUtils.isNotBlank(s3IAMRole)) {
+ //@See org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
+ hadoopStorageConfig.set("fs.s3a.assumed.role.external.id",
s3ExternalId);
+ hadoopStorageConfig.set("fs.s3a.assumed.role.arn", s3IAMRole);
+ hadoopStorageConfig.set("fs.s3a.aws.credentials.provider",
+
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 35f399409f7..131b5195c7e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -169,7 +169,7 @@ public class LogicalHudiScan extends LogicalFileScan {
Optional<IncrementalRelation> newIncrementalRelation =
Optional.empty();
if (optScanParams.isPresent() &&
optScanParams.get().incrementalRead()) {
TableScanParams scanParams = optScanParams.get();
- Map<String, String> optParams = table.getHadoopProperties();
+ Map<String, String> optParams =
table.getBackendStorageProperties();
if (scanParams.getMapParams().containsKey("beginTime")) {
optParams.put("hoodie.datasource.read.begin.instanttime",
scanParams.getMapParams().get("beginTime"));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index d44b6dd3df3..0a696f64340 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -162,7 +162,7 @@ public class HiveTableSink extends
BaseExternalTableDataSink {
tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
}
- tSink.setHadoopConfig(targetTable.getHadoopProperties());
+ tSink.setHadoopConfig(targetTable.getBackendStorageProperties());
tDataSink = new TDataSink(getDataSinkType());
tDataSink.setHiveTableSink(tSink);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]