This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0a687a6ed90 branch-3.1: [fix](paimon)Support user-defined S3 config 
prefixes and unify to HDFS S3A protocol (#57116) (#57526)
0a687a6ed90 is described below

commit 0a687a6ed901c2acb542c6bb65107248e4fd29b4
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri Oct 31 17:52:35 2025 +0800

    branch-3.1: [fix](paimon)Support user-defined S3 config prefixes and unify 
to HDFS S3A protocol (#57116) (#57526)
    
    cp #57116
---
 .../metastore/AbstractPaimonProperties.java        | 49 +++++++++++-
 .../IcebergFileSystemMetaStoreProperties.java      |  4 +-
 .../metastore/IcebergHMSMetaStoreProperties.java   |  3 -
 .../property/metastore/IcebergRestProperties.java  |  7 +-
 .../PaimonAliyunDLFMetaStoreProperties.java        |  6 +-
 .../PaimonFileSystemMetaStoreProperties.java       |  7 +-
 .../metastore/PaimonHMSMetaStoreProperties.java    |  6 +-
 .../metastore/PaimonRestMetaStoreProperties.java   |  2 +-
 .../storage/AbstractS3CompatibleProperties.java    | 41 ----------
 .../property/storage/AzureProperties.java          |  7 ++
 .../property/storage/BrokerProperties.java         |  8 ++
 .../property/storage/HdfsCompatibleProperties.java |  7 ++
 .../property/storage/LocalProperties.java          |  6 ++
 .../property/storage/StorageProperties.java        | 62 ++++++++++++++-
 .../metastore/AbstractPaimonPropertiesTest.java    | 89 ++++++++++++++++++++++
 .../PaimonRestMetaStorePropertiesTest.java         | 14 +---
 16 files changed, 236 insertions(+), 82 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
index 9c9a49455d1..7d0fca2446c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
@@ -21,8 +21,10 @@ import 
org.apache.doris.common.security.authentication.ExecutionAuthenticator;
 import org.apache.doris.datasource.property.ConnectorProperty;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
+import com.google.common.collect.ImmutableList;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -58,7 +60,7 @@ public abstract class AbstractPaimonProperties extends 
MetastoreProperties {
 
     public abstract Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList);
 
-    protected void appendCatalogOptions(List<StorageProperties> 
storagePropertiesList) {
+    protected void appendCatalogOptions() {
         if (StringUtils.isNotBlank(warehouse)) {
             catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
         }
@@ -69,7 +71,10 @@ public abstract class AbstractPaimonProperties extends 
MetastoreProperties {
             if (k.toLowerCase().startsWith(USER_PROPERTY_PREFIX)) {
                 String newKey = k.substring(USER_PROPERTY_PREFIX.length());
                 if (StringUtils.isNotBlank(newKey)) {
-                    catalogOptions.set(newKey, v);
+                    boolean excluded = 
userStoragePrefixes.stream().anyMatch(k::startsWith);
+                    if (!excluded) {
+                        catalogOptions.set(newKey, v);
+                    }
                 }
             }
         });
@@ -78,12 +83,16 @@ public abstract class AbstractPaimonProperties extends 
MetastoreProperties {
     /**
      * Build catalog options including common and subclass-specific ones.
      */
-    public void buildCatalogOptions(List<StorageProperties> 
storagePropertiesList) {
+    public void buildCatalogOptions() {
         catalogOptions = new Options();
-        appendCatalogOptions(storagePropertiesList);
+        appendCatalogOptions();
         appendCustomCatalogOptions();
     }
 
+    protected void appendUserHadoopConfig(Configuration  conf) {
+        normalizeS3Config().forEach(conf::set);
+    }
+
     public Map<String, String> getCatalogOptionsMap() {
         // Return the cached map if already initialized
         Map<String, String> existing = catalogOptionsMapRef.get();
@@ -112,6 +121,38 @@ public abstract class AbstractPaimonProperties extends 
MetastoreProperties {
         }
     }
 
+    /**
+     * @see org.apache.paimon.s3.S3FileIO
+     * User-provided S3/OSS config key prefixes recognized here (all carry the
+     * "paimon." user-property prefix):
+     * 1. "paimon.s3."     - Paimon legacy custom prefix
+     * 2. "paimon.s3a."    - Paimon-supported shorthand
+     * 3. "paimon.fs.s3."  - Hadoop-style S3 prefix
+     * 4. "paimon.fs.oss." - Aliyun OSS prefix
+     *
+     * All of them are normalized to the Hadoop-recognized prefix "fs.s3a."
+     */
+    private final List<String> userStoragePrefixes = ImmutableList.of(
+                    "paimon.s3.", "paimon.s3a.", "paimon.fs.s3.", 
"paimon.fs.oss."
+    );
+
+    /** Hadoop S3A standard prefix */
+    private static final String FS_S3A_PREFIX = "fs.s3a.";
+
+    /**
+     * Normalizes user-provided S3 config keys to Hadoop S3A keys
+     */
+    protected Map<String, String> normalizeS3Config() {
+        Map<String, String> result = new HashMap<>();
+        origProps.forEach((key, value) -> {
+            for (String prefix : userStoragePrefixes) {
+                if (key.startsWith(prefix)) {
+                    result.put(FS_S3A_PREFIX + key.substring(prefix.length()), 
value);
+                    return; // stop after the first matching prefix
+                }
+            }
+        });
+        return result;
+    }
+
 
     /**
      * Hook method for subclasses to append metastore-specific or custom 
catalog options.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
index 9323d78e318..d644b7b06e0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
@@ -46,7 +46,7 @@ public class IcebergFileSystemMetaStoreProperties extends 
AbstractIcebergPropert
                                List<StorageProperties> storagePropertiesList) {
         Configuration configuration = 
buildConfiguration(storagePropertiesList);
         HadoopCatalog catalog = new HadoopCatalog();
-        buildCatalogProps(catalogProps, storagePropertiesList);
+        buildCatalogProps(storagePropertiesList);
         catalog.setConf(configuration);
         try {
             this.executionAuthenticator.execute(() -> {
@@ -71,7 +71,7 @@ public class IcebergFileSystemMetaStoreProperties extends 
AbstractIcebergPropert
         return configuration;
     }
 
-    private void buildCatalogProps(Map<String, String> props, 
List<StorageProperties> storagePropertiesList) {
+    private void buildCatalogProps(List<StorageProperties> 
storagePropertiesList) {
         if (storagePropertiesList.size() == 1 && storagePropertiesList.get(0) 
instanceof HdfsProperties) {
             HdfsProperties hdfsProps = (HdfsProperties) 
storagePropertiesList.get(0);
             if (hdfsProps.isKerberos()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
index fe04cd18026..289d682b23c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
@@ -69,9 +69,6 @@ public class IcebergHMSMetaStoreProperties extends 
AbstractIcebergProperties {
         HiveCatalog hiveCatalog = new HiveCatalog();
         hiveCatalog.setConf(conf);
         storagePropertiesList.forEach(sp -> {
-            for (Map.Entry<String, String> entry : 
sp.getHadoopStorageConfig()) {
-                catalogProps.put(entry.getKey(), entry.getValue());
-            }
             // NOTE: Custom FileIO implementation (KerberizedHadoopFileIO) is 
commented out by default.
             // Using FileIO for Kerberos authentication may cause 
serialization issues when accessing
             // Iceberg system tables (e.g., history, snapshots, manifests).
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
index b899116a1a8..026f9c11533 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
@@ -21,7 +21,6 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.property.ConnectorProperty;
 import org.apache.doris.datasource.property.ParamRules;
 import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
-import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
 import com.google.common.collect.Maps;
@@ -320,14 +319,12 @@ public class IcebergRestProperties extends 
AbstractIcebergProperties {
             Map<String, String> fileIOProperties, Configuration conf) {
 
         for (StorageProperties storageProperties : storagePropertiesList) {
-            if (storageProperties instanceof HdfsCompatibleProperties) {
-                
storageProperties.getBackendConfigProperties().forEach(conf::set);
-            } else if (storageProperties instanceof 
AbstractS3CompatibleProperties) {
+            if (storageProperties instanceof AbstractS3CompatibleProperties) {
                 // For all S3-compatible storage types, put properties in 
fileIOProperties map
                 toS3FileIOProperties((AbstractS3CompatibleProperties) 
storageProperties, fileIOProperties);
             } else {
                 // For other storage types, just use fileIOProperties map
-                
fileIOProperties.putAll(storageProperties.getBackendConfigProperties());
+                
storageProperties.getBackendConfigProperties().forEach(conf::set);
             }
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
index ae4aeda48f8..a3e6c9dd85a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java
@@ -87,7 +87,7 @@ public class PaimonAliyunDLFMetaStoreProperties extends 
AbstractPaimonProperties
     @Override
     public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
         HiveConf hiveConf = buildHiveConf();
-        buildCatalogOptions(storagePropertiesList);
+        buildCatalogOptions();
         StorageProperties ossProps = storagePropertiesList.stream()
                 .filter(sp -> sp.getType() == StorageProperties.Type.OSS)
                 .findFirst()
@@ -97,10 +97,8 @@ public class PaimonAliyunDLFMetaStoreProperties extends 
AbstractPaimonProperties
             throw new IllegalStateException("Expected OSSProperties type.");
         }
         OSSProperties ossProperties = (OSSProperties) ossProps;
-        for (Map.Entry<String, String> entry : 
ossProperties.getHadoopStorageConfig()) {
-            catalogOptions.set(entry.getKey(), entry.getValue());
-        }
         hiveConf.addResource(ossProperties.getHadoopStorageConfig());
+        appendUserHadoopConfig(hiveConf);
         CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
hiveConf);
         return CatalogFactory.createCatalog(catalogContext);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
index 9a74775b9b2..df0ebae9749 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java
@@ -38,19 +38,16 @@ public class PaimonFileSystemMetaStoreProperties extends 
AbstractPaimonPropertie
 
     @Override
     public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
-        buildCatalogOptions(storagePropertiesList);
+        buildCatalogOptions();
         Configuration conf = new Configuration();
         storagePropertiesList.forEach(storageProperties -> {
-            for (Map.Entry<String, String> entry : 
storageProperties.getHadoopStorageConfig()) {
-                catalogOptions.set(entry.getKey(), entry.getValue());
-            }
             conf.addResource(storageProperties.getHadoopStorageConfig());
             if 
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
                 this.executionAuthenticator = new 
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
                         .getHadoopAuthenticator());
             }
         });
-
+        appendUserHadoopConfig(conf);
         CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
         try {
             return this.executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
index 2f67d109dac..24342fce457 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java
@@ -87,11 +87,9 @@ public class PaimonHMSMetaStoreProperties extends 
AbstractPaimonProperties {
 
     @Override
     public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        buildCatalogOptions();
         Configuration conf = buildHiveConfiguration(storagePropertiesList);
-        buildCatalogOptions(storagePropertiesList);
-        for (Map.Entry<String, String> entry : conf) {
-            catalogOptions.set(entry.getKey(), entry.getValue());
-        }
+        appendUserHadoopConfig(conf);
         CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
         try {
             return executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java
index 91e47c10253..85651a2dc83 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java
@@ -77,7 +77,7 @@ public class PaimonRestMetaStoreProperties extends 
AbstractPaimonProperties {
 
     @Override
     public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
-        buildCatalogOptions(storagePropertiesList);
+        buildCatalogOptions();
         CatalogContext catalogContext = CatalogContext.create(catalogOptions);
         return CatalogFactory.createCatalog(catalogContext);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index c4cc493abe9..4b8997b6d56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -21,7 +21,6 @@ import org.apache.doris.common.UserException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.logging.log4j.LogManager;
@@ -226,8 +225,6 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
 
     protected abstract Set<Pattern> endpointPatterns();
 
-    protected abstract Set<String> schemas();
-
     // This method should be overridden by subclasses to provide a default 
endpoint based on the region.
     // Because for aws s3, only region is needed, the endpoint can be 
constructed from the region.
     // But for other s3 compatible storage, the endpoint may need to be 
specified explicitly.
@@ -249,16 +246,10 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
     @Override
     public void initializeHadoopStorageConfig() {
         hadoopStorageConfig = new Configuration();
-        origProps.forEach((key, value) -> {
-            if (key.startsWith("fs.")) {
-                hadoopStorageConfig.set(key, value);
-            }
-        });
         // Compatibility note: Due to historical reasons, even when the 
underlying
         // storage is OSS, OBS, etc., users may still configure the schema as 
"s3://".
         // To ensure backward compatibility, we append S3-related properties 
by default.
         appendS3HdfsProperties(hadoopStorageConfig);
-        ensureDisableCache(hadoopStorageConfig, origProps);
     }
 
     private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
@@ -284,38 +275,6 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
         hadoopStorageConfig.set("fs.s3a.path.style.access", getUsePathStyle());
     }
 
-    /**
-     * By default, Hadoop caches FileSystem instances per scheme and authority 
(e.g. s3a://bucket/), meaning that all
-     * subsequent calls using the same URI will reuse the same FileSystem 
object.
-     * In multi-tenant or dynamic credential environments — where different 
users may access the same bucket using
-     * different access keys or tokens — this cache reuse can lead to 
cross-credential contamination.
-     * <p>
-     * Specifically, if the cache is not disabled, a FileSystem instance 
initialized with one set of credentials may
-     * be reused by another session targeting the same bucket but with a 
different AK/SK. This results in:
-     * <p>
-     * Incorrect authentication (using stale credentials)
-     * <p>
-     * Unexpected permission errors or access denial
-     * <p>
-     * Potential data leakage between users
-     * <p>
-     * To avoid such risks, the configuration property
-     * fs.<schema>.impl.disable.cache
-     * must be set to true for all object storage backends (e.g., S3A, OSS, 
COS, OBS), ensuring that each new access
-     * creates an isolated FileSystem instance with its own credentials and 
configuration context.
-     */
-    private void ensureDisableCache(Configuration conf, Map<String, String> 
origProps) {
-        for (String schema : schemas()) {
-            String key = "fs." + schema + ".impl.disable.cache";
-            String userValue = origProps.get(key);
-            if (StringUtils.isNotBlank(userValue)) {
-                conf.setBoolean(key, BooleanUtils.toBoolean(userValue));
-            } else {
-                conf.setBoolean(key, true);
-            }
-        }
-    }
-
     @Override
     public String getStorageName() {
         return "S3";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
index 38315f00077..83ee7b932b7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Stream;
 
 /**
@@ -188,6 +190,11 @@ public class AzureProperties extends StorageProperties {
         setAzureAccountKeys(hadoopStorageConfig, accountName, accountKey);
     }
 
+    @Override
+    protected Set<String> schemas() {
+        return ImmutableSet.of("wasb", "wasbs", "abfs", "abfss");
+    }
+
     private static void setAzureAccountKeys(Configuration conf, String 
accountName, String accountKey) {
         String[] endpoints = {
                 "dfs.core.windows.net",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java
index 5c61597a09d..2987eb762ae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java
@@ -20,12 +20,14 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import lombok.Getter;
 import lombok.Setter;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 public class BrokerProperties extends StorageProperties {
 
@@ -94,6 +96,12 @@ public class BrokerProperties extends StorageProperties {
         // do nothing
     }
 
+    @Override
+    protected Set<String> schemas() {
+        //not used
+        return ImmutableSet.of();
+    }
+
     private Map<String, String> extractBrokerProperties() {
         Map<String, String> brokerProperties = new HashMap<>();
         for (String key : origProps.keySet()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java
index 39d3fa2bc4e..9a2d05d841f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java
@@ -21,9 +21,11 @@ import 
org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import 
org.apache.doris.common.security.authentication.HadoopSimpleAuthenticator;
 import 
org.apache.doris.common.security.authentication.SimpleAuthenticationConfig;
 
+import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 
 import java.util.Map;
+import java.util.Set;
 
 public abstract class HdfsCompatibleProperties extends StorageProperties {
 
@@ -49,4 +51,9 @@ public abstract class HdfsCompatibleProperties extends 
StorageProperties {
         //nothing to do
     }
 
+    @Override
+    protected Set<String> schemas() {
+        return ImmutableSet.of("hdfs");
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java
index b80c88b3134..e123ae0882b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
+import java.util.Set;
 
 public class LocalProperties extends StorageProperties {
     public static final String PROP_FILE_PATH = "file_path";
@@ -80,4 +81,9 @@ public class LocalProperties extends StorageProperties {
         hadoopStorageConfig.set("fs.local.impl", 
"org.apache.hadoop.fs.LocalFileSystem");
         hadoopStorageConfig.set("fs.file.impl", 
"org.apache.hadoop.fs.LocalFileSystem");
     }
+
+    @Override
+    protected Set<String> schemas() {
+        return ImmutableSet.of();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index ee75bc3b63d..1dbd417de64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -23,6 +23,8 @@ import org.apache.doris.datasource.property.ConnectorProperty;
 import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
 
 import lombok.Getter;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import java.lang.reflect.Field;
@@ -31,6 +33,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 
 public abstract class StorageProperties extends ConnectionProperties {
@@ -51,6 +54,8 @@ public abstract class StorageProperties extends 
ConnectionProperties {
 
     public static final String FS_PROVIDER_KEY = "provider";
 
+    protected  final String userFsPropsPrefix = "fs.";
+
     public enum Type {
         HDFS,
         S3,
@@ -131,7 +136,7 @@ public abstract class StorageProperties extends 
ConnectionProperties {
 
         for (StorageProperties storageProperties : result) {
             storageProperties.initNormalizeAndCheckProps();
-            storageProperties.initializeHadoopStorageConfig();
+            storageProperties.buildHadoopStorageConfig();
         }
         return result;
     }
@@ -151,7 +156,7 @@ public abstract class StorageProperties extends 
ConnectionProperties {
             StorageProperties p = func.apply(origProps);
             if (p != null) {
                 p.initNormalizeAndCheckProps();
-                p.initializeHadoopStorageConfig();
+                p.buildHadoopStorageConfig();
                 return p;
             }
         }
@@ -233,5 +238,56 @@ public abstract class StorageProperties extends 
ConnectionProperties {
 
     public abstract String getStorageName();
 
-    public abstract void initializeHadoopStorageConfig();
+    private void buildHadoopStorageConfig() {
+        initializeHadoopStorageConfig();
+        if (null == hadoopStorageConfig) {
+            return;
+        }
+        appendUserFsConfig(origProps);
+        ensureDisableCache(hadoopStorageConfig, origProps);
+    }
+
+    private void appendUserFsConfig(Map<String, String> userProps) {
+        userProps.forEach((k, v) -> {
+            if (k.startsWith(userFsPropsPrefix) && StringUtils.isNotBlank(v)) {
+                hadoopStorageConfig.set(k, v);
+            }
+        });
+    }
+
+    protected abstract void initializeHadoopStorageConfig();
+
+    protected abstract Set<String> schemas();
+
+    /**
+     * By default, Hadoop caches FileSystem instances per scheme and authority 
(e.g. s3a://bucket/), meaning that all
+     * subsequent calls using the same URI will reuse the same FileSystem 
object.
+     * In multi-tenant or dynamic credential environments — where different 
users may access the same bucket using
+     * different access keys or tokens — this cache reuse can lead to 
cross-credential contamination.
+     * <p>
+     * Specifically, if the cache is not disabled, a FileSystem instance 
initialized with one set of credentials may
+     * be reused by another session targeting the same bucket but with a 
different AK/SK. This results in:
+     * <p>
+     * Incorrect authentication (using stale credentials)
+     * <p>
+     * Unexpected permission errors or access denial
+     * <p>
+     * Potential data leakage between users
+     * <p>
+     * To avoid such risks, the configuration property
+     * fs.<schema>.impl.disable.cache
+     * must be set to true for all object storage backends (e.g., S3A, OSS, 
COS, OBS), ensuring that each new access
+     * creates an isolated FileSystem instance with its own credentials and 
configuration context.
+     */
+    private void ensureDisableCache(Configuration conf, Map<String, String> 
origProps) {
+        for (String schema : schemas()) {
+            String key = "fs." + schema + ".impl.disable.cache";
+            String userValue = origProps.get(key);
+            if (StringUtils.isNotBlank(userValue)) {
+                conf.setBoolean(key, BooleanUtils.toBoolean(userValue));
+            } else {
+                conf.setBoolean(key, true);
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java
new file mode 100644
index 00000000000..e5a775ba6e3
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.paimon.catalog.Catalog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AbstractPaimonPropertiesTest {
+
+    private static class TestPaimonProperties extends AbstractPaimonProperties 
{
+
+
+        protected TestPaimonProperties(Map<String, String> props) {
+            super(props);
+        }
+
+        @Override
+        public String getPaimonCatalogType() {
+            return "test";
+        }
+
+        @Override
+        public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+            return null;
+        }
+
+        @Override
+        protected void appendCustomCatalogOptions() {
+
+        }
+
+        @Override
+        protected String getMetastoreType() {
+            return "test";
+        }
+    }
+
+    TestPaimonProperties props;
+
+    @BeforeEach
+    void setup() {
+        Map<String, String> input = new HashMap<>();
+        input.put("warehouse", "s3://tmp/warehouse");
+        input.put("paimon.metastore", "filesystem");
+        input.put("paimon.s3.access-key", "AK");
+        input.put("paimon.s3.secret-key", "SK");
+        input.put("paimon.custom.key", "value");
+        props = new TestPaimonProperties(input);
+    }
+
+    @Test
+    void testNormalizeS3Config() {
+        Map<String, String> input = new HashMap<>();
+        input.put("paimon.s3.list.version", "1");
+        input.put("paimon.s3.paging.maximum", "100");
+        input.put("paimon.fs.s3.read.ahead.buffer.size", "1");
+        input.put("paimon.s3a.replication.factor", "3");
+        TestPaimonProperties testProps = new TestPaimonProperties(input);
+        Map<String, String> result = testProps.normalizeS3Config();
+        Assertions.assertTrue("1".equals(result.get("fs.s3a.list.version")));
+        
Assertions.assertTrue("100".equals(result.get("fs.s3a.paging.maximum")));
+        
Assertions.assertTrue("1".equals(result.get("fs.s3a.read.ahead.buffer.size")));
+        
Assertions.assertTrue("3".equals(result.get("fs.s3a.replication.factor")));
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java
index 265e67c3b65..cbfa6a01c80 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java
@@ -18,15 +18,12 @@
 package org.apache.doris.datasource.property.metastore;
 
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
-import org.apache.doris.datasource.property.storage.StorageProperties;
 
 import org.apache.paimon.options.Options;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class PaimonRestMetaStorePropertiesTest {
@@ -63,9 +60,8 @@ public class PaimonRestMetaStorePropertiesTest {
         restProps2.initNormalizeAndCheckProps();
 
         // Both should work and set the same URI in catalog options
-        List<StorageProperties> storagePropertiesList = new ArrayList<>();
-        restProps1.buildCatalogOptions(storagePropertiesList);
-        restProps2.buildCatalogOptions(storagePropertiesList);
+        restProps1.buildCatalogOptions();
+        restProps2.buildCatalogOptions();
 
         Options options1 = restProps1.getCatalogOptions();
         Options options2 = restProps2.getCatalogOptions();
@@ -87,8 +83,7 @@ public class PaimonRestMetaStorePropertiesTest {
         PaimonRestMetaStoreProperties restProps = new 
PaimonRestMetaStoreProperties(props);
         restProps.initNormalizeAndCheckProps();
 
-        List<StorageProperties> storagePropertiesList = new ArrayList<>();
-        restProps.buildCatalogOptions(storagePropertiesList);
+        restProps.buildCatalogOptions();
         Options catalogOptions = restProps.getCatalogOptions();
 
         // Basic URI should be set
@@ -356,8 +351,7 @@ public class PaimonRestMetaStorePropertiesTest {
         PaimonRestMetaStoreProperties restProps = new 
PaimonRestMetaStoreProperties(props);
         restProps.initNormalizeAndCheckProps();
 
-        List<StorageProperties> storagePropertiesList = new ArrayList<>();
-        restProps.buildCatalogOptions(storagePropertiesList);
+        restProps.buildCatalogOptions();
         Options catalogOptions = restProps.getCatalogOptions();
 
         // paimon.rest.* properties should be passed through without prefix


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to