This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1be903b0bdd branch-3.1: [Feat](params-refactor): Refactor Paimon 
Metastore parameter integration (#54114) (#54262)
1be903b0bdd is described below

commit 1be903b0bddcd57c777e490150eebbd26dfde09e
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Aug 4 14:14:39 2025 +0800

    branch-3.1: [Feat](params-refactor): Refactor Paimon Metastore parameter 
integration (#54114) (#54262)
    
    #54114
---
 .../java/org/apache/doris/catalog/Resource.java    |  16 +-
 .../org/apache/doris/datasource/CatalogIf.java     |   7 +-
 .../org/apache/doris/datasource/CatalogLog.java    |   1 +
 .../org/apache/doris/datasource/CatalogMgr.java    |  17 --
 .../apache/doris/datasource/CatalogProperty.java   | 173 +++++++++++--------
 .../apache/doris/datasource/ExternalCatalog.java   |   5 -
 .../doris/datasource/hive/HMSExternalTable.java    |  10 +-
 .../doris/datasource/hive/source/HiveScanNode.java |   4 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   4 +-
 .../datasource/iceberg/IcebergExternalCatalog.java |   9 -
 .../doris/datasource/jdbc/JdbcExternalCatalog.java |   2 +-
 .../paimon/PaimonDLFExternalCatalog.java           |  26 ---
 .../datasource/paimon/PaimonExternalCatalog.java   | 154 ++++++-----------
 .../paimon/PaimonExternalCatalogFactory.java       |   6 +-
 .../paimon/PaimonFileExternalCatalog.java          |  65 -------
 .../paimon/PaimonHMSExternalCatalog.java           |  36 ----
 .../datasource/paimon/source/PaimonScanNode.java   |  16 +-
 .../metastore/AbstractPaimonProperties.java        | 188 +++++++++++++++++++++
 .../property/metastore/MetastoreProperties.java    |   2 +
 .../PaimonAliyunDLFMetaStoreProperties.java        | 123 ++++++++++++++
 .../PaimonFileSystemMetaStoreProperties.java       |  76 +++++++++
 .../metastore/PaimonHMSMetaStoreProperties.java    | 117 +++++++++++++
 .../metastore/PaimonPropertiesFactory.java         |  37 ++++
 .../storage/AbstractS3CompatibleProperties.java    |  20 ++-
 .../datasource/property/storage/COSProperties.java |  16 +-
 .../property/storage/MinioProperties.java          |  15 --
 .../datasource/property/storage/OBSProperties.java |  11 +-
 .../datasource/property/storage/OSSProperties.java |  13 +-
 .../datasource/property/storage/S3Properties.java  |  53 ++----
 .../trees/plans/logical/LogicalHudiScan.java       |   2 +-
 .../org/apache/doris/planner/HiveTableSink.java    |   2 +-
 31 files changed, 751 insertions(+), 475 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index a56c15a8f1e..31219e2a1df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -25,7 +25,6 @@ import org.apache.doris.common.io.DeepCopy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
-import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 
@@ -286,20 +285,7 @@ public abstract class Resource implements Writable, 
GsonPostProcessable {
     private void notifyUpdate(Map<String, String> properties) {
         
references.entrySet().stream().collect(Collectors.groupingBy(Entry::getValue)).forEach((type,
 refs) -> {
             if (type == ReferenceType.CATALOG) {
-                for (Map.Entry<String, ReferenceType> ref : refs) {
-                    String catalogName = 
ref.getKey().split(REFERENCE_SPLIT)[0];
-                    CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
-                    if (catalog == null) {
-                        LOG.warn("Can't find the reference catalog {} for 
resource {}", catalogName, name);
-                        continue;
-                    }
-                    if (!name.equals(catalog.getResource())) {
-                        LOG.warn("Failed to update catalog {} for different 
resource "
-                                + "names(resource={}, catalog.resource={})", 
catalogName, name, catalog.getResource());
-                        continue;
-                    }
-                    catalog.notifyPropertiesUpdated(properties);
-                }
+                // No longer support resource in Catalog.
             }
         });
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index 50250b62e16..646510e01d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -35,7 +35,6 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo
 import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -88,10 +87,6 @@ public interface CatalogIf<T extends DatabaseIf> {
 
     Map<String, String> getProperties();
 
-    default String getResource() {
-        return null;
-    }
-
     default void notifyPropertiesUpdated(Map<String, String> updatedProps) {
         if (this instanceof ExternalCatalog) {
             ((ExternalCatalog) this).resetToUninitialized(false);
@@ -174,7 +169,7 @@ public interface CatalogIf<T extends DatabaseIf> {
         CatalogLog log = new CatalogLog();
         log.setCatalogId(getId());
         log.setCatalogName(getName());
-        log.setResource(Strings.nullToEmpty(getResource()));
+        log.setResource("");
         log.setComment(getComment());
         log.setProps(getProperties());
         return log;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
index d862a3ef44c..08a08d49d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
@@ -56,6 +56,7 @@ public class CatalogLog implements Writable {
     @SerializedName(value = "invalidCache")
     private boolean invalidCache;
 
+    @Deprecated
     @SerializedName(value = "resource")
     private String resource;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index b3e73f68d08..7ee865576b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -28,8 +28,6 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.Resource;
-import org.apache.doris.catalog.Resource.ReferenceType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -126,12 +124,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
             ((ExternalCatalog) catalog).resetToUninitialized(false);
         }
-        if (!Strings.isNullOrEmpty(catalog.getResource())) {
-            Resource resource = 
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
-            if (resource != null) {
-                resource.addReference(catalog.getName(), 
ReferenceType.CATALOG);
-            }
-        }
     }
 
     private CatalogIf removeCatalog(long catalogId) {
@@ -144,12 +136,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
                 ConnectContext.get().removeLastDBOfCatalog(catalog.getName());
             }
             
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
-            if (!Strings.isNullOrEmpty(catalog.getResource())) {
-                Resource catalogResource = 
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
-                if (catalogResource != null) {
-                    catalogResource.removeReference(catalog.getName(), 
ReferenceType.CATALOG);
-                }
-            }
             Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
         }
         return catalog;
@@ -417,9 +403,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
                     
ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED,
                             ConnectContext.get().getQualifiedUser(), 
catalog.getName());
                 }
-                if (!Strings.isNullOrEmpty(catalog.getResource())) {
-                    rows.add(Arrays.asList("resource", catalog.getResource()));
-                }
                 Map<String, String> sortedMap = 
getCatalogPropertiesWithPrintable(catalog);
                 sortedMap.forEach((k, v) -> rows.add(Arrays.asList(k, v)));
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
index cbb5543f4b1..ffba2489ae3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Resource;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -27,7 +25,6 @@ import 
org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.persist.gson.GsonUtils;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.collections.MapUtils;
@@ -41,6 +38,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * CatalogProperty to store the properties for catalog.
@@ -49,116 +47,103 @@ import java.util.function.Function;
 public class CatalogProperty implements Writable {
     private static final Logger LOG = 
LogManager.getLogger(CatalogProperty.class);
 
+    @Deprecated
     @SerializedName(value = "resource")
     private String resource;
+
     @SerializedName(value = "properties")
     private Map<String, String> properties;
 
+    // Lazy-loaded storage properties map, using volatile to ensure visibility
     private volatile Map<StorageProperties.Type, StorageProperties> 
storagePropertiesMap;
 
-    private MetastoreProperties metastoreProperties;
+    // Lazy-loaded metastore properties, using volatile to ensure visibility
+    private volatile MetastoreProperties metastoreProperties;
+
+    // Lazy-loaded backend storage properties, using volatile to ensure 
visibility
+    private volatile Map<String, String> backendStorageProperties;
 
-    private volatile Resource catalogResource = null;
+    // Lazy-loaded Hadoop properties, using volatile to ensure visibility
+    private volatile Map<String, String> hadoopProperties;
 
     public CatalogProperty(String resource, Map<String, String> properties) {
-        this.resource = Strings.nullToEmpty(resource);
+        this.resource = resource; // Keep but not used
         this.properties = properties;
         if (this.properties == null) {
             this.properties = Maps.newConcurrentMap();
         }
     }
 
-    private Resource catalogResource() {
-        if (!Strings.isNullOrEmpty(resource) && catalogResource == null) {
-            synchronized (this) {
-                if (catalogResource == null) {
-                    catalogResource = 
Env.getCurrentEnv().getResourceMgr().getResource(resource);
-                }
-            }
-        }
-        return catalogResource;
-    }
-
     public String getOrDefault(String key, String defaultVal) {
-        String val = properties.get(key);
-        if (val == null) {
-            Resource res = catalogResource();
-            if (res != null) {
-                val = res.getCopiedProperties().getOrDefault(key, defaultVal);
-            } else {
-                val = defaultVal;
-            }
-        }
-        return val;
+        return properties.getOrDefault(key, defaultVal);
     }
 
     public Map<String, String> getProperties() {
-        Map<String, String> mergedProperties = Maps.newHashMap();
-        if (!Strings.isNullOrEmpty(resource)) {
-            Resource res = catalogResource();
-            if (res != null) {
-                mergedProperties = res.getCopiedProperties();
-            }
-        }
-        mergedProperties.putAll(properties);
-        return mergedProperties;
-    }
-
-    public String getResource() {
-        return resource;
+        return Maps.newHashMap(properties);
     }
 
     public void modifyCatalogProps(Map<String, String> props) {
-        properties.putAll(PropertyConverter.convertToMetaProperties(props));
-        this.storagePropertiesMap = null;
-    }
-
-    private void reInitCatalogStorageProperties() {
-        List<StorageProperties> storageProperties;
-        try {
-            storageProperties = StorageProperties.createAll(getProperties());
-            this.storagePropertiesMap = (storageProperties.stream()
-                    
.collect(java.util.stream.Collectors.toMap(StorageProperties::getType, 
Function.identity())));
-        } catch (UserException e) {
-            throw new RuntimeException(e);
+        synchronized (this) {
+            
properties.putAll(PropertyConverter.convertToMetaProperties(props));
+            resetAllCaches();
         }
-
     }
 
     public void rollBackCatalogProps(Map<String, String> props) {
-        properties.clear();
-        properties = new HashMap<>(props);
-        this.storagePropertiesMap = null;
-    }
-
-
-    public Map<String, String> getHadoopProperties() {
-        Map<String, String> hadoopProperties = getProperties();
-        
hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
-        return hadoopProperties;
+        synchronized (this) {
+            properties = new HashMap<>(props);
+            resetAllCaches();
+        }
     }
 
     public void addProperty(String key, String val) {
-        this.properties.put(key, val);
-        this.storagePropertiesMap = null; // reset storage properties map
+        synchronized (this) {
+            this.properties.put(key, val);
+            resetAllCaches();
+        }
     }
 
     public void deleteProperty(String key) {
-        this.properties.remove(key);
+        synchronized (this) {
+            this.properties.remove(key);
+            resetAllCaches();
+        }
+    }
+
+    /**
+     * Unified cache reset method to ensure all caches are properly cleared
+     */
+    private void resetAllCaches() {
         this.storagePropertiesMap = null;
+        this.metastoreProperties = null;
+        this.backendStorageProperties = null;
+        this.hadoopProperties = null;
     }
 
+    /**
+     * Get storage properties map with lazy loading, using double-check 
locking to ensure thread safety
+     */
     public Map<StorageProperties.Type, StorageProperties> 
getStoragePropertiesMap() {
         if (storagePropertiesMap == null) {
             synchronized (this) {
                 if (storagePropertiesMap == null) {
-                    reInitCatalogStorageProperties();
+                    try {
+                        List<StorageProperties> storageProperties = 
StorageProperties.createAll(getProperties());
+                        this.storagePropertiesMap = storageProperties.stream()
+                                
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
+                    } catch (UserException e) {
+                        LOG.warn("Failed to initialize catalog storage 
properties", e);
+                        throw new RuntimeException("Failed to initialize 
storage properties for catalog", e);
+                    }
                 }
             }
         }
         return storagePropertiesMap;
     }
 
+    /**
+     * Get metastore properties with lazy loading, using double-check locking 
to ensure thread safety
+     */
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -173,13 +158,59 @@ public class CatalogProperty implements Writable {
         if (MapUtils.isEmpty(getProperties())) {
             return null;
         }
+
         if (metastoreProperties == null) {
-            try {
-                metastoreProperties = 
MetastoreProperties.create(getProperties());
-            } catch (UserException e) {
-                throw new RuntimeException(e);
+            synchronized (this) {
+                if (metastoreProperties == null) {
+                    try {
+                        metastoreProperties = 
MetastoreProperties.create(getProperties());
+                    } catch (UserException e) {
+                        LOG.warn("Failed to create metastore properties", e);
+                        throw new RuntimeException("Failed to create metastore 
properties", e);
+                    }
+                }
             }
         }
         return metastoreProperties;
     }
+
+    /**
+     * Get backend storage properties with lazy loading, using double-check 
locking to ensure thread safety
+     */
+    public Map<String, String> getBackendStorageProperties() {
+        if (backendStorageProperties == null) {
+            synchronized (this) {
+                if (backendStorageProperties == null) {
+                    Map<String, String> result = new HashMap<>();
+                    Map<StorageProperties.Type, StorageProperties> storageMap 
= getStoragePropertiesMap();
+
+                    for (StorageProperties sp : storageMap.values()) {
+                        Map<String, String> backendProps = 
sp.getBackendConfigProperties();
+                        if (backendProps != null) {
+                            result.putAll(backendProps);
+                        }
+                    }
+
+                    this.backendStorageProperties = result;
+                }
+            }
+        }
+        return backendStorageProperties;
+    }
+
+    /**
+     * Get Hadoop properties with lazy loading, using double-check locking to 
ensure thread safety
+     */
+    public Map<String, String> getHadoopProperties() {
+        if (hadoopProperties == null) {
+            synchronized (this) {
+                if (hadoopProperties == null) {
+                    Map<String, String> result = getProperties();
+                    
result.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
+                    this.hadoopProperties = result;
+                }
+            }
+        }
+        return hadoopProperties;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index e7e4eb67504..d5312fe5e3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -680,11 +680,6 @@ public abstract class ExternalCatalog
                 + ", table id: " + tableId);
     }
 
-    @Override
-    public String getResource() {
-        return catalogProperty.getResource();
-    }
-
     @Nullable
     @Override
     public ExternalDatabase<? extends ExternalTable> getDbNullable(String 
dbName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index fabd7448555..c0aeb6da888 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -575,14 +575,8 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return catalog.getCatalogProperty().getStoragePropertiesMap();
     }
 
-    public Map<String, String> getHadoopProperties() {
-        return getStoragePropertiesMap().values().stream()
-                .flatMap(m -> 
m.getBackendConfigProperties().entrySet().stream())
-                .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        Map.Entry::getValue,
-                        (v1, v2) -> v2));
-
+    public Map<String, String> getBackendStorageProperties() {
+        return catalog.getCatalogProperty().getBackendStorageProperties();
     }
 
     public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String> 
columns) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 552bbd9156b..1b259dfb283 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -478,8 +478,8 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     @Override
-    protected Map<String, String> getLocationProperties() throws UserException 
 {
-        return hmsTable.getHadoopProperties();
+    protected Map<String, String> getLocationProperties() {
+        return hmsTable.getBackendStorageProperties();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 23df0415762..948e1f8c5e2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -230,12 +230,12 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     @Override
-    protected Map<String, String> getLocationProperties() throws UserException 
{
+    protected Map<String, String> getLocationProperties() {
         if (incrementalRead) {
             return incrementalRelation.getHoodieParams();
         } else {
             // HudiJniScanner uses hadoop client to read data.
-            return hmsTable.getHadoopProperties();
+            return hmsTable.getBackendStorageProperties();
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 2d87889d50e..b1178363b9b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -23,18 +23,14 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
-import org.apache.doris.datasource.property.PropertyConverter;
 import 
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
 import org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.catalog.Catalog;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 public abstract class IcebergExternalCatalog extends ExternalCatalog {
 
@@ -142,11 +138,6 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
         }
     }
 
-    protected void initS3Param(Configuration conf) {
-        Map<String, String> properties = catalogProperty.getHadoopProperties();
-        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, 
PropertyConverter.getAWSCredentialsProviders(properties));
-    }
-
     @Override
     public boolean viewExists(String dbName, String viewName) {
         return metadataOps.viewExists(dbName, viewName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index 85e3b29e1a3..e8e92e60ac1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -361,7 +361,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         jdbcTable.setDriverClass(this.getDriverClass());
         jdbcTable.setDriverUrl(this.getDriverUrl());
         jdbcTable.setCheckSum(this.getCheckSum());
-        jdbcTable.setResourceName(this.getResource());
+        jdbcTable.setResourceName("");
         jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
         jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
         
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
index 00363c1f799..fba59545922 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonDLFExternalCatalog.java
@@ -17,38 +17,12 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Map;
 
 public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
-    private static final Logger LOG = 
LogManager.getLogger(PaimonDLFExternalCatalog.class);
 
     public PaimonDLFExternalCatalog(long catalogId, String name, String 
resource,
                                     Map<String, String> props, String comment) 
{
         super(catalogId, name, resource, props, comment);
     }
-
-    @Override
-    protected void initLocalObjectsImpl() {
-        super.initLocalObjectsImpl();
-        catalogType = PAIMON_DLF;
-        catalog = createCatalog();
-    }
-
-    @Override
-    protected void setPaimonCatalogOptions(Map<String, String> properties, 
Map<String, String> options) {
-        options.put(PaimonProperties.PAIMON_CATALOG_TYPE, 
PaimonProperties.PAIMON_HMS_CATALOG);
-        options.put(PaimonProperties.PAIMON_METASTORE_CLIENT, 
ProxyMetaStoreClient.class.getName());
-        options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
-                properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
-        options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
-                properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
-        options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
-                properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 93252d61d58..b8decdc032d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -18,38 +18,28 @@
 package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.SessionContext;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Catalog.TableNotExistException;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public abstract class PaimonExternalCatalog extends ExternalCatalog {
+public class PaimonExternalCatalog extends ExternalCatalog {
     private static final Logger LOG = 
LogManager.getLogger(PaimonExternalCatalog.class);
     public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
     public static final String PAIMON_FILESYSTEM = "filesystem";
@@ -57,35 +47,32 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public static final String PAIMON_DLF = "dlf";
     protected String catalogType;
     protected Catalog catalog;
-    protected AuthenticationConfig authConf;
-    protected HadoopAuthenticator hadoopAuthenticator;
 
-    private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
-            PaimonProperties.WAREHOUSE
-    );
+    private AbstractPaimonProperties paimonProperties;
 
    /**
     * Creates a Paimon external catalog backed by the new metastore property framework.
     *
     * @param catalogId unique id assigned by the catalog manager
     * @param name      catalog name as seen by users
     * @param resource  optional resource name the catalog is bound to (may be null)
     * @param props     raw user-supplied catalog properties; resolved lazily in initLocalObjectsImpl
     * @param comment   user comment for the catalog
     */
    public PaimonExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
                                 String comment) {
        super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
        // Properties are stored raw here; the old PropertyConverter pre-conversion
        // step was removed — parsing now happens in MetastoreProperties.create().
        catalogProperty = new CatalogProperty(resource, props);
    }
 
     @Override
     protected void initLocalObjectsImpl() {
-        Configuration conf = 
DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
-        for (Map.Entry<String, String> propEntry : 
this.catalogProperty.getHadoopProperties().entrySet()) {
-            conf.set(propEntry.getKey(), propEntry.getValue());
+        try {
+            paimonProperties = (AbstractPaimonProperties) 
MetastoreProperties.create(catalogProperty.getProperties());
+        } catch (UserException e) {
+            throw new IllegalArgumentException("Failed to create Paimon 
properties from catalog properties,exception: "
+                    + ExceptionUtils.getRootCauseMessage(e), e);
         }
-        authConf = AuthenticationConfig.getKerberosConfig(conf);
-        hadoopAuthenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConf);
+        catalogType = paimonProperties.getPaimonCatalogType();
+        catalog = createCatalog();
         initPreExecutionAuthenticator();
     }
 
    /**
     * Lazily installs the execution authenticator provided by the resolved Paimon
     * property bundle (replaces the old Hadoop/Kerberos-specific authenticator path).
     * Synchronized so concurrent initialization observes a single instance.
     */
    @Override
    protected synchronized void initPreExecutionAuthenticator() {
        if (executionAuthenticator == null) {
            executionAuthenticator = paimonProperties.getExecutionAuthenticator();
        }
    }
 
@@ -96,8 +83,8 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
 
     protected List<String> listDatabaseNames() {
         try {
-            return hadoopAuthenticator.doAs(() -> new 
ArrayList<>(catalog.listDatabases()));
-        } catch (IOException e) {
+            return executionAuthenticator.execute(() -> new 
ArrayList<>(catalog.listDatabases()));
+        } catch (Exception e) {
             throw new RuntimeException("Failed to list databases names, 
catalog name: " + getName(), e);
         }
     }
@@ -106,7 +93,7 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> {
+            return executionAuthenticator.execute(() -> {
                 try {
                     catalog.getTable(Identifier.create(dbName, tblName));
                     return true;
@@ -115,8 +102,9 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
                 }
             });
 
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check table existence, 
catalog name: " + getName(), e);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check table existence, 
catalog name: " + getName()
+                    + "error message is:" + 
ExceptionUtils.getRootCauseMessage(e), e);
         }
     }
 
@@ -124,7 +112,7 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> {
+            return executionAuthenticator.execute(() -> {
                 List<String> tableNames = null;
                 try {
                     tableNames = catalog.listTables(dbName);
@@ -133,7 +121,7 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
                 }
                 return tableNames;
             });
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new RuntimeException("Failed to list table names, catalog 
name: " + getName(), e);
         }
     }
@@ -141,32 +129,31 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
    /**
     * Loads the Paimon table identified by the given name mapping, running under
     * the catalog's execution authenticator. Remote names are used for the lookup;
     * local names are used only in the error message shown to the user.
     *
     * @throws RuntimeException wrapping any lookup failure (root cause message included)
     */
    public org.apache.paimon.table.Table getPaimonTable(NameMapping nameMapping) {
        makeSureInitialized();
        try {
            return executionAuthenticator.execute(() -> catalog.getTable(Identifier.create(nameMapping
                    .getRemoteDbName(), nameMapping.getRemoteTblName())));
        } catch (Exception e) {
            throw new RuntimeException("Failed to get Paimon table:" + getName() + "." + nameMapping.getLocalDbName()
                    + "." + nameMapping.getLocalTblName() + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
        }
    }
 
    /**
     * Lists the partitions of the given Paimon table under the catalog's execution
     * authenticator. A missing table is treated as "no partitions": the
     * TableNotExistException is logged and an empty list is returned, so callers
     * see a best-effort result rather than a failure.
     *
     * @throws RuntimeException wrapping any other failure from the catalog
     */
    public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
        makeSureInitialized();
        try {
            return executionAuthenticator.execute(() -> {
                List<Partition> partitions = new ArrayList<>();
                try {
                    partitions = catalog.listPartitions(Identifier.create(nameMapping.getRemoteDbName(),
                            nameMapping.getRemoteTblName()));
                } catch (Catalog.TableNotExistException e) {
                    // Deliberate best-effort: absence of the table is not an error here.
                    LOG.warn("TableNotExistException", e);
                }
                return partitions;
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to get Paimon table partitions:" + getName() + "."
                    + nameMapping.getRemoteDbName() + "." + nameMapping.getRemoteTblName() + ", because "
                    + ExceptionUtils.getRootCauseMessage(e), e);
        }
    }
 
@@ -174,79 +161,44 @@ public abstract class PaimonExternalCatalog extends 
ExternalCatalog {
         return getPaimonSystemTable(nameMapping, null, queryType);
     }
 
    /**
     * Loads a Paimon system table (e.g. snapshots, files) for the given table,
     * optionally pinned to a branch.
     *
     * @param nameMapping local/remote database and table name mapping
     * @param branch      branch name, or null for the default branch
     * @param queryType   system table kind (the suffix after '$' in Paimon naming)
     * @throws RuntimeException wrapping any lookup failure (root cause message included)
     */
    public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping nameMapping,
                                                              String branch, String queryType) {
        makeSureInitialized();
        try {
            return executionAuthenticator.execute(() -> catalog.getTable(new Identifier(nameMapping.getRemoteDbName(),
                    nameMapping.getRemoteTblName(), branch, queryType)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to get Paimon system table:" + getName() + "."
                    + nameMapping.getRemoteDbName() + "." + nameMapping.getRemoteTblName() + "$" + queryType
                    + ", because " + ExceptionUtils.getRootCauseMessage(e), e);
        }
    }
 
-    protected String getPaimonCatalogType(String catalogType) {
-        if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
-            return PaimonProperties.PAIMON_HMS_CATALOG;
-        } else {
-            return PaimonProperties.PAIMON_FILESYSTEM_CATALOG;
-        }
-    }
 
    /**
     * Builds the underlying Paimon {@link Catalog} via the resolved metastore
     * property bundle, handing over all configured storage backends.
     *
     * <p>NOTE(review): the old CatalogContext/CatalogFactory construction (and the
     * createCatalogImpl hook) moved into the AbstractPaimonProperties subclasses;
     * authentication is presumably handled inside initializeCatalog — confirm.
     *
     * @throws RuntimeException wrapping any failure during catalog construction
     */
    protected Catalog createCatalog() {
        try {
            return paimonProperties.initializeCatalog(getName(), new ArrayList<>(catalogProperty
                    .getStoragePropertiesMap().values()));
        } catch (Exception e) {
            throw new RuntimeException("Failed to create catalog, catalog name: " + getName() + ", exception: "
                    + ExceptionUtils.getRootCauseMessage(e), e);
        }
    }
    /**
     * Returns the fully resolved Paimon catalog options as a plain map
     * (e.g. for shipping to backends). The old manual option assembly
     * (warehouse, "paimon."-prefixed pass-through keys, hive.version) now
     * lives inside the property bundle.
     */
    public Map<String, String> getPaimonOptionsMap() {
        // Ensure paimonProperties is resolved before reading its options.
        makeSureInitialized();
        return paimonProperties.getCatalogOptionsMap();
    }
 
     @Override
     public void checkProperties() throws DdlException {
-        super.checkProperties();
-        for (String requiredProperty : REQUIRED_PROPERTIES) {
-            if 
(!catalogProperty.getProperties().containsKey(requiredProperty)) {
-                throw new DdlException("Required property '" + 
requiredProperty + "' is missing");
+        if (null != paimonProperties) {
+            try {
+                this.paimonProperties = (AbstractPaimonProperties) 
MetastoreProperties
+                        .create(catalogProperty.getProperties());
+            } catch (UserException e) {
+                throw new DdlException("Failed to create Paimon properties 
from catalog properties, exception: "
+                        + ExceptionUtils.getRootCauseMessage(e), e);
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
index 53e790d8c9e..b904ac169ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -36,12 +35,9 @@ public class PaimonExternalCatalogFactory {
         metastoreType = metastoreType.toLowerCase();
         switch (metastoreType) {
             case PaimonExternalCatalog.PAIMON_HMS:
-                return new PaimonHMSExternalCatalog(catalogId, name, resource, 
props, comment);
             case PaimonExternalCatalog.PAIMON_FILESYSTEM:
-                return new PaimonFileExternalCatalog(catalogId, name, 
resource, props, comment);
             case PaimonExternalCatalog.PAIMON_DLF:
-                props.put(HMSProperties.HIVE_METASTORE_TYPE, 
HMSProperties.DLF_TYPE);
-                return new PaimonDLFExternalCatalog(catalogId, name, resource, 
props, comment);
+                return new PaimonExternalCatalog(catalogId, name, resource, 
props, comment);
             default:
                 throw new DdlException("Unknown " + 
PaimonExternalCatalog.PAIMON_CATALOG_TYPE
                         + " value: " + metastoreType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
index e74f3deeaf5..c2c3e139567 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonFileExternalCatalog.java
@@ -17,77 +17,12 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Map;
 
-
/**
 * Paimon catalog backed by a filesystem metastore.
 *
 * <p>All filesystem-specific option wiring (S3/OSS/COS/OBS endpoint and key
 * mapping, path-style access) moved into the metastore property framework,
 * leaving this class an empty shell over {@link PaimonExternalCatalog}.
 * NOTE(review): presumably retained only so previously persisted catalogs of
 * this concrete type still deserialize — confirm before deleting.
 */
public class PaimonFileExternalCatalog extends PaimonExternalCatalog {

    public PaimonFileExternalCatalog(long catalogId, String name, String resource,
            Map<String, String> props, String comment) {
        super(catalogId, name, resource, props, comment);
    }
}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
index fb27bb56696..ae5cf7073dd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -17,48 +17,12 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.datasource.property.constants.PaimonProperties;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
 import java.util.Map;
 
/**
 * Paimon catalog whose metastore is Hive Metastore (HMS).
 *
 * <p>HMS-specific option wiring and the required-property check
 * (hive.metastore.uris) moved into the metastore property framework, leaving
 * this class an empty shell over {@link PaimonExternalCatalog}.
 * NOTE(review): presumably retained only so previously persisted catalogs of
 * this concrete type still deserialize — confirm before deleting.
 */
public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {

    public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
            Map<String, String> props, String comment) {
        super(catalogId, name, resource, props, comment);
    }
}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 08227a1fc73..0db54afc1a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -139,9 +139,9 @@ public class PaimonScanNode extends FileQueryScanNode {
     protected ConcurrentHashMap<Long, Boolean> currentQuerySchema = new 
ConcurrentHashMap<>();
 
    /**
     * Creates a scan node over a Paimon table.
     *
     * @param id                  plan node id
     * @param desc                tuple descriptor describing the scan output
     * @param needCheckColumnPriv whether column-level privileges must be checked
     * @param sv                  session variables influencing scan behavior
     */
    public PaimonScanNode(PlanNodeId id,
                          TupleDescriptor desc,
                          boolean needCheckColumnPriv,
                          SessionVariable sv) {
        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv, sv);
    }
 
@@ -420,14 +420,8 @@ public class PaimonScanNode extends FileQueryScanNode {
     }
 
    /**
     * Storage-access properties shipped to backends for reading Paimon data files.
     * Now sourced solely from the catalog's resolved backend storage properties
     * instead of merging raw catalog properties with Hadoop properties; no longer
     * declares checked exceptions, so visibility narrowed to protected per the
     * base-class contract.
     */
    @Override
    protected Map<String, String> getLocationProperties() {
        return source.getCatalog().getCatalogProperty().getBackendStorageProperties();
    }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
new file mode 100644
index 00000000000..b5e12a4970c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractPaimonProperties extends MetastoreProperties {
+    @ConnectorProperty(
+            names = {"warehouse"},
+            description = "The location of the Paimon warehouse. This is where 
the tables will be stored."
+    )
+    protected String warehouse;
+
+    @Getter
+    protected ExecutionAuthenticator executionAuthenticator = new 
ExecutionAuthenticator() {
+    };
+
+    @Getter
+    protected Options catalogOptions;
+
+    private final AtomicReference<Map<String, String>> catalogOptionsMapRef = 
new AtomicReference<>();
+
+    public abstract String getPaimonCatalogType();
+
+    private static final String USER_PROPERTY_PREFIX = "paimon.";
+
+    protected AbstractPaimonProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    public abstract Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList);
+
+    /**
+     * Adapt S3 storage properties for Apache Paimon's S3 file system.
+     *
+     * <p>Paimon's S3 file system does not follow the standard 
Hadoop-compatible
+     * configuration keys (like fs.s3a.access.key). Instead, it expects 
specific
+     * keys such as "s3.access.key", "s3.secret.key", etc.
+     *
+     * <p>Therefore, we explicitly map our internal S3 configuration (usually 
designed
+     * for HDFS-compatible systems) to Paimon's expected format.
+     *
+     * <p>See: org.apache.paimon.s3.S3Loader
+     *
+     * @param storagePropertiesList the list of configured storage backends
+     */
+    protected void appendS3PropertiesIsNeeded(List<StorageProperties> 
storagePropertiesList) {
+
+        S3Properties s3Properties = (S3Properties) 
storagePropertiesList.stream()
+                .filter(storageProperties -> storageProperties.getType() == 
StorageProperties.Type.S3)
+                .findFirst()
+                .orElse(null);
+        if (s3Properties != null) {
+            catalogOptions.set("s3.access.key", s3Properties.getSecretKey());
+            catalogOptions.set("s3.secret.key", s3Properties.getAccessKey());
+            catalogOptions.set("s3.endpoint", s3Properties.getEndpoint());
+            catalogOptions.set("s3.region", s3Properties.getRegion());
+        }
+
+    }
+
+    protected void appendCatalogOptions(List<StorageProperties> 
storagePropertiesList) {
+        if (StringUtils.isNotBlank(warehouse)) {
+            catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
+        }
+        catalogOptions.set(CatalogOptions.METASTORE.key(), getMetastoreType());
+        origProps.forEach((k, v) -> {
+            if (k.toLowerCase().startsWith(USER_PROPERTY_PREFIX)) {
+                String newKey = k.substring(USER_PROPERTY_PREFIX.length());
+                if (StringUtils.isNotBlank(newKey)) {
+                    catalogOptions.set(newKey, v);
+                }
+            }
+        });
+        appendS3PropertiesIsNeeded(storagePropertiesList);
+    }
+
+    /**
+     * Build catalog options including common and subclass-specific ones.
+     */
+    public void buildCatalogOptions(List<StorageProperties> 
storagePropertiesList) {
+        catalogOptions = new Options();
+        appendCatalogOptions(storagePropertiesList);
+        appendCustomCatalogOptions();
+    }
+
+    public Map<String, String> getCatalogOptionsMap() {
+        // Return the cached map if already initialized
+        Map<String, String> existing = catalogOptionsMapRef.get();
+        if (existing != null) {
+            return existing;
+        }
+
+        // Check that the catalog options source is available
+        if (catalogOptions == null) {
+            throw new IllegalStateException("Catalog options have not been 
initialized. Call"
+                    + " buildCatalogOptions first.");
+        }
+
+        // Construct the map manually using the provided keys
+        Map<String, String> computed = new HashMap<>();
+        for (String key : catalogOptions.keySet()) {
+            computed.put(key, catalogOptions.get(key));
+        }
+
+        // Attempt to set the constructed map atomically; only one thread wins
+        if (catalogOptionsMapRef.compareAndSet(null, computed)) {
+            return computed;
+        } else {
+            // Another thread already initialized it; return the existing one
+            return catalogOptionsMapRef.get();
+        }
+    }
+
+
+    /**
+     * Hook method for subclasses to append metastore-specific or custom 
catalog options.
+     *
+     * <p>This method is invoked after common catalog options (e.g., warehouse 
path,
+     * metastore type, user-defined keys, and S3 compatibility mappings) have 
been
+     * added to the {@link org.apache.paimon.options.Options} instance.
+     *
+     * <p>Subclasses should override this method to inject additional 
configuration
+     * required for their specific metastore or environment. For example:
+     *
+     * <ul>
+     *   <li>DLF-based catalog may require a custom metastore client 
class.</li>
+     *   <li>HMS-based catalog may include URI and client pool parameters.</li>
+     *   <li>Other environments may inject authentication, endpoint, or 
caching options.</li>
+     * </ul>
+     *
+     * <p>If the subclass does not require any special options beyond the 
common ones,
+     * it can safely leave this method empty.
+     */
+    protected abstract void appendCustomCatalogOptions();
+
+    /**
+     * Returns the metastore type identifier used by the Paimon catalog 
factory.
+     *
+     * <p>This identifier must match one of the known metastore types 
supported by
+     * Apache Paimon. Internally, the value returned here is used to configure 
the
+     * `metastore` option in {@code Options}, which determines the specific
+     * {@link org.apache.paimon.catalog.CatalogFactory} implementation to be 
used
+     * when instantiating the catalog.
+     *
+     * <p>You can find valid identifiers by reviewing implementations of the
+     * {@link org.apache.paimon.catalog.CatalogFactory} interface. Each 
implementation
+     * declares its identifier via a static {@code IDENTIFIER} field or 
equivalent constant.
+     *
+     * <p>Examples:
+     * <ul>
+     *   <li>{@code "filesystem"} - for {@link 
org.apache.paimon.catalog.FileSystemCatalogFactory}</li>
+     *   <li>{@code "hive"} - for {@link 
org.apache.paimon.hive.HiveCatalogFactory}</li>
+     * </ul>
+     *
+     * @return the metastore type identifier string
+     */
+    protected abstract String getMetastoreType();
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
index 3e5e2273b08..48cba3c0bcc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
@@ -46,6 +46,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
     public enum Type {
         HMS("hms"),
         ICEBERG("iceberg"),
+        PAIMON("paimon"),
         GLUE("glue"),
         DLF("dlf"),
         DATAPROC("dataproc"),
@@ -83,6 +84,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
         //subclasses should be registered here
         register(Type.HMS, new HMSPropertiesFactory());
         register(Type.ICEBERG, new IcebergPropertiesFactory());
+        register(Type.PAIMON, new PaimonPropertiesFactory());
     }
 
     public static void register(Type type, MetastorePropertiesFactory factory) 
{
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
new file mode 100644
index 00000000000..2f65dbad9d8
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.OSSProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import com.aliyun.datalake.metastore.common.DataLakeConfig;
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PaimonAliyunDLFMetaStoreProperties
+ *
+ * <p>This class provides configuration support for using Apache Paimon with
+ * Aliyun Data Lake Formation (DLF) as the metastore. Although DLF is not an
+ * officially supported metastore type in Paimon, this implementation adapts
+ * DLF by treating it as a Hive Metastore (HMS) underneath, enabling
+ * interoperability with Paimon's HiveCatalog.
+ *
+ * <p>Key Characteristics:
+ * <ul>
+ *   <li>Internally uses HiveCatalog with custom HiveConf configured for 
Aliyun DLF.</li>
+ *   <li>Relies on {@link ProxyMetaStoreClient} to bridge DLF 
compatibility.</li>
+ *   <li>Requires Aliyun OSS as the storage backend. Other storage types are 
not
+ *       currently verified for compatibility.</li>
+ * </ul>
+ *
+ * <p>Note: This is an internal extension and not an officially supported 
Paimon
+ * metastore type. Future compatibility should be validated when upgrading 
Paimon
+ * or changing storage backends.
+ *
+ * @see org.apache.paimon.hive.HiveCatalog
+ * @see org.apache.paimon.catalog.CatalogFactory
+ * @see ProxyMetaStoreClient
+ */
+public class PaimonAliyunDLFMetaStoreProperties extends 
AbstractPaimonProperties {
+
+    private AliyunDLFBaseProperties baseProperties;
+
+    protected PaimonAliyunDLFMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        baseProperties = AliyunDLFBaseProperties.of(origProps);
+    }
+
+    private HiveConf buildHiveConf() {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_ID, 
baseProperties.dlfAccessKey);
+        hiveConf.set(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET, 
baseProperties.dlfSecretKey);
+        hiveConf.set(DataLakeConfig.CATALOG_ENDPOINT, 
baseProperties.dlfEndpoint);
+        hiveConf.set(DataLakeConfig.CATALOG_REGION_ID, 
baseProperties.dlfRegion);
+        hiveConf.set(DataLakeConfig.CATALOG_SECURITY_TOKEN, 
baseProperties.dlfSessionToken);
+        hiveConf.set(DataLakeConfig.CATALOG_USER_ID, baseProperties.dlfUid);
+        hiveConf.set(DataLakeConfig.CATALOG_ID, baseProperties.dlfCatalogId);
+        hiveConf.set(DataLakeConfig.CATALOG_PROXY_MODE, 
baseProperties.dlfProxyMode);
+        return hiveConf;
+    }
+
+    @Override
+    public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        HiveConf hiveConf = buildHiveConf();
+        buildCatalogOptions(storagePropertiesList);
+        StorageProperties ossProps = storagePropertiesList.stream()
+                .filter(sp -> sp.getType() == StorageProperties.Type.OSS)
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("Paimon DLF 
metastore requires OSS storage properties."));
+
+        if (!(ossProps instanceof OSSProperties)) {
+            throw new IllegalStateException("Expected OSSProperties type.");
+        }
+        OSSProperties ossProperties = (OSSProperties) ossProps;
+        for (Map.Entry<String, String> entry : 
ossProperties.getHadoopStorageConfig()) {
+            catalogOptions.set(entry.getKey(), entry.getValue());
+        }
+        hiveConf.addResource(ossProperties.getHadoopStorageConfig());
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
hiveConf);
+        return CatalogFactory.createCatalog(catalogContext);
+    }
+
+
+    @Override
+    protected void appendCustomCatalogOptions() {
+        catalogOptions.set("metastore.client.class", 
ProxyMetaStoreClient.class.getName());
+    }
+
+    @Override
+    protected String getMetastoreType() {
+        return HiveCatalogOptions.IDENTIFIER;
+    }
+
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_DLF;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
new file mode 100644
index 00000000000..adfa53eab94
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalogFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class PaimonFileSystemMetaStoreProperties extends 
AbstractPaimonProperties {
+    protected PaimonFileSystemMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        buildCatalogOptions(storagePropertiesList);
+        Configuration conf = new Configuration(false);
+        storagePropertiesList.forEach(storageProperties -> {
+            for (Map.Entry<String, String> entry : 
storageProperties.getHadoopStorageConfig()) {
+                catalogOptions.set(entry.getKey(), entry.getValue());
+            }
+            conf.addResource(storageProperties.getHadoopStorageConfig());
+            if 
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+                this.executionAuthenticator = new 
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+                        .getHadoopAuthenticator());
+            }
+        });
+
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
+        try {
+            return this.executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void appendCustomCatalogOptions() {
+        //nothing need to do
+    }
+
+    @Override
+    protected String getMetastoreType() {
+        return FileSystemCatalogFactory.IDENTIFIER;
+    }
+
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_FILESYSTEM;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
new file mode 100644
index 00000000000..7c2d2ee79a3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.hive.HiveCatalogOptions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class PaimonHMSMetaStoreProperties extends AbstractPaimonProperties {
+
+    private HMSBaseProperties hmsBaseProperties;
+
+    private static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY = 
"client-pool-cache.eviction-interval-ms";
+
+    private static final String LOCATION_IN_PROPERTIES_KEY = 
"location-in-properties";
+
+    @ConnectorProperty(
+            names = {CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY},
+            required = false,
+            description = "Setting the client's pool cache eviction 
interval(ms).")
+    private long clientPoolCacheEvictionIntervalMs = 
TimeUnit.MINUTES.toMillis(5L);
+
+    @ConnectorProperty(
+            names = {LOCATION_IN_PROPERTIES_KEY},
+            required = false,
+            description = "Setting whether to use the location in the 
properties.")
+    private boolean locationInProperties = false;
+
+
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_DLF;
+    }
+
+    protected PaimonHMSMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        hmsBaseProperties = HMSBaseProperties.of(origProps);
+        hmsBaseProperties.initAndCheckParams();
+        this.executionAuthenticator = new 
HadoopExecutionAuthenticator(hmsBaseProperties.getHmsAuthenticator());
+    }
+
+
+    /**
+     * Builds the Hadoop Configuration by adding hive-site.xml and 
storage-specific configs.
+     */
+    private Configuration buildHiveConfiguration(List<StorageProperties> 
storagePropertiesList) {
+        Configuration conf = hmsBaseProperties.getHiveConf();
+
+        for (StorageProperties sp : storagePropertiesList) {
+            if (sp.getHadoopStorageConfig() != null) {
+                conf.addResource(sp.getHadoopStorageConfig());
+            }
+        }
+        return conf;
+    }
+
+    @Override
+    public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        Configuration conf = buildHiveConfiguration(storagePropertiesList);
+        buildCatalogOptions(storagePropertiesList);
+        for (Map.Entry<String, String> entry : conf) {
+            catalogOptions.set(entry.getKey(), entry.getValue());
+        }
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
+        try {
+            return executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create Paimon catalog with 
HMS metastore", e);
+        }
+
+    }
+
+    @Override
+    protected String getMetastoreType() {
+        //See org.apache.paimon.hive.HiveCatalogFactory
+        return HiveCatalogOptions.IDENTIFIER;
+    }
+
+    @Override
+    protected void appendCustomCatalogOptions() {
+        catalogOptions.set(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_KEY,
+                String.valueOf(clientPoolCacheEvictionIntervalMs));
+        catalogOptions.set(LOCATION_IN_PROPERTIES_KEY, 
String.valueOf(locationInProperties));
+        catalogOptions.set("uri", hmsBaseProperties.getHiveMetastoreUri());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
new file mode 100644
index 00000000000..12dac3bb739
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import java.util.Map;
+
+public class PaimonPropertiesFactory extends 
AbstractMetastorePropertiesFactory {
+
+    private static final String KEY = "paimon.catalog.type";
+    private static final String DEFAULT_TYPE = "filesystem";
+
+    public PaimonPropertiesFactory() {
+        register("dlf", PaimonAliyunDLFMetaStoreProperties::new);
+        register("filesystem", PaimonFileSystemMetaStoreProperties::new);
+        register("hms", PaimonHMSMetaStoreProperties::new);
+    }
+
+    @Override
+    public MetastoreProperties create(Map<String, String> props) {
+        return createInternal(props, KEY, DEFAULT_TYPE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 8cec9d6a63d..b36e4309701 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -257,8 +258,23 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
 
     @Override
     public void initializeHadoopStorageConfig() {
-        throw new UnsupportedOperationException("Hadoop storage config 
initialization is not"
-                + " supported for S3 compatible storage.");
+        hadoopStorageConfig = new Configuration();
+        // Compatibility note: Due to historical reasons, even when the 
underlying
+        // storage is OSS, OBS, etc., users may still configure the schema as 
"s3://".
+        // To ensure backward compatibility, we append S3-related properties 
by default.
+        appendS3HdfsProperties(hadoopStorageConfig);
+    }
+
+    private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
+        hadoopStorageConfig.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+        hadoopStorageConfig.set("fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+        hadoopStorageConfig.set("fs.s3a.endpoint", getEndpoint());
+        hadoopStorageConfig.set("fs.s3a.access.key", getAccessKey());
+        hadoopStorageConfig.set("fs.s3a.secret.key", getSecretKey());
+        hadoopStorageConfig.set("fs.s3a.connection.maximum", 
getMaxConnections());
+        hadoopStorageConfig.set("fs.s3a.connection.request.timeout", 
getRequestTimeoutS());
+        hadoopStorageConfig.set("fs.s3a.connection.timeout", 
getConnectionTimeoutS());
+        hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index a2ae545f048..6fea7cf18ec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
@@ -130,17 +129,8 @@ public class COSProperties extends 
AbstractS3CompatibleProperties {
 
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.cos.impl", 
"org.apache.hadoop.fs.CosFileSystem");
-        hadoopStorageConfig.set("fs.cosn.impl", 
"org.apache.hadoop.fs.CosFileSystem");
-        hadoopStorageConfig.set("fs.AbstractFileSystem.cos.impl", 
"org.apache.hadoop.fs.Cos");
-        hadoopStorageConfig.set("fs.cos.secretId", accessKey);
-        hadoopStorageConfig.set("fs.cos.secretKey", secretKey);
-        hadoopStorageConfig.set("fs.cos.region", region);
-        hadoopStorageConfig.set("fs.cos.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.cos.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.cos.connection.request.timeout", 
requestTimeoutS);
-        hadoopStorageConfig.set("fs.cos.connection.maximum", maxConnections);
-        hadoopStorageConfig.set("fs.cos.use.path.style", usePathStyle);
+        super.initializeHadoopStorageConfig();
+        hadoopStorageConfig.set("fs.cos.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+        hadoopStorageConfig.set("fs.cosn.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
index 0f5349e1452..75f9fc85881 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
 import java.util.Set;
@@ -95,18 +94,4 @@ public class MinioProperties extends 
AbstractS3CompatibleProperties {
     protected Set<Pattern> endpointPatterns() {
         return 
ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$"));
     }
-
-    @Override
-    public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
-        hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
-        hadoopStorageConfig.set("fs.s3a.connection.maximum", maxConnections);
-        hadoopStorageConfig.set("fs.s3a.connection.request.timeout", 
requestTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 051dd9a927c..513727416ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
@@ -135,13 +134,7 @@ public class OBSProperties extends 
AbstractS3CompatibleProperties {
 
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.obs.impl", 
"com.obs.services.hadoop.fs.OBSFileSystem");
-        hadoopStorageConfig.set("fs.obs.access.key", accessKey);
-        hadoopStorageConfig.set("fs.obs.secret.key", secretKey);
-        hadoopStorageConfig.set("fs.obs.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.obs.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.obs.request.timeout", requestTimeoutS);
-        hadoopStorageConfig.set("fs.obs.connection.max", maxConnections);
+        super.initializeHadoopStorageConfig();
+        hadoopStorageConfig.set("fs.obs.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 1e34311f9e9..7b27a1ea160 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
@@ -172,15 +171,7 @@ public class OSSProperties extends 
AbstractS3CompatibleProperties {
 
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
-        hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.oss.region", region);
-        hadoopStorageConfig.set("fs.oss.accessKeyId", accessKey);
-        hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
-        hadoopStorageConfig.set("fs.oss.connection.timeout", 
connectionTimeoutS);
-        hadoopStorageConfig.set("fs.oss.connection.max", maxConnections);
-        hadoopStorageConfig.set("fs.oss.connection.request.timeout", 
requestTimeoutS);
-        hadoopStorageConfig.set("fs.oss.use.path.style.access", usePathStyle);
+        super.initializeHadoopStorageConfig();
+        hadoopStorageConfig.set("fs.oss.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index e664edc6188..19a9195ec2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -23,11 +23,9 @@ import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesE
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
@@ -39,8 +37,6 @@ import 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsPr
 import software.amazon.awssdk.services.sts.StsClient;
 import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 
-import java.lang.reflect.Field;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -219,34 +215,6 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
         return ENDPOINT_PATTERN;
     }
 
-    private static List<Field> getIdentifyFields() {
-        List<Field> fields = Lists.newArrayList();
-        try {
-            //todo AliyunDlfProperties should in OSS storage type.
-            fields.add(S3Properties.class.getDeclaredField("s3AccessKey"));
-            // fixme Add it when MS done
-            
//fields.add(AliyunDLFProperties.class.getDeclaredField("dlfAccessKey"));
-            
//fields.add(AWSGlueProperties.class.getDeclaredField("glueAccessKey"));
-            return fields;
-        } catch (NoSuchFieldException e) {
-            // should not happen
-            throw new RuntimeException("Failed to get field: " + 
e.getMessage(), e);
-        }
-    }
-
-    /*
-    public void toPaimonOSSFileIOProperties(Options options) {
-        options.set("fs.oss.endpoint", s3Endpoint);
-        options.set("fs.oss.accessKeyId", s3AccessKey);
-        options.set("fs.oss.accessKeySecret", s3SecretKey);
-    }
-
-    public void toPaimonS3FileIOProperties(Options options) {
-        options.set("s3.endpoint", s3Endpoint);
-        options.set("s3.access-key", s3AccessKey);
-        options.set("s3.secret-key", s3SecretKey);
-    }*/
-
     @Override
     public Map<String, String> getBackendConfigProperties() {
         Map<String, String> backendProperties = 
generateBackendS3Configuration(s3ConnectionMaximum,
@@ -297,18 +265,17 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
                 InstanceProfileCredentialsProvider.create());
     }
 
-
     @Override
     public void initializeHadoopStorageConfig() {
-        hadoopStorageConfig = new Configuration();
-        hadoopStorageConfig.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem");
-        hadoopStorageConfig.set("fs.s3a.endpoint", endpoint);
-        hadoopStorageConfig.set("fs.s3a.access.key", accessKey);
-        hadoopStorageConfig.set("fs.s3a.secret.key", secretKey);
-        hadoopStorageConfig.set("fs.s3a.connection.maximum", 
s3ConnectionMaximum);
-        hadoopStorageConfig.set("fs.s3a.connection.request.timeout", 
s3ConnectionRequestTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.connection.timeout", 
s3ConnectionTimeoutS);
-        hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
+        super.initializeHadoopStorageConfig();
+        //Set assumed_roles
+        //@See 
https://hadoop.apache.org/docs/r3.4.1/hadoop-aws/tools/hadoop-aws/assumed_roles.html
+        if (StringUtils.isNotBlank(s3ExternalId) && 
StringUtils.isNotBlank(s3IAMRole)) {
+            //@See org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
+            hadoopStorageConfig.set("fs.s3a.assumed.role.external.id", 
s3ExternalId);
+            hadoopStorageConfig.set("fs.s3a.assumed.role.arn", s3IAMRole);
+            hadoopStorageConfig.set("fs.s3a.aws.credentials.provider",
+                    
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 35f399409f7..131b5195c7e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -169,7 +169,7 @@ public class LogicalHudiScan extends LogicalFileScan {
         Optional<IncrementalRelation> newIncrementalRelation = 
Optional.empty();
         if (optScanParams.isPresent() && 
optScanParams.get().incrementalRead()) {
             TableScanParams scanParams = optScanParams.get();
-            Map<String, String> optParams = table.getHadoopProperties();
+            Map<String, String> optParams = 
table.getBackendStorageProperties();
             if (scanParams.getMapParams().containsKey("beginTime")) {
                 optParams.put("hoodie.datasource.read.begin.instanttime", 
scanParams.getMapParams().get("beginTime"));
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index d44b6dd3df3..0a696f64340 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -162,7 +162,7 @@ public class HiveTableSink extends 
BaseExternalTableDataSink {
             
tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
         }
 
-        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+        tSink.setHadoopConfig(targetTable.getBackendStorageProperties());
 
         tDataSink = new TDataSink(getDataSinkType());
         tDataSink.setHiveTableSink(tSink);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to