This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ffbacbcdc64 [Feat](params-refactor): Refactor Paimon Metastore
parameter integration (#54114)
ffbacbcdc64 is described below
commit ffbacbcdc64b01b38292a7a26c2a182eaf517863
Author: Calvin Kirs <[email protected]>
AuthorDate: Sun Aug 3 00:50:05 2025 +0800
[Feat](params-refactor): Refactor Paimon Metastore parameter integration
(#54114)
### What problem does this PR solve?
#50238
[](https://github.com/apache/doris/commit/cba9b29bd0f9e3e18b90f18f6bab9e435ea90d50)
This PR introduces a refactored parameter integration framework for
Apache Paimon metastore catalogs. The goal is to standardize parameter
injection, improving configuration consistency and ease of use.
Key Changes
Introduced an abstract class AbstractPaimonProperties to provide a
unified parameter injection workflow and extension point.
The new parameter system supports multiple metastore types, including:
HMS (Hive Metastore)
Aliyun DLF (an HMS-based adaptation)
FileSystem (local or HDFS)
All configuration parameters are constructed through a standardized
Options object, eliminating the need for ad-hoc parameter mapping or
conversions.
Subclasses inject metastore-specific parameters (e.g.,
hive.metastore.uris, DLF credentials) by implementing the
↳appendCustomCatalogOptions method.
---
.../java/org/apache/doris/catalog/Resource.java | 16 +-
.../org/apache/doris/datasource/CatalogIf.java | 7 +-
.../org/apache/doris/datasource/CatalogLog.java | 1 +
.../org/apache/doris/datasource/CatalogMgr.java | 17 --
.../apache/doris/datasource/CatalogProperty.java | 173 +++++++++++--------
.../apache/doris/datasource/ExternalCatalog.java | 5 -
.../doris/datasource/hive/HMSExternalTable.java | 10 +-
.../doris/datasource/hive/source/HiveScanNode.java | 4 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 4 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 9 -
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 2 +-
.../paimon/PaimonDLFExternalCatalog.java | 26 ---
.../datasource/paimon/PaimonExternalCatalog.java | 154 ++++++-----------
.../paimon/PaimonExternalCatalogFactory.java | 6 +-
.../paimon/PaimonFileExternalCatalog.java | 65 -------
.../paimon/PaimonHMSExternalCatalog.java | 36 ----
.../datasource/paimon/source/PaimonScanNode.java | 16 +-
.../metastore/AbstractPaimonProperties.java | 188 +++++++++++++++++++++
.../property/metastore/MetastoreProperties.java | 2 +
.../PaimonAliyunDLFMetaStoreProperties.java | 123 ++++++++++++++
.../PaimonFileSystemMetaStoreProperties.java | 76 +++++++++
.../metastore/PaimonHMSMetaStoreProperties.java | 117 +++++++++++++
.../metastore/PaimonPropertiesFactory.java | 37 ++++
.../storage/AbstractS3CompatibleProperties.java | 20 ++-
.../datasource/property/storage/COSProperties.java | 16 +-
.../property/storage/MinioProperties.java | 15 --
.../datasource/property/storage/OBSProperties.java | 11 +-
.../datasource/property/storage/OSSProperties.java | 13 +-
.../datasource/property/storage/S3Properties.java | 53 ++----
.../trees/plans/logical/LogicalHudiScan.java | 2 +-
.../org/apache/doris/planner/HiveTableSink.java | 2 +-
31 files changed, 751 insertions(+), 475 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index eb5c41808b6..6250d92ab0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
-import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.trees.plans.commands.CreateResourceCommand;
import org.apache.doris.nereids.trees.plans.commands.info.CreateResourceInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -292,20 +291,7 @@ public abstract class Resource implements Writable,
GsonPostProcessable {
private void notifyUpdate(Map<String, String> properties) {
references.entrySet().stream().collect(Collectors.groupingBy(Entry::getValue)).forEach((type,
refs) -> {
if (type == ReferenceType.CATALOG) {
- for (Map.Entry<String, ReferenceType> ref : refs) {
- String catalogName =
ref.getKey().split(REFERENCE_SPLIT)[0];
- CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
- if (catalog == null) {
- LOG.warn("Can't find the reference catalog {} for
resource {}", catalogName, name);
- continue;
- }
- if (!name.equals(catalog.getResource())) {
- LOG.warn("Failed to update catalog {} for different
resource "
- + "names(resource={}, catalog.resource={})",
catalogName, name, catalog.getResource());
- continue;
- }
- catalog.notifyPropertiesUpdated(properties);
- }
+ // No longer support resource in Catalog.
}
});
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index c888e6e6541..df908cdc2c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -36,7 +36,6 @@ import
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo
import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -89,10 +88,6 @@ public interface CatalogIf<T extends DatabaseIf> {
Map<String, String> getProperties();
- default String getResource() {
- return null;
- }
-
default void notifyPropertiesUpdated(Map<String, String> updatedProps) {
if (this instanceof ExternalCatalog) {
((ExternalCatalog) this).resetToUninitialized(false);
@@ -175,7 +170,7 @@ public interface CatalogIf<T extends DatabaseIf> {
CatalogLog log = new CatalogLog();
log.setCatalogId(getId());
log.setCatalogName(getName());
- log.setResource(Strings.nullToEmpty(getResource()));
+ log.setResource("");
log.setComment(getComment());
log.setProps(getProperties());
return log;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
index d862a3ef44c..08a08d49d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
@@ -56,6 +56,7 @@ public class CatalogLog implements Writable {
@SerializedName(value = "invalidCache")
private boolean invalidCache;
+ @Deprecated
@SerializedName(value = "resource")
private String resource;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index bf4d7e4b5e1..f4b2d4cc816 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -23,8 +23,6 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.Resource;
-import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
@@ -121,12 +119,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
((ExternalCatalog) catalog).resetToUninitialized(false);
}
- if (!Strings.isNullOrEmpty(catalog.getResource())) {
- Resource resource =
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
- if (resource != null) {
- resource.addReference(catalog.getName(),
ReferenceType.CATALOG);
- }
- }
}
private CatalogIf removeCatalog(long catalogId) {
@@ -139,12 +131,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
ConnectContext.get().removeLastDBOfCatalog(catalog.getName());
}
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
- if (!Strings.isNullOrEmpty(catalog.getResource())) {
- Resource catalogResource =
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
- if (catalogResource != null) {
- catalogResource.removeReference(catalog.getName(),
ReferenceType.CATALOG);
- }
- }
Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
}
return catalog;
@@ -436,9 +422,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
ConnectContext.get().getQualifiedUser(),
catalog.getName());
}
- if (!Strings.isNullOrEmpty(catalog.getResource())) {
- rows.add(Arrays.asList("resource", catalog.getResource()));
- }
Map<String, String> sortedMap =
getCatalogPropertiesWithPrintable(catalog);
sortedMap.forEach((k, v) -> rows.add(Arrays.asList(k, v)));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
index ebb7c6f292d..da598e982e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
@@ -17,14 +17,11 @@
package org.apache.doris.datasource;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Resource;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
-import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
@@ -35,6 +32,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* CatalogProperty to store the properties for catalog.
@@ -43,127 +41,160 @@ import java.util.function.Function;
public class CatalogProperty {
private static final Logger LOG =
LogManager.getLogger(CatalogProperty.class);
+ @Deprecated
@SerializedName(value = "resource")
private String resource;
+
@SerializedName(value = "properties")
private Map<String, String> properties;
+ // Lazy-loaded storage properties map, using volatile to ensure visibility
private volatile Map<StorageProperties.Type, StorageProperties>
storagePropertiesMap;
- private MetastoreProperties metastoreProperties;
+ // Lazy-loaded metastore properties, using volatile to ensure visibility
+ private volatile MetastoreProperties metastoreProperties;
+
+ // Lazy-loaded backend storage properties, using volatile to ensure
visibility
+ private volatile Map<String, String> backendStorageProperties;
- private volatile Resource catalogResource = null;
+ // Lazy-loaded Hadoop properties, using volatile to ensure visibility
+ private volatile Map<String, String> hadoopProperties;
public CatalogProperty(String resource, Map<String, String> properties) {
- this.resource = Strings.nullToEmpty(resource);
+ this.resource = resource; // Keep but not used
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newConcurrentMap();
}
}
- private Resource catalogResource() {
- if (!Strings.isNullOrEmpty(resource) && catalogResource == null) {
- synchronized (this) {
- if (catalogResource == null) {
- catalogResource =
Env.getCurrentEnv().getResourceMgr().getResource(resource);
- }
- }
- }
- return catalogResource;
- }
-
public String getOrDefault(String key, String defaultVal) {
- String val = properties.get(key);
- if (val == null) {
- Resource res = catalogResource();
- if (res != null) {
- val = res.getCopiedProperties().getOrDefault(key, defaultVal);
- } else {
- val = defaultVal;
- }
- }
- return val;
+ return properties.getOrDefault(key, defaultVal);
}
public Map<String, String> getProperties() {
- Map<String, String> mergedProperties = Maps.newHashMap();
- if (!Strings.isNullOrEmpty(resource)) {
- Resource res = catalogResource();
- if (res != null) {
- mergedProperties = res.getCopiedProperties();
- }
- }
- mergedProperties.putAll(properties);
- return mergedProperties;
- }
-
- public String getResource() {
- return resource;
+ return Maps.newHashMap(properties);
}
public void modifyCatalogProps(Map<String, String> props) {
- properties.putAll(PropertyConverter.convertToMetaProperties(props));
- this.storagePropertiesMap = null;
- }
-
- private void reInitCatalogStorageProperties() {
- List<StorageProperties> storageProperties;
- try {
- storageProperties = StorageProperties.createAll(getProperties());
- this.storagePropertiesMap = (storageProperties.stream()
-
.collect(java.util.stream.Collectors.toMap(StorageProperties::getType,
Function.identity())));
- } catch (UserException e) {
- throw new RuntimeException(e);
+ synchronized (this) {
+
properties.putAll(PropertyConverter.convertToMetaProperties(props));
+ resetAllCaches();
}
-
}
public void rollBackCatalogProps(Map<String, String> props) {
- properties.clear();
- properties = new HashMap<>(props);
- this.storagePropertiesMap = null;
- }
-
-
- public Map<String, String> getHadoopProperties() {
- Map<String, String> hadoopProperties = getProperties();
-
hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
- return hadoopProperties;
+ synchronized (this) {
+ properties = new HashMap<>(props);
+ resetAllCaches();
+ }
}
public void addProperty(String key, String val) {
- this.properties.put(key, val);
- this.storagePropertiesMap = null; // reset storage properties map
+ synchronized (this) {
+ this.properties.put(key, val);
+ resetAllCaches();
+ }
}
public void deleteProperty(String key) {
- this.properties.remove(key);
+ synchronized (this) {
+ this.properties.remove(key);
+ resetAllCaches();
+ }
+ }
+
+ /**
+ * Unified cache reset method to ensure all caches are properly cleared
+ */
+ private void resetAllCaches() {
this.storagePropertiesMap = null;
+ this.metastoreProperties = null;
+ this.backendStorageProperties = null;
+ this.hadoopProperties = null;
}
+ /**
+ * Get storage properties map with lazy loading, using double-check
locking to ensure thread safety
+ */
public Map<StorageProperties.Type, StorageProperties>
getStoragePropertiesMap() {
if (storagePropertiesMap == null) {
synchronized (this) {
if (storagePropertiesMap == null) {
- reInitCatalogStorageProperties();
+ try {
+ List<StorageProperties> storageProperties =
StorageProperties.createAll(getProperties());
+ this.storagePropertiesMap = storageProperties.stream()
+
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
+ } catch (UserException e) {
+ LOG.warn("Failed to initialize catalog storage
properties", e);
+ throw new RuntimeException("Failed to initialize
storage properties for catalog", e);
+ }
}
}
}
return storagePropertiesMap;
}
+ /**
+ * Get metastore properties with lazy loading, using double-check locking
to ensure thread safety
+ */
public MetastoreProperties getMetastoreProperties() {
if (MapUtils.isEmpty(getProperties())) {
return null;
}
+
if (metastoreProperties == null) {
- try {
- metastoreProperties =
MetastoreProperties.create(getProperties());
- } catch (UserException e) {
- throw new RuntimeException(e);
+ synchronized (this) {
+ if (metastoreProperties == null) {
+ try {
+ metastoreProperties =
MetastoreProperties.create(getProperties());
+ } catch (UserException e) {
+ LOG.warn("Failed to create metastore properties", e);
+ throw new RuntimeException("Failed to create metastore
properties", e);
+ }
+ }
}
}
return metastoreProperties;
}
+
+ /**
+ * Get backend storage properties with lazy loading, using double-check
locking to ensure thread safety
+ */
+ public Map<String, String> getBackendStorageProperties() {
+ if (backendStorageProperties == null) {
+ synchronized (this) {
+ if (backendStorageProperties == null) {
+ Map<String, String> result = new HashMap<>();
+ Map<StorageProperties.Type, StorageProperties> storageMap
= getStoragePropertiesMap();
+
+ for (StorageProperties sp : storageMap.values()) {
+ Map<String, String> backendProps =
sp.getBackendConfigProperties();
+ if (backendProps != null) {
+ result.putAll(backendProps);
+ }
+ }
+
+ this.backendStorageProperties = result;
+ }
+ }
+ }
+ return backendStorageProperties;
+ }
+
+ /**
+ * Get Hadoop properties with lazy loading, using double-check locking to
ensure thread safety
+ */
+ public Map<String, String> getHadoopProperties() {
+ if (hadoopProperties == null) {
+ synchronized (this) {
+ if (hadoopProperties == null) {
+ Map<String, String> result = getProperties();
+
result.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
+ this.hadoopProperties = result;
+ }
+ }
+ }
+ return hadoopProperties;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 059226b5f5e..e590b221e37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -675,11 +675,6 @@ public abstract class ExternalCatalog
+ ", table id: " + tableId);
}
- @Override
- public String getResource() {
- return catalogProperty.getResource();
- }
-
@Nullable
@Override
public ExternalDatabase<? extends ExternalTable> getDbNullable(String
dbName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 5e54673d359..a156184e67c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -601,14 +601,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return catalog.getCatalogProperty().getStoragePropertiesMap();
}
- public Map<String, String> getHadoopProperties() {
- return getStoragePropertiesMap().values().stream()
- .flatMap(m ->
m.getBackendConfigProperties().entrySet().stream())
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- Map.Entry::getValue,
- (v1, v2) -> v2));
-
+ public Map<String, String> getBackendStorageProperties() {
+ return catalog.getCatalogProperty().getBackendStorageProperties();
}
public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String>
columns) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 2a0953f052e..bf714c6d89e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -477,8 +477,8 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
- protected Map<String, String> getLocationProperties() throws UserException
{
- return hmsTable.getHadoopProperties();
+ protected Map<String, String> getLocationProperties() {
+ return hmsTable.getBackendStorageProperties();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index bb1ca6a25fc..0e17e49f3ae 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -231,12 +231,12 @@ public class HudiScanNode extends HiveScanNode {
}
@Override
- protected Map<String, String> getLocationProperties() throws UserException
{
+ protected Map<String, String> getLocationProperties() {
if (incrementalRead) {
return incrementalRelation.getHoodieParams();
} else {
// HudiJniScanner uses hadoop client to read data.
- return hmsTable.getHadoopProperties();
+ return hmsTable.getBackendStorageProperties();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 2d87889d50e..b1178363b9b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -23,18 +23,14 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
-import org.apache.doris.datasource.property.PropertyConverter;
import
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.transaction.TransactionManagerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
import org.apache.iceberg.catalog.Catalog;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
public abstract class IcebergExternalCatalog extends ExternalCatalog {
@@ -142,11 +138,6 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
}
}
- protected void initS3Param(Configuration conf) {
- Map<String, String> properties = catalogProperty.getHadoopProperties();
- conf.set(Constants.AWS_CREDENTIALS_PROVIDER,
PropertyConverter.getAWSCredentialsProviders(properties));
- }
-
@Override
public boolean viewExists(String dbName, String viewName) {
return metadataOps.viewExists(dbName, viewName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index 85e3b29e1a3..e8e92e60ac1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -361,7 +361,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
jdbcTable.setDriverClass(this.getDriverClass());
jdbcTable.setDriverUrl(this.getDriverUrl());
jdbcTable.setCheckSum(this.getCheckSum());
- jdbcTable.setResourceName(this.getResource());
+ jdbcTable.setResourceName("");
jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
index 00363c1f799..fba59545922 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
@@ -17,38 +17,12 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.Map;
public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
- private static final Logger LOG =
LogManager.getLogger(PaimonDLFExternalCatalog.class);
public PaimonDLFExternalCatalog(long catalogId, String name, String
resource,
Map<String, String> props, String comment)
{
super(catalogId, name, resource, props, comment);
}
-
- @Override
- protected void initLocalObjectsImpl() {
- super.initLocalObjectsImpl();
- catalogType = PAIMON_DLF;
- catalog = createCatalog();
- }
-
- @Override
- protected void setPaimonCatalogOptions(Map<String, String> properties,
Map<String, String> options) {
- options.put(PaimonProperties.PAIMON_CATALOG_TYPE,
PaimonProperties.PAIMON_HMS_CATALOG);
- options.put(PaimonProperties.PAIMON_METASTORE_CLIENT,
ProxyMetaStoreClient.class.getName());
- options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
- properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
- options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
- properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
- properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 93252d61d58..b8decdc032d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -18,38 +18,28 @@
package org.apache.doris.datasource.paimon;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SessionContext;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public abstract class PaimonExternalCatalog extends ExternalCatalog {
+public class PaimonExternalCatalog extends ExternalCatalog {
private static final Logger LOG =
LogManager.getLogger(PaimonExternalCatalog.class);
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
public static final String PAIMON_FILESYSTEM = "filesystem";
@@ -57,35 +47,32 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public static final String PAIMON_DLF = "dlf";
protected String catalogType;
protected Catalog catalog;
- protected AuthenticationConfig authConf;
- protected HadoopAuthenticator hadoopAuthenticator;
- private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
- PaimonProperties.WAREHOUSE
- );
+ private AbstractPaimonProperties paimonProperties;
- public PaimonExternalCatalog(long catalogId, String name, String resource,
- Map<String, String> props, String comment) {
+ public PaimonExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props,
+ String comment) {
super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
- props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@Override
protected void initLocalObjectsImpl() {
- Configuration conf =
DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
- for (Map.Entry<String, String> propEntry :
this.catalogProperty.getHadoopProperties().entrySet()) {
- conf.set(propEntry.getKey(), propEntry.getValue());
+ try {
+ paimonProperties = (AbstractPaimonProperties)
MetastoreProperties.create(catalogProperty.getProperties());
+ } catch (UserException e) {
+ throw new IllegalArgumentException("Failed to create Paimon
properties from catalog properties,exception: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
- authConf = AuthenticationConfig.getKerberosConfig(conf);
- hadoopAuthenticator =
HadoopAuthenticator.getHadoopAuthenticator(authConf);
+ catalogType = paimonProperties.getPaimonCatalogType();
+ catalog = createCatalog();
initPreExecutionAuthenticator();
}
@Override
protected synchronized void initPreExecutionAuthenticator() {
if (executionAuthenticator == null) {
- executionAuthenticator = new
HadoopExecutionAuthenticator(hadoopAuthenticator);
+ executionAuthenticator =
paimonProperties.getExecutionAuthenticator();
}
}
@@ -96,8 +83,8 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
protected List<String> listDatabaseNames() {
try {
- return hadoopAuthenticator.doAs(() -> new
ArrayList<>(catalog.listDatabases()));
- } catch (IOException e) {
+ return executionAuthenticator.execute(() -> new
ArrayList<>(catalog.listDatabases()));
+ } catch (Exception e) {
throw new RuntimeException("Failed to list databases names,
catalog name: " + getName(), e);
}
}
@@ -106,7 +93,7 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> {
+ return executionAuthenticator.execute(() -> {
try {
catalog.getTable(Identifier.create(dbName, tblName));
return true;
@@ -115,8 +102,9 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
});
- } catch (IOException e) {
- throw new RuntimeException("Failed to check table existence,
catalog name: " + getName(), e);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check table existence,
catalog name: " + getName()
+ + "error message is:" +
ExceptionUtils.getRootCauseMessage(e), e);
}
}
@@ -124,7 +112,7 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> {
+ return executionAuthenticator.execute(() -> {
List<String> tableNames = null;
try {
tableNames = catalog.listTables(dbName);
@@ -133,7 +121,7 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
return tableNames;
});
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to list table names, catalog
name: " + getName(), e);
}
}
@@ -141,32 +129,31 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
public org.apache.paimon.table.Table getPaimonTable(NameMapping
nameMapping) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> catalog.getTable(
- Identifier.create(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName())));
+ return executionAuthenticator.execute(() ->
catalog.getTable(Identifier.create(nameMapping
+ .getRemoteDbName(), nameMapping.getRemoteTblName())));
} catch (Exception e) {
- throw new RuntimeException("Failed to get Paimon table:" +
getName() + "."
- + nameMapping.getLocalDbName() + "." +
nameMapping.getLocalTblName() + ", because "
- + e.getMessage(), e);
+ throw new RuntimeException("Failed to get Paimon table:" +
getName() + "." + nameMapping.getLocalDbName()
+ + "." + nameMapping.getLocalTblName() + ", because " +
ExceptionUtils.getRootCauseMessage(e), e);
}
}
public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> {
+ return executionAuthenticator.execute(() -> {
List<Partition> partitions = new ArrayList<>();
try {
- partitions = catalog.listPartitions(
- Identifier.create(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName()));
+ partitions =
catalog.listPartitions(Identifier.create(nameMapping.getRemoteDbName(),
+ nameMapping.getRemoteTblName()));
} catch (Catalog.TableNotExistException e) {
LOG.warn("TableNotExistException", e);
}
return partitions;
});
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to get Paimon table
partitions:" + getName() + "."
- + nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName()
- + ", because " + e.getMessage(), e);
+ + nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName() + ", because "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
}
@@ -174,79 +161,44 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
return getPaimonSystemTable(nameMapping, null, queryType);
}
- public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping, String branch,
- String queryType) {
+ public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping,
+ String branch,
String queryType) {
makeSureInitialized();
try {
- return hadoopAuthenticator.doAs(() -> catalog.getTable(new
Identifier(nameMapping.getRemoteDbName(),
+ return executionAuthenticator.execute(() -> catalog.getTable(new
Identifier(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName(), branch, queryType)));
} catch (Exception e) {
throw new RuntimeException("Failed to get Paimon system table:" +
getName() + "."
+ nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName() + "$" + queryType
- + ", because " + e.getMessage(), e);
+ + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
}
}
- protected String getPaimonCatalogType(String catalogType) {
- if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
- return PaimonProperties.PAIMON_HMS_CATALOG;
- } else {
- return PaimonProperties.PAIMON_FILESYSTEM_CATALOG;
- }
- }
protected Catalog createCatalog() {
try {
- return hadoopAuthenticator.doAs(() -> {
- Options options = new Options();
- Map<String, String> paimonOptionsMap = getPaimonOptionsMap();
- for (Map.Entry<String, String> kv :
paimonOptionsMap.entrySet()) {
- options.set(kv.getKey(), kv.getValue());
- }
- CatalogContext context = CatalogContext.create(options,
getConfiguration());
- return createCatalogImpl(context);
- });
- } catch (IOException e) {
- throw new RuntimeException("Failed to create catalog, catalog
name: " + getName(), e);
+ return paimonProperties.initializeCatalog(getName(), new
ArrayList<>(catalogProperty
+ .getStoragePropertiesMap().values()));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create catalog, catalog
name: " + getName() + ", exception: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
}
- protected Catalog createCatalogImpl(CatalogContext context) {
- return CatalogFactory.createCatalog(context);
- }
-
public Map<String, String> getPaimonOptionsMap() {
- Map<String, String> properties = catalogProperty.getHadoopProperties();
- Map<String, String> options = Maps.newHashMap();
- options.put(PaimonProperties.WAREHOUSE,
properties.get(PaimonProperties.WAREHOUSE));
- setPaimonCatalogOptions(properties, options);
- setPaimonExtraOptions(properties, options);
- return options;
- }
-
- protected abstract void setPaimonCatalogOptions(Map<String, String>
properties, Map<String, String> options);
-
- protected void setPaimonExtraOptions(Map<String, String> properties,
Map<String, String> options) {
- for (Map.Entry<String, String> kv : properties.entrySet()) {
- if (kv.getKey().startsWith(PaimonProperties.PAIMON_PREFIX)) {
-
options.put(kv.getKey().substring(PaimonProperties.PAIMON_PREFIX.length()),
kv.getValue());
- }
- }
-
- // hive version.
- // This property is used for both FE and BE, so it has no "paimon."
prefix.
- // We need to handle it separately.
- if (properties.containsKey(HMSProperties.HIVE_VERSION)) {
- options.put(HMSProperties.HIVE_VERSION,
properties.get(HMSProperties.HIVE_VERSION));
- }
+ makeSureInitialized();
+ return paimonProperties.getCatalogOptionsMap();
}
@Override
public void checkProperties() throws DdlException {
- super.checkProperties();
- for (String requiredProperty : REQUIRED_PROPERTIES) {
- if
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
- throw new DdlException("Required property '" +
requiredProperty + "' is missing");
+ if (null != paimonProperties) {
+ try {
+ this.paimonProperties = (AbstractPaimonProperties)
MetastoreProperties
+ .create(catalogProperty.getProperties());
+ } catch (UserException e) {
+ throw new DdlException("Failed to create Paimon properties
from catalog properties, exception: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
index 53e790d8c9e..b904ac169ff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.commons.lang3.StringUtils;
@@ -36,12 +35,9 @@ public class PaimonExternalCatalogFactory {
metastoreType = metastoreType.toLowerCase();
switch (metastoreType) {
case PaimonExternalCatalog.PAIMON_HMS:
- return new PaimonHMSExternalCatalog(catalogId, name, resource,
props, comment);
case PaimonExternalCatalog.PAIMON_FILESYSTEM:
- return new PaimonFileExternalCatalog(catalogId, name,
resource, props, comment);
case PaimonExternalCatalog.PAIMON_DLF:
- props.put(HMSProperties.HIVE_METASTORE_TYPE,
HMSProperties.DLF_TYPE);
- return new PaimonDLFExternalCatalog(catalogId, name, resource,
props, comment);
+ return new PaimonExternalCatalog(catalogId, name, resource,
props, comment);
default:
throw new DdlException("Unknown " +
PaimonExternalCatalog.PAIMON_CATALOG_TYPE
+ " value: " + metastoreType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
index e74f3deeaf5..c2c3e139567 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
@@ -17,77 +17,12 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.Map;
-
public class PaimonFileExternalCatalog extends PaimonExternalCatalog {
- private static final Logger LOG =
LogManager.getLogger(PaimonFileExternalCatalog.class);
public PaimonFileExternalCatalog(long catalogId, String name, String
resource,
Map<String, String> props, String comment) {
super(catalogId, name, resource, props, comment);
}
-
- @Override
- protected void initLocalObjectsImpl() {
- super.initLocalObjectsImpl();
- catalogType = PAIMON_FILESYSTEM;
- catalog = createCatalog();
- }
-
- @Override
- protected void setPaimonCatalogOptions(Map<String, String> properties,
Map<String, String> options) {
- if (properties.containsKey(PaimonProperties.PAIMON_S3_ENDPOINT)) {
- options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
- properties.get(PaimonProperties.PAIMON_S3_ENDPOINT));
- options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
- properties.get(PaimonProperties.PAIMON_S3_ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
- properties.get(PaimonProperties.PAIMON_S3_SECRET_KEY));
- } else if
(properties.containsKey(PaimonProperties.PAIMON_OSS_ENDPOINT)) {
- boolean hdfsEnabled = Boolean.parseBoolean(properties.getOrDefault(
- OssProperties.OSS_HDFS_ENABLED, "false"));
- if
(!LocationPath.isHdfsOnOssEndpoint(properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT))
- && !hdfsEnabled) {
- options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
- properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
- options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
-
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
-
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
- }
- } else if (properties.containsKey(CosProperties.ENDPOINT)) {
- options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
- properties.get(CosProperties.ENDPOINT));
- options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
- properties.get(CosProperties.ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
- properties.get(CosProperties.SECRET_KEY));
- options.put(PaimonProperties.WAREHOUSE,
- options.get(PaimonProperties.WAREHOUSE).replace("cosn://",
"s3://"));
- } else if (properties.containsKey(ObsProperties.ENDPOINT)) {
- options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
- properties.get(ObsProperties.ENDPOINT));
- options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
- properties.get(ObsProperties.ACCESS_KEY));
- options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
- properties.get(ObsProperties.SECRET_KEY));
- options.put(PaimonProperties.WAREHOUSE,
- options.get(PaimonProperties.WAREHOUSE).replace("obs://",
"s3://"));
- }
-
- if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
- options.put(PaimonProperties.S3_PATH_STYLE,
properties.get(PropertyConverter.USE_PATH_STYLE));
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
index fb27bb56696..ae5cf7073dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -17,48 +17,12 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
import java.util.Map;
public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
- private static final Logger LOG =
LogManager.getLogger(PaimonHMSExternalCatalog.class);
- private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
- HMSProperties.HIVE_METASTORE_URIS
- );
public PaimonHMSExternalCatalog(long catalogId, String name, String
resource,
Map<String, String> props, String comment) {
super(catalogId, name, resource, props, comment);
}
-
- @Override
- protected void initLocalObjectsImpl() {
- super.initLocalObjectsImpl();
- catalogType = PAIMON_HMS;
- catalog = createCatalog();
- }
-
- @Override
- protected void setPaimonCatalogOptions(Map<String, String> properties,
Map<String, String> options) {
- options.put(PaimonProperties.PAIMON_CATALOG_TYPE,
getPaimonCatalogType(catalogType));
- options.put(PaimonProperties.HIVE_METASTORE_URIS,
properties.get(HMSProperties.HIVE_METASTORE_URIS));
- }
-
- @Override
- public void checkProperties() throws DdlException {
- super.checkProperties();
- for (String requiredProperty : REQUIRED_PROPERTIES) {
- if
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
- throw new DdlException("Required property '" +
requiredProperty + "' is missing");
- }
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 0d0bf8d3b32..9eecaa20d35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -139,9 +139,9 @@ public class PaimonScanNode extends FileQueryScanNode {
protected ConcurrentHashMap<Long, Boolean> currentQuerySchema = new
ConcurrentHashMap<>();
public PaimonScanNode(PlanNodeId id,
- TupleDescriptor desc,
- boolean needCheckColumnPriv,
- SessionVariable sv) {
+ TupleDescriptor desc,
+ boolean needCheckColumnPriv,
+ SessionVariable sv) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv, sv);
}
@@ -421,14 +421,8 @@ public class PaimonScanNode extends FileQueryScanNode {
}
@Override
- public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
- HashMap<String, String> map = new
HashMap<>(source.getCatalog().getProperties());
-
source.getCatalog().getCatalogProperty().getHadoopProperties().forEach((k, v)
-> {
- if (!map.containsKey(k)) {
- map.put(k, v);
- }
- });
- return map;
+ protected Map<String, String> getLocationProperties() {
+ return
source.getCatalog().getCatalogProperty().getBackendStorageProperties();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
new file mode 100644
index 00000000000..b5e12a4970c
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractPaimonProperties extends MetastoreProperties {
+ @ConnectorProperty(
+ names = {"warehouse"},
+ description = "The location of the Paimon warehouse. This is where
the tables will be stored."
+ )
+ protected String warehouse;
+
+ @Getter
+ protected ExecutionAuthenticator executionAuthenticator = new
ExecutionAuthenticator() {
+ };
+
+ @Getter
+ protected Options catalogOptions;
+
+ private final AtomicReference<Map<String, String>> catalogOptionsMapRef =
new AtomicReference<>();
+
+ public abstract String getPaimonCatalogType();
+
+ private static final String USER_PROPERTY_PREFIX = "paimon.";
+
+ protected AbstractPaimonProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ public abstract Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList);
+
+ /**
+ * Adapt S3 storage properties for Apache Paimon's S3 file system.
+ *
+ * <p>Paimon's S3 file system does not follow the standard
Hadoop-compatible
+ * configuration keys (like fs.s3a.access.key). Instead, it expects
specific
+ * keys such as "s3.access.key", "s3.secret.key", etc.
+ *
+ * <p>Therefore, we explicitly map our internal S3 configuration (usually
designed
+ * for HDFS-compatible systems) to Paimon's expected format.
+ *
+ * <p>See: org.apache.paimon.s3.S3Loader
+ *
+ * @param storagePropertiesList the list of configured storage backends
+ */
+ protected void appendS3PropertiesIsNeeded(List<StorageProperties>
storagePropertiesList) {
+
+ S3Properties s3Properties = (S3Properties)
storagePropertiesList.stream()
+ .filter(storageProperties -> storageProperties.getType() ==
StorageProperties.Type.S3)
+ .findFirst()
+ .orElse(null);
+ if (s3Properties != null) {
+ catalogOptions.set("s3.access.key", s3Properties.getSecretKey());
+ catalogOptions.set("s3.secret.key", s3Properties.getAccessKey());
+ catalogOptions.set("s3.endpoint", s3Properties.getEndpoint());
+ catalogOptions.set("s3.region", s3Properties.getRegion());
+ }
+
+ }
+
+ protected void appendCatalogOptions(List<StorageProperties>
storagePropertiesList) {
+ if (StringUtils.isNotBlank(warehouse)) {
+ catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
+ }
+ catalogOptions.set(CatalogOptions.METASTORE.key(), getMetastoreType());
+ origProps.forEach((k, v) -> {
+ if (k.toLowerCase().startsWith(USER_PROPERTY_PREFIX)) {
+ String newKey = k.substring(USER_PROPERTY_PREFIX.length());
+ if (StringUtils.isNotBlank(newKey)) {
+ catalogOptions.set(newKey, v);
+ }
+ }
+ });
+ appendS3PropertiesIsNeeded(storagePropertiesList);
+ }
+
+ /**
+ * Build catalog options including common and subclass-specific ones.
+ */
+ public void buildCatalogOptions(List<StorageProperties>
storagePropertiesList) {
+ catalogOptions = new Options();
+ appendCatalogOptions(storagePropertiesList);
+ appendCustomCatalogOptions();
+ }
+
+ public Map<String, String> getCatalogOptionsMap() {
+ // Return the cached map if already initialized
+ Map<String, String> existing = catalogOptionsMapRef.get();
+ if (existing != null) {
+ return existing;
+ }
+
+ // Check that the catalog options source is available
+ if (catalogOptions == null) {
+ throw new IllegalStateException("Catalog options have not been
initialized. Call"
+ + " buildCatalogOptions first.");
+ }
+
+ // Construct the map manually using the provided keys
+ Map<String, String> computed = new HashMap<>();
+ for (String key : catalogOptions.keySet()) {
+ computed.put(key, catalogOptions.get(key));
+ }
+
+ // Attempt to set the constructed map atomically; only one thread wins
+ if (catalogOptionsMapRef.compareAndSet(null, computed)) {
+ return computed;
+ } else {
+ // Another thread already initialized it; return the existing one
+ return catalogOptionsMapRef.get();
+ }
+ }
+
+
+ /**
+ * Hook method for subclasses to append metastore-specific or custom
catalog options.
+ *
+ * <p>This method is invoked after common catalog options (e.g., warehouse
path,
+ * metastore type, user-defined keys, and S3 compatibility mappings) have
been
+ * added to the {@link org.apache.paimon.options.Options} instance.
+ *
+ * <p>Subclasses should override this method to inject additional
configuration
+ * required for their specific metastore or environment. For example:
+ *
+ * <ul>
+ * <li>DLF-based catalog may require a custom metastore client
class.</li>
+ * <li>HMS-based catalog may include URI and client pool parameters.</li>
+ * <li>Other environments may inject authentication, endpoint, or
caching options.</li>
+ * </ul>
+ *
+ * <p>If the subclass does not require any special options beyond the
common ones,
+ * it can safely leave this method empty.
+ */
+ protected abstract void appendCustomCatalogOptions();
+
+ /**
+ * Returns the metastore type identifier used by the Paimon catalog
factory.
+ *
+ * <p>This identifier must match one of the known metastore types
supported by
+ * Apache Paimon. Internally, the value returned here is used to configure
the
+ * `metastore` option in {@code Options}, which determines the specific
+ * {@link org.apache.paimon.catalog.CatalogFactory} implementation to be
used
+ * when instantiating the catalog.
+ *
+ * <p>You can find valid identifiers by reviewing implementations of the
+ * {@link org.apache.paimon.catalog.CatalogFactory} interface. Each
implementation
+ * declares its identifier via a static {@code IDENTIFIER} field or
equivalent constant.
+ *
+ * <p>Examples:
+ * <ul>
+ * <li>{@code "filesystem"} - for {@link
org.apache.paimon.catalog.FileSystemCatalogFactory}</li>
+ * <li>{@code "hive"} - for {@link
org.apache.paimon.hive.HiveCatalogFactory}</li>
+ * </ul>
+ *
+ * @return the metastore type identifier string
+ */
+ protected abstract String getMetastoreType();
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
index 3e5e2273b08..48cba3c0bcc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
@@ -46,6 +46,7 @@ public class MetastoreProperties extends ConnectionProperties
{
public enum Type {
HMS("hms"),
ICEBERG("iceberg"),
+ PAIMON("paimon"),
GLUE("glue"),
DLF("dlf"),
DATAPROC("dataproc"),
@@ -83,6 +84,7 @@ public class MetastoreProperties extends ConnectionProperties
{
//subclasses should be registered here
register(Type.HMS, new HMSPropertiesFactory());
register(Type.ICEBERG, new IcebergPropertiesFactory());
+ register(Type.PAIMON, new PaimonPropertiesFactory());
}
public static void register(Type type, MetastorePropertiesFactory factory)
{
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
new file mode 100644
index 00000000000..2f65dbad9d8
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.OSSProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import com.aliyun.datalake.metastore.common.DataLakeConfig;
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PaimonAliyunDLFMetaStoreProperties
+ *
+ * <p>This class provides configuration support for using Apache Paimon with
+ * Aliyun Data Lake Formation (DLF) as the metastore. Although DLF is not an
+ * officially supported metastore type in Paimon, this implementation adapts
+ * DLF by treating it as a Hive Metastore (HMS) underneath, enabling
+ * interoperability with Paimon's HiveCatalog.
+ *
+ * <p>Key Characteristics:
+ * <ul>
+ * <li>Internally uses HiveCatalog with custom HiveConf configured for
Aliyun DLF.</li>
+ * <li>Relies on {@link ProxyMetaStoreClient} to bridge DLF
compatibility.</li>
+ * <li>Requires Aliyun OSS as the storage backend. Other storage types are
not
+ * currently verified for compatibility.</li>
+ * </ul>
+ *
+ * <p>Note: This is an internal extension and not an officially supported
Paimon
+ * metastore type. Future compatibility should be validated when upgrading
Paimon
+ * or changing storage backends.
+ *
+ * @see org.apache.paimon.hive.HiveCatalog
+ * @see org.apache.paimon.catalog.CatalogFactory
+ * @see ProxyMetaStoreClient
+ */
+public class PaimonAliyunDLFMetaStoreProperties extends
AbstractPaimonProperties {
+
+ private AliyunDLFBaseProperties baseProperties;
+
+ protected PaimonAliyunDLFMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public void initNormalizeAndCheckProps() {
+ super.initNormalizeAndCheckProps();
+ baseProperties = AliyunDLFBaseProperties.of(origProps);
+ }
+
+ private HiveConf buildHiveConf() {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_ID,
baseProperties.dlfAccessKey);
+ hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET,
baseProperties.dlfSecretKey);
+ hiveConf.set(DataLakeConfig.CATALOG_ENDPOINT,
baseProperties.dlfEndpoint);
+ hiveConf.set(DataLakeConfig.CATALOG_REGION_ID,
baseProperties.dlfRegion);
+ hiveConf.set(DataLakeConfig.CATALOG_SECURITY_TOKEN,
baseProperties.dlfSessionToken);
+ hiveConf.set(DataLakeConfig.CATALOG_USER_ID, baseProperties.dlfUid);
+ hiveConf.set(DataLakeConfig.CATALOG_ID, baseProperties.dlfCatalogId);
+ hiveConf.set(DataLakeConfig.CATALOG_PROXY_MODE,
baseProperties.dlfProxyMode);
+ return hiveConf;
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ HiveConf hiveConf = buildHiveConf();
+ buildCatalogOptions(storagePropertiesList);
+ StorageProperties ossProps = storagePropertiesList.stream()
+ .filter(sp -> sp.getType() == StorageProperties.Type.OSS)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Paimon DLF
metastore requires OSS storage properties."));
+
+ if (!(ossProps instanceof OSSProperties)) {
+ throw new IllegalStateException("Expected OSSProperties type.");
+ }
+ OSSProperties ossProperties = (OSSProperties) ossProps;
+ for (Map.Entry<String, String> entry :
ossProperties.getHadoopStorageConfig()) {
+ catalogOptions.set(entry.getKey(), entry.getValue());
+ }
+ hiveConf.addResource(ossProperties.getHadoopStorageConfig());
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
hiveConf);
+ return CatalogFactory.createCatalog(catalogContext);
+ }
+
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ catalogOptions.set("metastore.client.class",
ProxyMetaStoreClient.class.getName());
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ return HiveCatalogOptions.IDENTIFIER;
+ }
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_DLF;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
new file mode 100644
index 00000000000..adfa53eab94
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalogFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class PaimonFileSystemMetaStoreProperties extends
AbstractPaimonProperties {
+ protected PaimonFileSystemMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ buildCatalogOptions(storagePropertiesList);
+ Configuration conf = new Configuration(false);
+ storagePropertiesList.forEach(storageProperties -> {
+ for (Map.Entry<String, String> entry :
storageProperties.getHadoopStorageConfig()) {
+ catalogOptions.set(entry.getKey(), entry.getValue());
+ }
+ conf.addResource(storageProperties.getHadoopStorageConfig());
+ if
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+ this.executionAuthenticator = new
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+ .getHadoopAuthenticator());
+ }
+ });
+
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
conf);
+ try {
+ return this.executionAuthenticator.execute(() ->
CatalogFactory.createCatalog(catalogContext));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ //nothing need to do
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ return FileSystemCatalogFactory.IDENTIFIER;
+ }
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_FILESYSTEM;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
new file mode 100644
index 00000000000..7c2d2ee79a3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class PaimonHMSMetaStoreProperties extends AbstractPaimonProperties {
+
+ private HMSBaseProperties hmsBaseProperties;
+
+ private static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY =
"client-pool-cache.eviction-interval-ms";
+
+ private static final String LOCATION_IN_PROPERTIES_KEY =
"location-in-properties";
+
+ @ConnectorProperty(
+ names = {CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY},
+ required = false,
+ description = "Setting the client's pool cache eviction
interval(ms).")
+ private long clientPoolCacheEvictionIntervalMs =
TimeUnit.MINUTES.toMillis(5L);
+
+ @ConnectorProperty(
+ names = {LOCATION_IN_PROPERTIES_KEY},
+ required = false,
+ description = "Setting whether to use the location in the
properties.")
+ private boolean locationInProperties = false;
+
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_DLF;
+ }
+
+ protected PaimonHMSMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public void initNormalizeAndCheckProps() {
+ super.initNormalizeAndCheckProps();
+ hmsBaseProperties = HMSBaseProperties.of(origProps);
+ hmsBaseProperties.initAndCheckParams();
+ this.executionAuthenticator = new
HadoopExecutionAuthenticator(hmsBaseProperties.getHmsAuthenticator());
+ }
+
+
+ /**
+ * Builds the Hadoop Configuration by adding hive-site.xml and
storage-specific configs.
+ */
+ private Configuration buildHiveConfiguration(List<StorageProperties>
storagePropertiesList) {
+ Configuration conf = hmsBaseProperties.getHiveConf();
+
+ for (StorageProperties sp : storagePropertiesList) {
+ if (sp.getHadoopStorageConfig() != null) {
+ conf.addResource(sp.getHadoopStorageConfig());
+ }
+ }
+ return conf;
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ Configuration conf = buildHiveConfiguration(storagePropertiesList);
+ buildCatalogOptions(storagePropertiesList);
+ for (Map.Entry<String, String> entry : conf) {
+ catalogOptions.set(entry.getKey(), entry.getValue());
+ }
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
conf);
+ try {
+ return executionAuthenticator.execute(() ->
CatalogFactory.createCatalog(catalogContext));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create Paimon catalog with
HMS metastore", e);
+ }
+
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ //See org.apache.paimon.hive.HiveCatalogFactory
+ return HiveCatalogOptions.IDENTIFIER;
+ }
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ catalogOptions.set(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY,
+ String.valueOf(clientPoolCacheEvictionIntervalMs));
+ catalogOptions.set(LOCATION_IN_PROPERTIES_KEY,
String.valueOf(locationInProperties));
+ catalogOptions.set("uri", hmsBaseProperties.getHiveMetastoreUri());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
new file mode 100644
index 00000000000..12dac3bb739
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import java.util.Map;
+
+public class PaimonPropertiesFactory extends
AbstractMetastorePropertiesFactory {
+
+ private static final String KEY = "paimon.catalog.type";
+ private static final String DEFAULT_TYPE = "filesystem";
+
+ public PaimonPropertiesFactory() {
+ register("dlf", PaimonAliyunDLFMetaStoreProperties::new);
+ register("filesystem", PaimonFileSystemMetaStoreProperties::new);
+ register("hms", PaimonHMSMetaStoreProperties::new);
+ }
+
+ @Override
+ public MetastoreProperties create(Map<String, String> props) {
+ return createInternal(props, KEY, DEFAULT_TYPE);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 8cec9d6a63d..b36e4309701 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -257,8 +258,23 @@ public abstract class AbstractS3CompatibleProperties
extends StorageProperties i
@Override
public void initializeHadoopStorageConfig() {
- throw new UnsupportedOperationException("Hadoop storage config
initialization is not"
- + " supported for S3 compatible storage.");
+ hadoopStorageConfig = new Configuration();
+ // Compatibility note: Due to historical reasons, even when the
underlying
+ // storage is OSS, OBS, etc., users may still configure the schema as
"s3://".
+ // To ensure backward compatibility, we append S3-related properties
by default.
+ appendS3HdfsProperties(hadoopStorageConfig);
+ }
+
+ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
+ hadoopStorageConfig.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.s3a.endpoint", getEndpoint());
+ hadoopStorageConfig.set("fs.s3a.access.key", getAccessKey());
+ hadoopStorageConfig.set("fs.s3a.secret.key", getSecretKey());
+ hadoopStorageConfig.set("fs.s3a.connection.maximum",
getMaxConnections());
+ hadoopStorageConfig.set("fs.s3a.connection.request.timeout",
getRequestTimeoutS());
+ hadoopStorageConfig.set("fs.s3a.connection.timeout",
getConnectionTimeoutS());
+ hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index a2ae545f048..6fea7cf18ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -130,17 +129,8 @@ public class COSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.cos.impl",
"org.apache.hadoop.fs.CosFileSystem");
- hadoopStorageConfig.set("fs.cosn.impl",
"org.apache.hadoop.fs.CosFileSystem");
- hadoopStorageConfig.set("fs.AbstractFileSystem.cos.impl",
"org.apache.hadoop.fs.Cos");
- hadoopStorageConfig.set("fs.cos.secretId", accessKey);
- hadoopStorageConfig.set("fs.cos.secretKey", secretKey);
- hadoopStorageConfig.set("fs.cos.region", region);
- hadoopStorageConfig.set("fs.cos.endpoint", endpoint);
- hadoopStorageConfig.set("fs.cos.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.cos.connection.request.timeout",
requestTimeoutS);
- hadoopStorageConfig.set("fs.cos.connection.maximum", maxConnections);
- hadoopStorageConfig.set("fs.cos.use.path.style", usePathStyle);
+ super.initializeHadoopStorageConfig();
+ hadoopStorageConfig.set("fs.cos.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ hadoopStorageConfig.set("fs.cosn.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
index 0f5349e1452..75f9fc85881 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import java.util.Map;
import java.util.Set;
@@ -95,18 +94,4 @@ public class MinioProperties extends
AbstractS3CompatibleProperties {
protected Set<Pattern> endpointPatterns() {
return
ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$"));
}
-
- @Override
- public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
- hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
- hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
- hadoopStorageConfig.set("fs.s3a.connection.maximum", maxConnections);
- hadoopStorageConfig.set("fs.s3a.connection.request.timeout",
requestTimeoutS);
- hadoopStorageConfig.set("fs.s3a.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 051dd9a927c..513727416ef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -135,13 +134,7 @@ public class OBSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.obs.impl",
"com.obs.services.hadoop.fs.OBSFileSystem");
- hadoopStorageConfig.set("fs.obs.access.key", accessKey);
- hadoopStorageConfig.set("fs.obs.secret.key", secretKey);
- hadoopStorageConfig.set("fs.obs.endpoint", endpoint);
- hadoopStorageConfig.set("fs.obs.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.obs.request.timeout", requestTimeoutS);
- hadoopStorageConfig.set("fs.obs.connection.max", maxConnections);
+ super.initializeHadoopStorageConfig();
+ hadoopStorageConfig.set("fs.obs.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 1e34311f9e9..7b27a1ea160 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -172,15 +171,7 @@ public class OSSProperties extends
AbstractS3CompatibleProperties {
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.oss.impl",
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
- hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
- hadoopStorageConfig.set("fs.oss.region", region);
- hadoopStorageConfig.set("fs.oss.accessKeyId", accessKey);
- hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
- hadoopStorageConfig.set("fs.oss.connection.timeout",
connectionTimeoutS);
- hadoopStorageConfig.set("fs.oss.connection.max", maxConnections);
- hadoopStorageConfig.set("fs.oss.connection.request.timeout",
requestTimeoutS);
- hadoopStorageConfig.set("fs.oss.use.path.style.access", usePathStyle);
+ super.initializeHadoopStorageConfig();
+ hadoopStorageConfig.set("fs.oss.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index e664edc6188..19a9195ec2f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -23,11 +23,9 @@ import
org.apache.doris.datasource.property.storage.exception.StoragePropertiesE
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
@@ -39,8 +37,6 @@ import
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsPr
import software.amazon.awssdk.services.sts.StsClient;
import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
-import java.lang.reflect.Field;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -219,34 +215,6 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
return ENDPOINT_PATTERN;
}
- private static List<Field> getIdentifyFields() {
- List<Field> fields = Lists.newArrayList();
- try {
- //todo AliyunDlfProperties should in OSS storage type.
- fields.add(S3Properties.class.getDeclaredField("s3AccessKey"));
- // fixme Add it when MS done
-
//fields.add(AliyunDLFProperties.class.getDeclaredField("dlfAccessKey"));
-
//fields.add(AWSGlueProperties.class.getDeclaredField("glueAccessKey"));
- return fields;
- } catch (NoSuchFieldException e) {
- // should not happen
- throw new RuntimeException("Failed to get field: " +
e.getMessage(), e);
- }
- }
-
- /*
- public void toPaimonOSSFileIOProperties(Options options) {
- options.set("fs.oss.endpoint", s3Endpoint);
- options.set("fs.oss.accessKeyId", s3AccessKey);
- options.set("fs.oss.accessKeySecret", s3SecretKey);
- }
-
- public void toPaimonS3FileIOProperties(Options options) {
- options.set("s3.endpoint", s3Endpoint);
- options.set("s3.access-key", s3AccessKey);
- options.set("s3.secret-key", s3SecretKey);
- }*/
-
@Override
public Map<String, String> getBackendConfigProperties() {
Map<String, String> backendProperties =
generateBackendS3Configuration(s3ConnectionMaximum,
@@ -297,18 +265,17 @@ public class S3Properties extends
AbstractS3CompatibleProperties {
InstanceProfileCredentialsProvider.create());
}
-
@Override
public void initializeHadoopStorageConfig() {
- hadoopStorageConfig = new Configuration();
- hadoopStorageConfig.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
- hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
- hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
- hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
- hadoopStorageConfig.set("fs.s3a.connection.maximum",
s3ConnectionMaximum);
- hadoopStorageConfig.set("fs.s3a.connection.request.timeout",
s3ConnectionRequestTimeoutS);
- hadoopStorageConfig.set("fs.s3a.connection.timeout",
s3ConnectionTimeoutS);
- hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
+ super.initializeHadoopStorageConfig();
+ //Set assumed_roles
+ //@See
https://hadoop.apache.org/docs/r3.4.1/hadoop-aws/tools/hadoop-aws/assumed_roles.html
+ if (StringUtils.isNotBlank(s3ExternalId) &&
StringUtils.isNotBlank(s3IAMRole)) {
+ //@See org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
+ hadoopStorageConfig.set("fs.s3a.assumed.role.external.id",
s3ExternalId);
+ hadoopStorageConfig.set("fs.s3a.assumed.role.arn", s3IAMRole);
+ hadoopStorageConfig.set("fs.s3a.aws.credentials.provider",
+
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 8835d64a47c..71a871200bb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -187,7 +187,7 @@ public class LogicalHudiScan extends LogicalFileScan {
Optional<IncrementalRelation> newIncrementalRelation =
Optional.empty();
if (optScanParams.isPresent() &&
optScanParams.get().incrementalRead()) {
TableScanParams scanParams = optScanParams.get();
- Map<String, String> optParams = table.getHadoopProperties();
+ Map<String, String> optParams =
table.getBackendStorageProperties();
if (scanParams.getMapParams().containsKey("beginTime")) {
optParams.put("hoodie.datasource.read.begin.instanttime",
scanParams.getMapParams().get("beginTime"));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 0eaeed81d4c..d9f75763510 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -162,7 +162,7 @@ public class HiveTableSink extends
BaseExternalTableDataSink {
tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
}
- tSink.setHadoopConfig(targetTable.getHadoopProperties());
+ tSink.setHadoopConfig(targetTable.getBackendStorageProperties());
tDataSink = new TDataSink(getDataSinkType());
tDataSink.setHiveTableSink(tSink);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]