This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new be7617e192d [feat]Refactor: Storage Property Conversion Separation and 
Unification (#50031)
be7617e192d is described below

commit be7617e192db81fb6c8d025723f80b40097c963d
Author: Calvin Kirs <[email protected]>
AuthorDate: Sat Apr 19 15:03:55 2025 +0800

    [feat]Refactor: Storage Property Conversion Separation and Unification 
(#50031)
    
    ## Background
    Previously, all storage-related property conversions were handled in a
    single class: PropertyConvert. This class included logic for multiple
    components such as:
    
    - BE storage configuration
    
    - Frontend (FE) object storage
    
    - HDFS(FE) configuration
    
    Over time, this approach introduced several problems:
    
    Tight Coupling: Different storage types (e.g., S3, OSS, COS, HDFS) were
    processed in a mixed manner.
    
    Inconsistent Behavior: The same storage type behaved differently across
    components. For instance:
    
    Some services accepted https:// style URIs.
    
    Others only accepted s3:// style URIs.
    
    High Maintenance Cost: Adding or updating logic for a single storage
    type risked breaking unrelated paths.
    
    Low Extensibility: Introducing new storage types or protocols required
    invasive changes to centralized logic.
    
    ## Changed
    This PR refactors the property conversion logic with the following
    goals:
    
    ### Separation of Responsibility:
    
    Each storage type (e.g., S3, COS, HDFS) now manages its own property
    parsing and conversion.
    
    No cross-dependency between different storage implementations.
    
    ### Unified Interface for Upper Layers:
    
    A single unified interface is exposed to business logic (e.g.,
    generating properties for BE).
    
    Upper layers no longer care about the specific storage type or URI
    scheme.
    
    ### Consistent Behavior Across Components:
    
    Each storage implementation defines its own rules.
    
    Eliminates inconsistencies like accepting different URI styles in
    different parts of the system.
    
    ### Future-Friendly Design:
    
    Lays the groundwork for plugin-based SPI support.
---
 fe/fe-common/pom.xml                               |   5 +
 .../doris/common/CatalogConfigFileUtils.java       |  91 ++++++++
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../datasource/property/ConnectionProperties.java  | 123 +++++++++++
 .../datasource/property/ConnectorProperty.java     |  30 +++
 .../doris/datasource/property/PropertyUtils.java   |  46 +++++
 .../storage/AbstractObjectStorageProperties.java   | 214 +++++++++++++++++++
 .../datasource/property/storage/COSProperties.java | 104 ++++++++++
 .../property/storage/HdfsProperties.java           | 188 +++++++++++++++++
 .../property/storage/HdfsPropertiesUtils.java      | 142 +++++++++++++
 .../datasource/property/storage/OBSProperties.java | 106 ++++++++++
 .../datasource/property/storage/OSSProperties.java | 125 +++++++++++
 .../property/storage/ObjectStorageProperties.java  |  37 ++++
 .../datasource/property/storage/S3Properties.java  | 230 +++++++++++++++++++++
 .../property/storage/S3PropertyUtils.java          | 134 ++++++++++++
 .../property/storage/StorageProperties.java        | 168 +++++++++++++++
 .../property/storage/COSPropertiesTest.java        | 145 +++++++++++++
 .../property/storage/HdfsPropertiesTest.java       | 179 ++++++++++++++++
 .../property/storage/HdfsPropertiesUtilsTest.java  | 147 +++++++++++++
 .../property/storage/OBSPropertyTest.java          | 126 +++++++++++
 .../property/storage/OSSPropertiesTest.java        | 126 +++++++++++
 .../property/storage/S3PropertiesTest.java         | 152 ++++++++++++++
 .../property/storage/S3PropertyUtilsTest.java      | 110 ++++++++++
 .../plugins/hadoop_conf/hadoop1/core-site.xml      |  40 ++++
 .../plugins/hadoop_conf/hadoop1/hdfs-site.xml      |  50 +++++
 .../plugins/hive-conf/hive1/hive-site.xml          |  26 +++
 26 files changed, 2849 insertions(+)

diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml
index 55934fd4e24..a02f4e1cb9c 100644
--- a/fe/fe-common/pom.xml
+++ b/fe/fe-common/pom.xml
@@ -56,6 +56,11 @@ under the License.
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
new file mode 100644
index 00000000000..6a1c552d972
--- /dev/null
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.File;
+import java.util.function.BiConsumer;
+
+public class CatalogConfigFileUtils {
+
+    /**
+     * Loads configuration files from the specified directory into a Hadoop 
Configuration or HiveConf object.
+     *
+     * @param resourcesPath The comma-separated list of configuration resource 
files to load.
+     *                      This must not be null or empty.
+     * @param configDir The base directory where the configuration files are 
located.
+     * @param addResourceMethod A method reference to add the resource to the 
configuration.
+     * @param <T> The type of configuration object (either Hadoop 
Configuration or HiveConf).
+     * @return The populated configuration object.
+     * @throws IllegalArgumentException If the provided resourcesPath is 
blank, or if any of the specified
+     *                                  configuration files do not exist or 
are not regular files.
+     */
+    private static <T> T loadConfigFromDir(String resourcesPath, String 
configDir,
+                                           BiConsumer<T, Path> 
addResourceMethod) {
+        // Check if the provided resourcesPath is blank and throw an exception 
if so.
+        if (StringUtils.isBlank(resourcesPath)) {
+            throw new IllegalArgumentException("Config resource path is 
empty");
+        }
+
+        // Create a new configuration object.
+        T conf = (T) (configDir.equals(Config.hadoop_config_dir) ? new 
Configuration(false) : new HiveConf());
+
+        // Iterate over the comma-separated list of resource files.
+        for (String resource : resourcesPath.split(",")) {
+            // Construct the full path to the resource file.
+            String resourcePath = configDir + resource.trim();
+            File file = new File(resourcePath);
+
+            // Check if the file exists and is a regular file; if not, throw 
an exception.
+            if (file.exists() && file.isFile()) {
+                // Add the resource file to the configuration object.
+                addResourceMethod.accept(conf, new Path(file.toURI()));
+            } else {
+                // Throw an exception if the file does not exist or is not a 
regular file.
+                throw new IllegalArgumentException("Config resource file does 
not exist: " + resourcePath);
+            }
+        }
+        return conf;
+    }
+
+    /**
+     * Loads the Hadoop configuration files from the specified directory.
+     * @param resourcesPath The comma-separated list of Hadoop configuration 
resource files to load.
+     * @return The Hadoop `Configuration` object with the loaded configuration 
files.
+     * @throws IllegalArgumentException If the provided `resourcesPath` is 
blank, or if any of the specified
+     *                                  configuration files do not exist or 
are not regular files.
+     */
+    public static Configuration loadConfigurationFromHadoopConfDir(String 
resourcesPath) {
+        return loadConfigFromDir(resourcesPath, Config.hadoop_config_dir, 
Configuration::addResource);
+    }
+
+    /**
+     * Loads the Hive configuration files from the specified directory.
+     * @param resourcesPath The comma-separated list of Hive configuration 
resource files to load.
+     * @return The HiveConf object with the loaded configuration files.
+     * @throws IllegalArgumentException If the provided `resourcesPath` is 
blank, or if any of the specified
+     *                                  configuration files do not exist or 
are not regular files.
+     */
+    public static HiveConf loadHiveConfFromHiveConfDir(String resourcesPath) {
+        return loadConfigFromDir(resourcesPath, Config.hadoop_config_dir, 
HiveConf::addResource);
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9aac3378b7d..7f7e7c2ad4f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3072,6 +3072,11 @@ public class Config extends ConfigBase {
     })
     public static boolean enable_feature_data_sync_job = false;
 
+    @ConfField(description = {
+            "存放 hadoop conf 配置文件的默认目录。",
+            "The default directory for storing hadoop conf configuration 
files."})
+    public static String hadoop_config_dir = EnvUtils.getDorisHome() + 
"/plugins/hadoop_conf/";
+
     
//==========================================================================
     //                    begin of cloud config
     
//==========================================================================
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
new file mode 100644
index 00000000000..7fe00cedec7
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property;
+
+import org.apache.doris.common.CatalogConfigFileUtils;
+import org.apache.doris.common.UserException;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.hadoop.conf.Configuration;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class ConnectionProperties {
+    /**
+     * The original user-provided properties.
+     * <p>
+     * This map may contain various configuration entries, not all of which 
are relevant
+     * to the specific Connector implementation. It serves as the raw input 
from the user.
+     */
+    @Getter
+    @Setter
+    protected Map<String, String> origProps;
+
+    /**
+     * The filtered properties that are actually used by the Connector.
+     * <p>
+     * This map only contains key-value pairs that are recognized and matched 
by
+     * the specific Connector implementation. It's a subset of {@code 
origProps}.
+     */
+    @Getter
+    protected Map<String, String> matchedProperties = new HashMap<>();
+
+    protected ConnectionProperties(Map<String, String> origProps) {
+        this.origProps = origProps;
+    }
+
+    protected void initNormalizeAndCheckProps() throws UserException {
+        List<Field> supportedProps = 
PropertyUtils.getConnectorProperties(this.getClass());
+        for (Field field : supportedProps) {
+            field.setAccessible(true);
+            ConnectorProperty anno = 
field.getAnnotation(ConnectorProperty.class);
+            String[] names = anno.names();
+            for (String name : names) {
+                if (origProps.containsKey(name)) {
+                    try {
+                        field.set(this, origProps.get(name));
+                        matchedProperties.put(name, origProps.get(name));
+                    } catch (IllegalAccessException e) {
+                        throw new RuntimeException("Failed to set property " + 
name + ", " + e.getMessage(), e);
+                    }
+                    break;
+                }
+            }
+        }
+        // 3. check properties
+        checkRequiredProperties();
+    }
+
+    // Some properties may be loaded from file
+    // Subclass can override this method to load properties from file.
+    // The return value is the properties loaded from file, not include 
original properties
+    protected Map<String, String> loadConfigFromFile(String resourceConfig) {
+        if (Strings.isNullOrEmpty(resourceConfig)) {
+            return new HashMap<>();
+        }
+        if (Strings.isNullOrEmpty(origProps.get(resourceConfig))) {
+            return Maps.newHashMap();
+        }
+        Configuration conf = 
CatalogConfigFileUtils.loadConfigurationFromHadoopConfDir(origProps.get(resourceConfig));
+        Map<String, String> confMap = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : conf) {
+            confMap.put(entry.getKey(), entry.getValue());
+        }
+        return confMap;
+    }
+
+    // Subclass can override this method to return the property name of 
resource config.
+    protected String getResourceConfigPropName() {
+        return null;
+    }
+
+    // This method will check if all required properties are set.
+    // Subclass can implement this method for additional check.
+    protected void checkRequiredProperties() {
+        List<Field> supportedProps = 
PropertyUtils.getConnectorProperties(this.getClass());
+        for (Field field : supportedProps) {
+            field.setAccessible(true);
+            ConnectorProperty anno = 
field.getAnnotation(ConnectorProperty.class);
+            String[] names = anno.names();
+            if (anno.required() && field.getType().equals(String.class)) {
+                try {
+                    String value = (String) field.get(this);
+                    if (Strings.isNullOrEmpty(value)) {
+                        throw new IllegalArgumentException("Property " + 
names[0] + " is required.");
+                    }
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
new file mode 100644
index 00000000000..9fcaa5a6014
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ConnectorProperty {
+    String[] names() default {};
+    String description() default "";
+
+    boolean required() default true;
+    boolean supported() default true;
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyUtils.java
new file mode 100644
index 00000000000..f02f01efad3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyUtils.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property;
+
+import com.google.common.collect.Lists;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+public class PropertyUtils {
+
+    // Get all fields of a class with annotation @ConnectorProperty
+    public static List<Field> getConnectorProperties(Class<?> clazz) {
+        List<Field> fields = Lists.newArrayList();
+        Class<?> currentClass = clazz;
+
+        while (currentClass != null) {
+            for (Field field : currentClass.getDeclaredFields()) {
+                field.setAccessible(true);
+                if (field.isAnnotationPresent(ConnectorProperty.class)) {
+                    ConnectorProperty connectorProperty = 
field.getAnnotation(ConnectorProperty.class);
+                    if (connectorProperty.supported()) {
+                        fields.add(field);
+                    }
+                }
+            }
+            currentClass = currentClass.getSuperclass();
+        }
+        return fields;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
new file mode 100644
index 00000000000..26ac10fb32a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Abstract base class for object storage system properties. This class 
provides common configuration
+ * settings for object storage systems and supports conversion of these 
properties into configuration
+ * maps for different protocols, such as AWS S3. All object storage systems 
should extend this class
+ * to inherit the common configuration properties and methods.
+ * <p>
+ * The properties include connection settings (e.g., timeouts and maximum 
connections) and a flag to
+ * determine if path-style URLs should be used for the storage system.
+ */
+public abstract class AbstractObjectStorageProperties extends 
StorageProperties implements ObjectStorageProperties {
+
+    /**
+     * The maximum number of concurrent connections that can be made to the 
object storage system.
+     * This value is optional and can be configured by the user.
+     */
+    @Getter
+    @ConnectorProperty(names = {"connection.maximum"}, required = false, 
description = "Maximum number of connections.")
+    protected String maxConnections = "100";
+
+    /**
+     * The timeout (in milliseconds) for requests made to the object storage 
system.
+     * This value is optional and can be configured by the user.
+     */
+    @Getter
+    @ConnectorProperty(names = {"connection.request.timeout"}, required = 
false,
+            description = "Request timeout in seconds.")
+    protected String requestTimeoutS = "10000";
+
+    /**
+     * The timeout (in milliseconds) for establishing a connection to the 
object storage system.
+     * This value is optional and can be configured by the user.
+     */
+    @Getter
+    @ConnectorProperty(names = {"connection.timeout"}, required = false, 
description = "Connection timeout in seconds.")
+    protected String connectionTimeoutS = "10000";
+
+    /**
+     * Flag indicating whether to use path-style URLs for the object storage 
system.
+     * This value is optional and can be configured by the user.
+     */
+    @Setter
+    @Getter
+    @ConnectorProperty(names = {"use_path_style", "s3.path-style-access"}, 
required = false,
+            description = "Whether to use path style URL for the storage.")
+    protected String usePathStyle = "false";
+    @ConnectorProperty(names = {"force_parsing_by_standard_uri"}, required = 
false,
+            description = "Whether to use path style URL for the storage.")
+    @Setter
+    @Getter
+    protected String forceParsingByStandardUrl = "false";
+
+    /**
+     * Constructor to initialize the object storage properties with the 
provided type and original properties map.
+     *
+     * @param type      the type of object storage system.
+     * @param origProps the original properties map.
+     */
+    protected AbstractObjectStorageProperties(Type type, Map<String, String> 
origProps) {
+        super(type, origProps);
+    }
+
+    /**
+     * Generates a map of AWS S3 configuration properties specifically for 
Backend (BE) service usage.
+     * This configuration includes endpoint, region, access credentials, 
timeouts, and connection settings.
+     * The map is typically used to initialize S3-compatible storage access 
for the backend.
+     *
+     * @param maxConnections      the maximum number of allowed S3 connections.
+     * @param requestTimeoutMs    request timeout in milliseconds.
+     * @param connectionTimeoutMs connection timeout in milliseconds.
+     * @param usePathStyle        whether to use path-style access 
(true/false).
+     * @return a map containing AWS S3 configuration properties.
+     */
+    protected Map<String, String> generateBackendS3Configuration(String 
maxConnections,
+                                                                 String 
requestTimeoutMs,
+                                                                 String 
connectionTimeoutMs,
+                                                                 String 
usePathStyle) {
+        return doBuildS3Configuration(maxConnections, requestTimeoutMs, 
connectionTimeoutMs, usePathStyle);
+    }
+
+    /**
+     * Overloaded version of {@link #generateBackendS3Configuration(String, 
String, String, String)}
+     * that uses default values
+     * from the current object context for connection settings.
+     *
+     * @return a map containing AWS S3 configuration properties.
+     */
+    protected Map<String, String> generateBackendS3Configuration() {
+        return doBuildS3Configuration(maxConnections, requestTimeoutS, 
connectionTimeoutS, usePathStyle);
+    }
+
+    /**
+     * Internal method to centralize S3 configuration property assembly.
+     */
+    private Map<String, String> doBuildS3Configuration(String maxConnections,
+                                                       String requestTimeoutMs,
+                                                       String 
connectionTimeoutMs,
+                                                       String usePathStyle) {
+        Map<String, String> s3Props = new HashMap<>();
+        s3Props.put("AWS_ENDPOINT", getEndpoint());
+        s3Props.put("AWS_REGION", getRegion());
+        s3Props.put("AWS_ACCESS_KEY", getAccessKey());
+        s3Props.put("AWS_SECRET_KEY", getSecretKey());
+        s3Props.put("AWS_MAX_CONNECTIONS", maxConnections);
+        s3Props.put("AWS_REQUEST_TIMEOUT_MS", requestTimeoutMs);
+        s3Props.put("AWS_CONNECTION_TIMEOUT_MS", connectionTimeoutMs);
+        s3Props.put("use_path_style", usePathStyle);
+        return s3Props;
+    }
+
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        return generateBackendS3Configuration();
+    }
+
+
+    @Override
+    protected void initNormalizeAndCheckProps() throws UserException {
+        super.initNormalizeAndCheckProps();
+        setEndpointIfNotSet();
+        if (!isValidEndpoint(getEndpoint())) {
+            throw new IllegalArgumentException("Invalid endpoint format: " + 
getEndpoint());
+        }
+        checkRequiredProperties();
+        initRegionIfNecessary();
+        if (StringUtils.isBlank(getRegion())) {
+            throw new IllegalArgumentException("region is required");
+        }
+    }
+
+    protected abstract Pattern endpointPattern();
+
+    private boolean isValidEndpoint(String endpoint) {
+        if (endpoint == null || endpoint.isEmpty()) {
+            return false;
+        }
+
+        String host = extractHost(endpoint);
+        if (host == null || host.isEmpty()) {
+            return false;
+        }
+        host = host.replaceFirst("\\.internal$", "");
+        return endpointPattern().matcher(host).matches();
+    }
+
+    private String extractHost(String endpoint) {
+        try {
+            String url = endpoint.matches("^[a-zA-Z][a-zA-Z0-9+.-]*://.*") ? 
endpoint : "http://"; + endpoint;
+            URI uri = new URI(url);
+            return uri.getHost();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid endpoint format: " + 
endpoint, e);
+        }
+    }
+
+    protected abstract void initRegionIfNecessary();
+
+    private void setEndpointIfNotSet() throws UserException {
+        if (StringUtils.isNotBlank(getEndpoint())) {
+            return;
+        }
+        String endpoint = S3PropertyUtils.constructEndpointFromUrl(origProps, 
usePathStyle, forceParsingByStandardUrl);
+        if (StringUtils.isBlank(endpoint)) {
+            throw new IllegalArgumentException("endpoint is required");
+        }
+        setEndpoint(endpoint);
+    }
+
+    @Override
+    public String validateAndNormalizeUri(String uri) throws UserException {
+        return S3PropertyUtils.validateAndNormalizeUri(uri, getUsePathStyle(), 
getForceParsingByStandardUrl());
+
+    }
+
+    @Override
+    public String validateAndGetUri(Map<String, String> loadProps) throws 
UserException {
+        return S3PropertyUtils.validateAndGetUri(loadProps);
+    }
+
+    @Override
+    public String getStorageName() {
+        return "S3";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
new file mode 100644
index 00000000000..7202f49cd96
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+public class COSProperties extends AbstractObjectStorageProperties {
+
+    @Setter
+    @Getter
+    @ConnectorProperty(names = {"cos.endpoint", "s3.endpoint", "AWS_ENDPOINT", 
"endpoint", "ENDPOINT"},
+            required = false,
+            description = "The endpoint of COS.")
+    protected String endpoint = "";
+
+    @Getter
+    @ConnectorProperty(names = {"cos.region", "s3.region", "AWS_REGION", 
"region", "REGION"},
+            required = false,
+            description = "The region of COS.")
+    protected String region = "";
+
+    @Getter
+    @ConnectorProperty(names = {"cos.access_key", "AWS_ACCESS_KEY", 
"ACCESS_KEY", "access_key"},
+            description = "The access key of COS.")
+    protected String accessKey = "";
+
+    @Getter
+    @ConnectorProperty(names = {"cos.secret_key", "s3.secret_key", 
"AWS_SECRET_KEY", "secret_key", "SECRET_KEY"},
+            description = "The secret key of COS.")
+    protected String secretKey = "";
+
+    private static final Pattern COS_ENDPOINT_PATTERN = Pattern
+            .compile("^cos\\.[a-z0-9-]+\\.myqcloud\\.com(\\.internal)?$");
+
+    protected COSProperties(Map<String, String> origProps) {
+        super(Type.COS, origProps);
+    }
+
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        String value = Stream.of("cos.endpoint", "s3.endpoint", 
"AWS_ENDPOINT", "endpoint", "ENDPOINT")
+                .map(origProps::get)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElse(null);
+        if (!Strings.isNullOrEmpty(value)) {
+            return value.contains("myqcloud.com");
+        }
+        if (!origProps.containsKey("uri")) {
+            return false;
+        }
+        return origProps.get("uri").contains("myqcloud.com");
+    }
+
+    @Override
+    protected Pattern endpointPattern() {
+        return COS_ENDPOINT_PATTERN;
+    }
+
+    /**
+     * Initializes the cosRegion field based on the COS endpoint if it's not 
already set.
+     * <p>
+     * This method extracts the region from Tencent Cloud COS endpoints.
+     * It supports typical COS endpoint formats like:
+     * <p>
+     * Example:
+     * - "cos.ap-guangzhou.myqcloud.com" → cosRegion = "ap-guangzhou"
+     */
+    @Override
+    protected void initRegionIfNecessary() {
+        if (Strings.isNullOrEmpty(this.region)) {
+            Pattern cosPattern = 
Pattern.compile("cos\\.([a-z0-9-]+)\\.myqcloud\\.com");
+            Matcher matcher = cosPattern.matcher(endpoint);
+            if (matcher.find()) {
+                this.region = matcher.group(1);
+            }
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java
new file mode 100644
index 00000000000..a3bbea4901c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.base.Strings;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HdfsProperties extends StorageProperties {
+
+    @ConnectorProperty(names = {"hdfs.authentication.type", 
"hadoop.security.authentication"},
+            required = false,
+            description = "The authentication type of HDFS. The default value 
is 'none'.")
+    private String hdfsAuthenticationType = "simple";
+
+    @ConnectorProperty(names = {"hdfs.authentication.kerberos.principal", 
"hadoop.kerberos.principal"},
+            required = false,
+            description = "The principal of the kerberos authentication.")
+    private String hdfsKerberosPrincipal = "";
+
+    @ConnectorProperty(names = {"hdfs.authentication.kerberos.keytab", 
"hadoop.kerberos.keytab"},
+            required = false,
+            description = "The keytab of the kerberos authentication.")
+    private String hdfsKerberosKeytab = "";
+
+    @ConnectorProperty(names = {"hadoop.username"},
+            required = false,
+            description = "The username of Hadoop. Doris will user this user 
to access HDFS")
+    private String hadoopUsername = "";
+
+    @ConnectorProperty(names = {"hadoop.config.resources"},
+            required = false,
+            description = "The xml files of Hadoop configuration.")
+    private String hadoopConfigResources = "";
+
+    @ConnectorProperty(names = {"hdfs.impersonation.enabled"},
+            required = false,
+            supported = false,
+            description = "Whether to enable the impersonation of HDFS.")
+    private boolean hdfsImpersonationEnabled = false;
+
+    @ConnectorProperty(names = {"fs.defaultFS"}, required = false, description 
= "")
+    private String fsDefaultFS = "";
+
+    private Configuration configuration;
+
+    private Map<String, String> backendConfigProperties;
+
+    /**
+     * The final HDFS configuration map that determines the effective settings.
+     * Priority rules:
+     * 1. If a key exists in `overrideConfig` (user-provided settings), its 
value takes precedence.
+     * 2. If a key is not present in `overrideConfig`, the value from 
`hdfs-site.xml` or `core-site.xml` is used.
+     * 3. This map should be used to read the resolved HDFS configuration, 
ensuring the correct precedence is applied.
+     */
+    private Map<String, String> userOverriddenHdfsConfig;
+
+    public HdfsProperties(Map<String, String> origProps) {
+        super(Type.HDFS, origProps);
+    }
+
+    public static boolean guessIsMe(Map<String, String> props) {
+        if (MapUtils.isEmpty(props)) {
+            return false;
+        }
+        if (props.containsKey("hadoop.config.resources") || 
props.containsKey("hadoop.security.authentication")
+                || props.containsKey("dfs.nameservices") || 
props.containsKey("fs.defaultFS")) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    protected void initNormalizeAndCheckProps() throws UserException {
+        super.initNormalizeAndCheckProps();
+        extractUserOverriddenHdfsConfig(origProps);
+        initHadoopConfiguration();
+        initBackendConfigProperties();
+    }
+
+    private void extractUserOverriddenHdfsConfig(Map<String, String> 
origProps) {
+        if (MapUtils.isEmpty(origProps)) {
+            return;
+        }
+        userOverriddenHdfsConfig = new HashMap<>();
+        origProps.forEach((key, value) -> {
+            if (key.startsWith("hadoop.") || key.startsWith("dfs.") || 
key.equals("fs.defaultFS")) {
+                userOverriddenHdfsConfig.put(key, value);
+            }
+        });
+
+    }
+
+    @Override
+    protected String getResourceConfigPropName() {
+        return "hadoop.config.resources";
+    }
+
+    protected void checkRequiredProperties() {
+        super.checkRequiredProperties();
+        if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType) && 
(Strings.isNullOrEmpty(hdfsKerberosPrincipal)
+                || Strings.isNullOrEmpty(hdfsKerberosKeytab))) {
+            throw new IllegalArgumentException("HDFS authentication type is 
kerberos, "
+                    + "but principal or keytab is not set.");
+        }
+
+        if (StringUtils.isBlank(fsDefaultFS)) {
+            this.fsDefaultFS = 
HdfsPropertiesUtils.constructDefaultFsFromUri(origProps);
+        }
+    }
+
+    private void initHadoopConfiguration() {
+        Configuration conf = new Configuration(true);
+        Map<String, String> allProps = 
loadConfigFromFile(getResourceConfigPropName());
+        allProps.forEach(conf::set);
+        if (MapUtils.isNotEmpty(userOverriddenHdfsConfig)) {
+            userOverriddenHdfsConfig.forEach(conf::set);
+        }
+        if (StringUtils.isNotBlank(fsDefaultFS)) {
+            conf.set("fs.defaultFS", fsDefaultFS);
+        }
+        conf.set("hdfs.security.authentication", hdfsAuthenticationType);
+        if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
+            conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal);
+            conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab);
+        }
+        if (StringUtils.isNotBlank(hadoopUsername)) {
+            conf.set("hadoop.username", hadoopUsername);
+        }
+        this.configuration = conf;
+    }
+
+    private void initBackendConfigProperties() {
+        Map<String, String> backendConfigProperties = new HashMap<>();
+        for (Map.Entry<String, String> entry : configuration) {
+            backendConfigProperties.put(entry.getKey(), entry.getValue());
+        }
+
+        this.backendConfigProperties = backendConfigProperties;
+    }
+
+    public Configuration getHadoopConfiguration() {
+        return this.configuration;
+    }
+
+    //fixme be should send use input params
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        return backendConfigProperties;
+    }
+
+    @Override
+    public String validateAndNormalizeUri(String url) throws UserException {
+        return HdfsPropertiesUtils.convertUrlToFilePath(url);
+    }
+
+    @Override
+    public String validateAndGetUri(Map<String, String> loadProps) throws 
UserException {
+        return HdfsPropertiesUtils.validateAndGetUri(loadProps);
+    }
+
+    @Override
+    public String getStorageName() {
+        return "HDFS";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
new file mode 100644
index 00000000000..5e97a2fd639
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.URI;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class HdfsPropertiesUtils {
+    private static final String URI_KEY = "uri";
+
+    private static Set<String> supportSchema = new HashSet<>();
+
+    static {
+        supportSchema.add("hdfs");
+        supportSchema.add("viewfs");
+    }
+
+    /**
+     * Validates that the 'uri' property exists in the provided props map, and 
normalizes it.
+     *
+     * @param props the map of properties that must include a 'uri' entry
+     * @return a normalized URI string like 'hdfs://host/path'
+     * @throws UserException if the map is empty or does not contain the 
required 'uri' key
+     *                       <p>
+     *                       Example:
+     *                       Input: {"uri": "hdfs://namenode:9000/data/input"}
+     *                       Output: "hdfs://namenode:9000/data/input"
+     */
+    public static String validateAndGetUri(Map<String, String> props) throws 
UserException {
+        if (props.isEmpty()) {
+            throw new UserException("props is empty");
+        }
+        if (!props.containsKey(URI_KEY)) {
+            throw new UserException("props must contain uri");
+        }
+        String uriStr = props.get(URI_KEY);
+        return validateAndNormalizeUri(uriStr);
+    }
+
+    /**
+     * Validates and normalizes a raw URI string.
+     *
+     * @param uriStr the URI string to validate
+     * @return a normalized URI in the form of 'scheme://authority/path'
+     * @throws UserException if the URI is invalid or unsupported
+     *                       <p>
+     *                       Example:
+     *                       Input: "viewfs://ns1/path/to/file"
+     *                       Output: "viewfs://ns1/path/to/file"
+     */
+    public static String convertUrlToFilePath(String uriStr) throws 
UserException {
+        return validateAndNormalizeUri(uriStr);
+    }
+
+    /**
+     * Constructs the default filesystem URI (scheme + authority) from a full 
URI string in the props map.
+     *
+     * @param props the map of properties, expected to contain a valid 'uri' 
entry
+     * @return a URI prefix like 'hdfs://host:port', or null if the URI is 
missing or invalid
+     * <p>
+     * Example:
+     * Input: {"uri": "hdfs://namenode:8020/data"}
+     * Output: "hdfs://namenode:8020"
+     */
+    public static String constructDefaultFsFromUri(Map<String, String> props) {
+        if (props.isEmpty()) {
+            return null;
+        }
+        if (!props.containsKey(URI_KEY)) {
+            return null;
+        }
+        String uriStr = props.get(URI_KEY);
+        if (StringUtils.isBlank(uriStr)) {
+            return null;
+        }
+        URI uri = null;
+        try {
+            uri = URI.create(uriStr);
+        } catch (AnalysisException e) {
+            return null;
+        }
+        String schema = uri.getScheme();
+        if (StringUtils.isBlank(schema)) {
+            throw new IllegalArgumentException("Invalid uri: " + uriStr + 
"extract schema is null");
+        }
+        if (!supportSchema.contains(schema.toLowerCase())) {
+            throw new IllegalArgumentException("Invalid export path:"
+                    + schema + " , please use valid 'hdfs://' or 'viewfs://' 
path.");
+        }
+        return uri.getScheme() + "://" + uri.getAuthority();
+    }
+
+    /**
+     * Internal method that validates and normalizes a URI string.
+     * Ensures it has a valid scheme and is supported (e.g., hdfs, viewfs).
+     *
+     * @param uriStr the URI string to validate
+     * @return the normalized URI string
+     * @throws AnalysisException if the URI is blank or has an unsupported 
scheme
+     *                           <p>
+     *                           Example:
+     *                           Input: "hdfs://host:8020/user/data"
+     *                           Output: "hdfs://host:8020/user/data"
+     */
+    private static String validateAndNormalizeUri(String uriStr) throws 
AnalysisException {
+        if (StringUtils.isBlank(uriStr)) {
+            throw new IllegalArgumentException("uri is null, pls check your 
params");
+        }
+        URI uri = URI.create(uriStr);
+        String schema = uri.getScheme();
+        if (StringUtils.isBlank(schema)) {
+            throw new IllegalArgumentException("Invalid uri: " + uriStr + 
"extract schema is null");
+        }
+        if (!supportSchema.contains(schema.toLowerCase())) {
+            throw new IllegalArgumentException("Invalid export path:"
+                    + schema + " , please use valid 'hdfs://' or 'viewfs://' 
path.");
+        }
+        return uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
new file mode 100644
index 00000000000..474acb0f5f2
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+public class OBSProperties extends AbstractObjectStorageProperties {
+
+    @Setter
+    @Getter
+    @ConnectorProperty(names = {"obs.endpoint", "s3.endpoint", "AWS_ENDPOINT", 
"endpoint", "ENDPOINT"},
+            required = false,
+            description = "The endpoint of OBS.")
+    protected String endpoint = "";
+
+    @Getter
+    @ConnectorProperty(names = {"obs.access_key", "AWS_ACCESS_KEY", 
"ACCESS_KEY", "access_key"},
+            description = "The access key of OBS.")
+    protected String accessKey = "";
+
+    @Getter
+    @ConnectorProperty(names = {"obs.secret_key", "secret_key", 
"s3.secret_key"},
+            description = "The secret key of OBS.")
+    protected String secretKey = "";
+
+    @Getter
+    @ConnectorProperty(names = {"obs.region", "s3.region", "AWS_REGION", 
"region", "REGION"}, required = false,
+            description = "The region of OBS.")
+    protected String region;
+
+    private static Pattern ENDPOINT_PATTERN = Pattern
+            .compile("^obs\\.[a-z0-9-]+\\.myhuaweicloud\\.com(\\.internal)?$");
+
+    public OBSProperties(Map<String, String> origProps) {
+        super(Type.OBS, origProps);
+        // Initialize fields from origProps
+    }
+
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        String value = Stream.of("obs.endpoint", "s3.endpoint", 
"AWS_ENDPOINT", "endpoint", "ENDPOINT", "uri")
+                .map(origProps::get)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElse(null);
+
+        if (!Strings.isNullOrEmpty(value)) {
+            return value.contains("myhuaweicloud.com");
+        }
+        if (!origProps.containsKey("uri")) {
+            return false;
+        }
+        // Check if the uri property contains "myhuaweicloud.com"
+        return origProps.get("uri").contains("myhuaweicloud.com");
+    }
+
+    @Override
+    protected Pattern endpointPattern() {
+        return ENDPOINT_PATTERN;
+    }
+
+    /**
+     * Initializes the region field based on the OBS endpoint if it's not 
already set.
+     * <p>
+     * This method extracts the region from Huawei Cloud OBS endpoints.
+     * It supports typical OBS endpoint formats like:
+     * <p>
+     * Example:
+     * - "obs.cn-north-4.myhuaweicloud.com" → region = "cn-north-4"
+     */
+    @Override
+    protected void initRegionIfNecessary() {
+        if (Strings.isNullOrEmpty(this.region)) {
+            Pattern obsPattern = 
Pattern.compile("obs\\.([a-z0-9-]+)\\.myhuaweicloud\\.com");
+            Matcher matcher = obsPattern.matcher(endpoint);
+            if (matcher.find()) {
+                this.region = matcher.group(1);
+            }
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
new file mode 100644
index 00000000000..86fd1e7796a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+public class OSSProperties extends AbstractObjectStorageProperties {
+
+    @Setter
+    @Getter
+    @ConnectorProperty(names = {"oss.endpoint", "s3.endpoint", "AWS_ENDPOINT", 
"endpoint", "ENDPOINT"},
+            required = false,
+            description = "The endpoint of OSS.")
+    protected String endpoint = "";
+
+    @Getter
+    @ConnectorProperty(names = {"oss.access_key", "s3.access_key", 
"AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+            description = "The access key of OSS.")
+    protected String accessKey = "";
+
+    @Getter
+    @ConnectorProperty(names = {"oss.secret_key", "s3.secret_key", 
"AWS_SECRET_KEY", "secret_key", "SECRET_KEY"},
+            description = "The secret key of OSS.")
+    protected String secretKey = "";
+
+    @Getter
+    @ConnectorProperty(names = {"oss.region", "s3.region", "AWS_REGION", 
"region", "REGION"}, required = false,
+            description = "The region of OSS.")
+    protected String region;
+
+    private static Pattern ENDPOINT_PATTERN = 
Pattern.compile("^oss-[a-z0-9-]+\\.aliyuncs\\.com(\\.internal)?$");
+
+    protected OSSProperties(Map<String, String> origProps) {
+        super(Type.OSS, origProps);
+    }
+
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        String value = Stream.of("oss.endpoint", "s3.endpoint", 
"AWS_ENDPOINT", "endpoint", "ENDPOINT")
+                .map(origProps::get)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElse(null);
+        if (!Strings.isNullOrEmpty(value)) {
+            return value.contains("aliyuncs.com");
+        }
+        if (!origProps.containsKey("uri")) {
+            return false;
+        }
+        return origProps.get("uri").contains("aliyuncs.com");
+    }
+
+    @Override
+    protected void initNormalizeAndCheckProps() throws UserException {
+        super.initNormalizeAndCheckProps();
+        initRegionIfNecessary();
+    }
+
+    @Override
+    protected Pattern endpointPattern() {
+        return ENDPOINT_PATTERN;
+    }
+
+    /**
+     * Initializes the region field based on the endpoint if it's not already 
set.
+     * <p>
+     * This method attempts to extract the region name from the OSS endpoint 
string.
+     * It supports both external and internal Alibaba Cloud OSS endpoint 
formats.
+     * <p>
+     * Examples:
+     * - External endpoint: "oss-cn-hangzhou.aliyuncs.com" → region = 
"cn-hangzhou"
+     * - Internal endpoint: "oss-cn-shanghai.intranet.aliyuncs.com" → region = 
"cn-shanghai"
+     */
+    public void initRegionIfNecessary() {
+        // Return the region if it is already set
+        if (!Strings.isNullOrEmpty(this.region)) {
+            return;
+        }
+        // Check for external endpoint and extract region
+        if (endpoint.contains("aliyuncs.com")) {
+            // Regex pattern for external endpoint (e.g., 
oss-<region>.aliyuncs.com)
+            Pattern ossPattern = 
Pattern.compile("oss-([a-z0-9-]+)\\.aliyuncs\\.com");
+            Matcher matcher = ossPattern.matcher(endpoint);
+            if (matcher.find()) {
+                this.region = matcher.group(1);
+                return;
+            }
+        }
+        // Check for internal endpoint and extract region
+        if (endpoint.contains("intranet.aliyuncs.com")) {
+            // Regex pattern for internal endpoint (e.g., 
oss-<region>.intranet.aliyuncs.com)
+            Pattern ossIntranetPattern = 
Pattern.compile("oss-([a-z0-9-]+)\\.intranet\\.aliyuncs\\.com");
+            Matcher matcher = ossIntranetPattern.matcher(endpoint);
+            if (matcher.find()) {
+                this.region = matcher.group(1);
+            }
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
new file mode 100644
index 00000000000..1ba241a9444
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+/**
+ * Interface representing the properties and configurations for object storage 
systems.
+ * This interface provides methods for converting the storage properties to 
specific
+ * configurations for different protocols, such as Hadoop HDFS and AWS S3.
+ */
+public interface ObjectStorageProperties {
+
+    String getEndpoint();
+
+    String getRegion();
+
+    String getAccessKey();
+
+    String getSecretKey();
+
+    void setEndpoint(String endpoint);
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
new file mode 100644
index 00000000000..e630613904a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+public class S3Properties extends AbstractObjectStorageProperties {
+
+
+    @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT", "endpoint", 
"ENDPOINT"},
+            required = false,
+            description = "The endpoint of S3.")
+    protected String s3Endpoint = "";
+
+    @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "REGION"},
+            required = false,
+            description = "The region of S3.")
+    protected String s3Region = "";
+
+    @ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY", 
"ACCESS_KEY", "access_key"},
+            description = "The access key of S3.")
+    protected String s3AccessKey = "";
+
+    @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", 
"secret_key", "SECRET_KEY"},
+            description = "The secret key of S3.")
+    protected String s3SecretKey = "";
+
+
+    @ConnectorProperty(names = {"s3.connection.maximum",
+            "AWS_MAX_CONNECTIONS"},
+            required = false,
+            description = "The maximum number of connections to S3.")
+    protected String s3ConnectionMaximum = "50";
+
+    @ConnectorProperty(names = {"s3.connection.request.timeout",
+            "AWS_REQUEST_TIMEOUT_MS"},
+            required = false,
+            description = "The request timeout of S3 in milliseconds,")
+    protected String s3ConnectionRequestTimeoutS = "3000";
+
+    @ConnectorProperty(names = {"s3.connection.timeout",
+            "AWS_CONNECTION_TIMEOUT_MS"},
+            required = false,
+            description = "The connection timeout of S3 in milliseconds,")
+    protected String s3ConnectionTimeoutS = "1000";
+
+    @ConnectorProperty(names = {"s3.sts_endpoint"},
+            supported = false,
+            required = false,
+            description = "The sts endpoint of S3.")
+    protected String s3StsEndpoint = "";
+
+    @ConnectorProperty(names = {"s3.sts_region"},
+            supported = false,
+            required = false,
+            description = "The sts region of S3.")
+    protected String s3StsRegion = "";
+
+    @ConnectorProperty(names = {"s3.iam_role"},
+            supported = false,
+            required = false,
+            description = "The iam role of S3.")
+    protected String s3IAMRole = "";
+
+    @ConnectorProperty(names = {"s3.external_id"},
+            supported = false,
+            required = false,
+            description = "The external id of S3.")
+    protected String s3ExternalId = "";
+
+    private static final Pattern REGION_PATTERN = Pattern.compile(
+            "s3[.-](?:dualstack[.-])?([a-z0-9-]+)\\.amazonaws\\.com(?:\\.cn)?"
+    );
+
+
+    private static Pattern ENDPOINT_PATTERN = 
Pattern.compile("^s3(\\.[a-z0-9-]+)?\\.amazonaws\\.com$");
+
+    public S3Properties(Map<String, String> origProps) {
+        super(Type.S3, origProps);
+    }
+
+
+    /**
+     * Guess if the storage properties is for this storage type.
+     * Subclass should override this method to provide the correct 
implementation.
+     *
+     * @return
+     */
+    protected static boolean guessIsMe(Map<String, String> origProps) {
+        String endpoint = Stream.of("s3.endpoint", "AWS_ENDPOINT", "endpoint", 
"ENDPOINT")
+                .map(origProps::get)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElse(null);
+        if (!Strings.isNullOrEmpty(endpoint)) {
+            return endpoint.contains("amazonaws.com");
+        }
+        if (!origProps.containsKey("uri")) {
+            return false;
+        }
+        String uri = origProps.get("uri");
+        return uri.contains("amazonaws.com");
+
+
+    }
+
+    @Override
+    protected Pattern endpointPattern() {
+        return ENDPOINT_PATTERN;
+    }
+
+    private static List<Field> getIdentifyFields() {
+        List<Field> fields = Lists.newArrayList();
+        try {
+            //todo AliyunDlfProperties should in OSS storage type.
+            fields.add(S3Properties.class.getDeclaredField("s3AccessKey"));
+            // fixme Add it when MS done
+            
//fields.add(AliyunDLFProperties.class.getDeclaredField("dlfAccessKey"));
+            
//fields.add(AWSGlueProperties.class.getDeclaredField("glueAccessKey"));
+            return fields;
+        } catch (NoSuchFieldException e) {
+            // should not happen
+            throw new RuntimeException("Failed to get field: " + 
e.getMessage(), e);
+        }
+    }
+
+    /*
+    public void toPaimonOSSFileIOProperties(Options options) {
+        options.set("fs.oss.endpoint", s3Endpoint);
+        options.set("fs.oss.accessKeyId", s3AccessKey);
+        options.set("fs.oss.accessKeySecret", s3SecretKey);
+    }
+
+    public void toPaimonS3FileIOProperties(Options options) {
+        options.set("s3.endpoint", s3Endpoint);
+        options.set("s3.access-key", s3AccessKey);
+        options.set("s3.secret-key", s3SecretKey);
+    }*/
+
+    public void toIcebergS3FileIOProperties(Map<String, String> catalogProps) {
+        // See S3FileIOProperties.java
+        catalogProps.put("s3.endpoint", s3Endpoint);
+        catalogProps.put("s3.access-key-id", s3AccessKey);
+        catalogProps.put("s3.secret-access-key", s3SecretKey);
+        catalogProps.put("client.region", s3Region);
+        catalogProps.put("s3.path-style-access", usePathStyle);
+    }
+
+    @Override
+    public Map<String, String> getBackendConfigProperties() {
+        return generateBackendS3Configuration(s3ConnectionMaximum,
+                s3ConnectionRequestTimeoutS, s3ConnectionTimeoutS, 
String.valueOf(usePathStyle));
+    }
+
+    /**
+     * Initializes the s3Region field based on the S3 endpoint if it's not 
already set.
+     * <p>
+     * This method extracts the region from Amazon S3-compatible endpoints 
using a predefined regex pattern.
+     * The endpoint is first converted to lowercase before matching.
+     * <p>
+     * Example:
+     * - "s3.us-west-2.amazonaws.com" → s3Region = "us-west-2"
+     * - "s3.cn-north-1.amazonaws.com.cn" → s3Region = "cn-north-1"
+     * <p>
+     * Note: REGION_PATTERN must be defined to capture the region from the S3 
endpoint.
+     * Example pattern:
+     * Pattern.compile("s3[.-]([a-z0-9-]+)\\.")
+     */
+    @Override
+    protected void initRegionIfNecessary() {
+        if (StringUtils.isBlank(s3Region) && 
StringUtils.isNotBlank(s3Endpoint)) {
+            Matcher matcher = REGION_PATTERN.matcher(s3Endpoint.toLowerCase());
+            if (matcher.find()) {
+                this.s3Region = matcher.group(1);
+            }
+        }
+    }
+
+    @Override
+    public String getEndpoint() {
+        return s3Endpoint;
+    }
+
+    @Override
+    public String getRegion() {
+        return s3Region;
+    }
+
+    @Override
+    public String getAccessKey() {
+        return s3AccessKey;
+    }
+
+    @Override
+    public String getSecretKey() {
+        return s3SecretKey;
+    }
+
+    @Override
+    public void setEndpoint(String endpoint) {
+        this.s3Endpoint = endpoint;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
new file mode 100644
index 00000000000..745838438dd
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3URI;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+
+public class S3PropertyUtils {
+
+
+    private static final String URI_KEY = "uri";
+
+    /**
+     * Constructs the S3 endpoint from a given URI in the props map.
+     *
+     * @param props                           the map containing the S3 URI, 
keyed by URI_KEY
+     * @param stringUsePathStyle              whether to use path-style access 
("true"/"false")
+     * @param stringForceParsingByStandardUri whether to force parsing using 
the standard URI format ("true"/"false")
+     * @return the extracted S3 endpoint or null if URI is invalid or parsing 
fails
+     * <p>
+     * Example:
+     * Input URI: "https://s3.us-west-1.amazonaws.com/my-bucket/my-key";
+     * Output: "s3.us-west-1.amazonaws.com"
+     */
+    public static String constructEndpointFromUrl(Map<String, String> props,
+                                                  String stringUsePathStyle,
+                                                  String 
stringForceParsingByStandardUri) throws UserException {
+        String uri = props.get(URI_KEY);
+        if (uri == null || uri.isEmpty()) {
+            return null;
+        }
+        boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
+        boolean forceParsingByStandardUri = 
Boolean.parseBoolean(stringForceParsingByStandardUri);
+        S3URI s3uri = S3URI.create(uri, usePathStyle, 
forceParsingByStandardUri);
+        return s3uri.getEndpoint().orElse(null);
+    }
+
+    /**
+     * Extracts the S3 region from a URI in the given props map.
+     *
+     * @param props                           the map containing the S3 URI, 
keyed by URI_KEY
+     * @param stringUsePathStyle              whether to use path-style access 
("true"/"false")
+     * @param stringForceParsingByStandardUri whether to force parsing using 
the standard URI format ("true"/"false")
+     * @return the extracted S3 region or null if URI is invalid or parsing 
fails
+     * <p>
+     * Example:
+     * Input URI: "https://s3.us-west-1.amazonaws.com/my-bucket/my-key";
+     * Output: "us-west-1"
+     */
+    public static String constructRegionFromUrl(Map<String, String> props,
+                                                String stringUsePathStyle,
+                                                String 
stringForceParsingByStandardUri) throws UserException {
+        String uri = props.get(URI_KEY);
+        if (uri == null || uri.isEmpty()) {
+            return null;
+        }
+        boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
+        boolean forceParsingByStandardUri = 
Boolean.parseBoolean(stringForceParsingByStandardUri);
+        S3URI s3uri = S3URI.create(uri, usePathStyle, 
forceParsingByStandardUri);
+        return s3uri.getRegion().orElse(null);
+
+    }
+
+    /**
+     * Validates and normalizes the given path into a standard S3 URI.
+     * If the input already starts with "s3://", it is returned as-is.
+     * Otherwise, it is parsed and converted into an S3-compatible URI format.
+     *
+     * @param path                            the raw S3-style path or full URI
+     * @param stringUsePathStyle              whether to use path-style access 
("true"/"false")
+     * @param stringForceParsingByStandardUri whether to force parsing using 
the standard URI format ("true"/"false")
+     * @return normalized S3 URI string like "s3://bucket/key"
+     * @throws UserException if the input path is blank or invalid
+     *                       <p>
+     *                       Example:
+     *                       Input: 
"https://s3.us-west-1.amazonaws.com/my-bucket/my-key";
+     *                       Output: "s3://my-bucket/my-key"
+     */
+    public static String validateAndNormalizeUri(String path,
+                                                 String stringUsePathStyle,
+                                                 String 
stringForceParsingByStandardUri) throws UserException {
+        if (StringUtils.isBlank(path)) {
+            throw new UserException("path is null");
+        }
+        if (path.startsWith("s3://")) {
+            return path;
+        }
+
+        boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
+        boolean forceParsingByStandardUri = 
Boolean.parseBoolean(stringForceParsingByStandardUri);
+        S3URI s3uri = S3URI.create(path, usePathStyle, 
forceParsingByStandardUri);
+        return "s3" + S3URI.SCHEME_DELIM + s3uri.getBucket() + 
S3URI.PATH_DELIM + s3uri.getKey();
+    }
+
+    /**
+     * Extracts and returns the raw URI string from the given props map.
+     *
+     * @param props the map expected to contain a 'uri' entry
+     * @return the URI string from props
+     * @throws UserException if the map is empty or does not contain 'uri'
+     *                       <p>
+     *                       Example:
+     *                       Input: {"uri": "s3://my-bucket/my-key"}
+     *                       Output: "s3://my-bucket/my-key"
+     */
+    public static String validateAndGetUri(Map<String, String> props) throws 
UserException {
+        if (props.isEmpty()) {
+            throw new UserException("props is empty");
+        }
+        if (!props.containsKey(URI_KEY)) {
+            throw new UserException("props must contain uri");
+        }
+        return props.get(URI_KEY);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
new file mode 100644
index 00000000000..95516c6067a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.ConnectionProperties;
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import lombok.Getter;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public abstract class StorageProperties extends ConnectionProperties {
+
+    public static final String FS_HDFS_SUPPORT = "fs.hdfs.support";
+    public static final String FS_S3_SUPPORT = "fs.s3.support";
+    public static final String FS_GCS_SUPPORT = "fs.gcs.support";
+    public static final String FS_AZURE_SUPPORT = "fs.azure.support";
+    public static final String FS_OSS_SUPPORT = "fs.oss.support";
+    public static final String FS_OBS_SUPPORT = "fs.obs.support";
+    public static final String FS_COS_SUPPORT = "fs.cos.support";
+
+    public enum Type {
+        HDFS,
+        S3,
+        OSS,
+        OBS,
+        COS,
+        UNKNOWN
+    }
+
+    public abstract Map<String, String> getBackendConfigProperties();
+
+    @Getter
+    protected Type type;
+
+
+    /**
+     * Creates a list of StorageProperties instances based on the provided 
properties.
+     * <p>
+     * This method iterates through the list of supported storage types and 
creates an instance
+     * for each supported type. If no supported type is found, an 
HDFSProperties instance is added
+     * by default.
+     *
+     * @param origProps the original properties map to create the 
StorageProperties instances
+     * @return a list of StorageProperties instances for all supported storage 
types
+     */
+    public static List<StorageProperties> createAll(Map<String, String> 
origProps) throws UserException {
+        List<StorageProperties> result = new ArrayList<>();
+        for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
+            StorageProperties p = func.apply(origProps);
+            if (p != null) {
+                result.add(p);
+            }
+        }
+        if (result.stream().noneMatch(HdfsProperties.class::isInstance)) {
+            result.add(new HdfsProperties(origProps));
+        }
+
+        for (StorageProperties storageProperties : result) {
+            storageProperties.initNormalizeAndCheckProps();
+        }
+        return result;
+    }
+
+    /**
+     * Creates a primary StorageProperties instance based on the provided 
properties.
+     * <p>
+     * This method iterates through the list of supported storage types and 
returns the first
+     * matching StorageProperties instance. If no supported type is found, an 
exception is thrown.
+     *
+     * @param origProps the original properties map to create the 
StorageProperties instance
+     * @return a StorageProperties instance for the primary storage type
+     * @throws RuntimeException if no supported storage type is found
+     */
+    public static StorageProperties createPrimary(Map<String, String> 
origProps) throws UserException {
+        for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
+            StorageProperties p = func.apply(origProps);
+            if (p != null) {
+                p.initNormalizeAndCheckProps();
+                return p;
+            }
+        }
+        throw new RuntimeException("No supported storage type found.");
+    }
+
+    private static final List<Function<Map<String, String>, 
StorageProperties>> PROVIDERS =
+            Arrays.asList(
+                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
+                            || HdfsProperties.guessIsMe(props)) ? new 
HdfsProperties(props) : null,
+                    props -> (isFsSupport(props, FS_S3_SUPPORT)
+                            || S3Properties.guessIsMe(props)) ? new 
S3Properties(props) : null,
+                    props -> (isFsSupport(props, FS_OSS_SUPPORT)
+                            || OSSProperties.guessIsMe(props)) ? new 
OSSProperties(props) : null,
+                    props -> (isFsSupport(props, FS_OBS_SUPPORT)
+                            || OBSProperties.guessIsMe(props)) ? new 
OBSProperties(props) : null,
+                    props -> (isFsSupport(props, FS_COS_SUPPORT)
+                            || COSProperties.guessIsMe(props)) ? new 
COSProperties(props) : null
+            );
+
+    protected StorageProperties(Type type, Map<String, String> origProps) {
+        super(origProps);
+        this.type = type;
+    }
+
+    private static boolean isFsSupport(Map<String, String> origProps, String 
fsEnable) {
+        return origProps.getOrDefault(fsEnable, 
"false").equalsIgnoreCase("true");
+    }
+
+    protected static boolean checkIdentifierKey(Map<String, String> origProps, 
List<Field> fields) {
+        for (Field field : fields) {
+            field.setAccessible(true);
+            ConnectorProperty annotation = 
field.getAnnotation(ConnectorProperty.class);
+            for (String key : annotation.names()) {
+                if (origProps.containsKey(key)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Validates the given URL string and returns a normalized URI in the 
format: scheme://authority/path.
+     * <p>
+     * This method checks that the input is non-empty, the scheme is present 
and supported (e.g., hdfs, viewfs),
+     * and converts it into a canonical URI string.
+     *
+     * @param url the raw URL string to validate and normalize
+     * @return a normalized URI string with validated scheme and authority
+     * @throws UserException if the URL is empty, lacks a valid scheme, or 
contains an unsupported scheme
+     */
+    public abstract String validateAndNormalizeUri(String url) throws 
UserException;
+
+    /**
+     * Extracts the URI string from the provided properties map, validates it, 
and returns the normalized URI.
+     * <p>
+     * This method checks that the 'uri' key exists in the property map, 
retrieves the value,
+     * and then delegates to {@link #validateAndNormalizeUri(String)} for 
further validation and normalization.
+     *
+     * @param loadProps the map containing load-related properties, including 
the URI under the key 'uri'
+     * @return a normalized and validated URI string
+     * @throws UserException if the 'uri' property is missing, empty, or 
invalid
+     */
+    public abstract String validateAndGetUri(Map<String, String> loadProps) 
throws UserException;
+
+    public abstract String getStorageName();
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
new file mode 100644
index 00000000000..d6f8dce27a5
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class COSPropertiesTest {
+    private Map<String, String> origProps;
+
+    private static String secretKey = "";
+    private static String accessKey = "";
+    private static String hdfsPath = "";
+
+    @BeforeEach
+    public void setUp() {
+        origProps = new HashMap<>();
+    }
+
+    @Test
+    public void testCOSProperties() throws UserException {
+        origProps.put("cos.endpoint", "https://cos.example.com";);
+        origProps.put("cos.access_key", "myCOSAccessKey");
+        origProps.put("cos.secret_key", "myCOSSecretKey");
+        origProps.put("cos.region", "ap-beijing-1");
+        origProps.put("connection.maximum", "88");
+        origProps.put("connection.request.timeout", "100");
+        origProps.put("connection.timeout", "1000");
+        origProps.put("use_path_style", "true");
+        origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
+        origProps.put("test_non_storage_param", "6000");
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Invalid endpoint format: 
https://cos.example.com";);
+        origProps.put("cos.endpoint", "cos.ap-beijing-1.myqcloud.com");
+        COSProperties cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Map<String, String> cosConfig = cosProperties.getMatchedProperties();
+        
Assertions.assertTrue(!cosConfig.containsKey("test_non_storage_param"));
+
+        origProps.forEach((k, v) -> {
+            if (!k.equals("test_non_storage_param") && 
!k.equals(StorageProperties.FS_COS_SUPPORT)) {
+                Assertions.assertEquals(v, cosConfig.get(k));
+            }
+        });
+        origProps = new HashMap<>();
+        origProps.put("cos.endpoint", "https://cos.example.com";);
+        origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property cos.access_key is required.");
+        origProps.put("cos.access_key", "myCOSAccessKey");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property cos.secret_key is required.");
+        origProps.put("cos.secret_key", "myCOSSecretKey");
+        //no any exception
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
StorageProperties.createPrimary(origProps), "Invalid endpoint format: 
https://cos.example.com";);
+        origProps.put("cos.endpoint", "cos.ap-beijing.myqcloud.com");
+        Assertions.assertDoesNotThrow(() -> 
StorageProperties.createPrimary(origProps));
+    }
+
+    @Test
+    public void testToNativeS3Configuration() throws UserException {
+        origProps.put("cos.endpoint", "cos.ap-beijing.myqcloud.com");
+        origProps.put("cos.access_key", "myCOSAccessKey");
+        origProps.put("cos.secret_key", "myCOSSecretKey");
+        origProps.put("test_non_storage_param", "6000");
+        origProps.put("connection.maximum", "88");
+        origProps.put("connection.request.timeout", "100");
+        origProps.put("connection.timeout", "1000");
+        origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
+        //origProps.put("cos.region", "ap-beijing");
+
+        COSProperties cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Map<String, String> s3Props = 
cosProperties.generateBackendS3Configuration();
+        Map<String, String> cosConfig = cosProperties.getMatchedProperties();
+        
Assertions.assertTrue(!cosConfig.containsKey("test_non_storage_param"));
+
+        origProps.forEach((k, v) -> {
+            if (!k.equals("test_non_storage_param") && 
!k.equals(StorageProperties.FS_COS_SUPPORT)) {
+                Assertions.assertEquals(v, cosConfig.get(k));
+            }
+        });
+        // Validate the S3 properties
+        Assertions.assertEquals("cos.ap-beijing.myqcloud.com", 
s3Props.get("AWS_ENDPOINT"));
+        Assertions.assertEquals("ap-beijing", s3Props.get("AWS_REGION"));
+        Assertions.assertEquals("myCOSAccessKey", 
s3Props.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("myCOSSecretKey", 
s3Props.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("88", s3Props.get("AWS_MAX_CONNECTIONS"));
+        Assertions.assertEquals("100", s3Props.get("AWS_REQUEST_TIMEOUT_MS"));
+        Assertions.assertEquals("1000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
+        Assertions.assertEquals("false", s3Props.get("use_path_style"));
+        origProps.put("use_path_style", "true");
+        cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(0);
+        s3Props = cosProperties.generateBackendS3Configuration();
+        Assertions.assertEquals("true", s3Props.get("use_path_style"));
+        // Add any additional assertions for other properties if needed
+    }
+
+    @Test
+    public void testGetRegion() throws UserException {
+        origProps.put("cos.endpoint", "cos.ap-beijing.myqcloud.com");
+        origProps.put("cos.access_key", "myCOSAccessKey");
+        origProps.put("cos.secret_key", "myCOSSecretKey");
+        COSProperties cosProperties = (COSProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("ap-beijing", cosProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
cosProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
cosProperties.getSecretKey());
+        Assertions.assertEquals("cos.ap-beijing.myqcloud.com", 
cosProperties.getEndpoint());
+    }
+
+    @Test
+    public void testGetRegionWithDefault() throws UserException {
+        origProps.put("uri", 
"https://examplebucket-1250000000.cos.ap-beijing.myqcloud.com/test/file.txt";);
+        origProps.put("cos.access_key", "myCOSAccessKey");
+        origProps.put("cos.secret_key", "myCOSSecretKey");
+        COSProperties cosProperties = (COSProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("ap-beijing", cosProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
cosProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
cosProperties.getSecretKey());
+        Assertions.assertEquals("cos.ap-beijing.myqcloud.com", 
cosProperties.getEndpoint());
+        Map<String, String> cosNoEndpointProps = new HashMap<>();
+        cosNoEndpointProps.put("cos.access_key", "myCOSAccessKey");
+        cosNoEndpointProps.put("cos.secret_key", "myCOSSecretKey");
+        cosNoEndpointProps.put("cos.region", "ap-beijing");
+        origProps.put("uri", "s3://examplebucket-1250000000/test/file.txt");
+        //not support this case
+        Assertions.assertThrowsExactly(RuntimeException.class, () -> 
StorageProperties.createPrimary(cosNoEndpointProps), "Property cos.endpoint is 
required.");
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/HdfsPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/HdfsPropertiesTest.java
new file mode 100644
index 00000000000..a4dff23d276
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/HdfsPropertiesTest.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsPropertiesTest {
+
+
+    @Test
+    public void testBasicHdfsCreate() throws UserException {
+        // Test 1: Check default authentication type (should be "simple")
+        Map<String, String> origProps = createBaseHdfsProperties();
+        List<StorageProperties> storageProperties = 
StorageProperties.createAll(origProps);
+        HdfsProperties hdfsProperties = (HdfsProperties) 
storageProperties.get(0);
+        Configuration conf = hdfsProperties.getHadoopConfiguration();
+        Assertions.assertEquals("simple", 
conf.get("hadoop.security.authentication"));
+
+        // Test 2: Kerberos without necessary configurations (should throw 
exception)
+        origProps.put("hdfs.authentication.type", "kerberos");
+        assertKerberosConfigException(origProps, "HDFS authentication type is 
kerberos, but principal or keytab is not set");
+
+        // Test 3: Kerberos with missing principal (should throw exception)
+        origProps.put("hdfs.authentication.kerberos.principal", "hadoop");
+        assertKerberosConfigException(origProps, "HDFS authentication type is 
kerberos, but principal or keytab is not set");
+
+        // Test 4: Kerberos with complete config (should succeed)
+        origProps.put("hdfs.authentication.kerberos.keytab", "keytab");
+        HdfsProperties properties = (HdfsProperties) 
StorageProperties.createAll(origProps)
+                .get(0);  // No exception expected
+        Configuration configuration = properties.getHadoopConfiguration();
+        Assertions.assertEquals("kerberos", 
configuration.get("hdfs.security.authentication"));
+        Assertions.assertEquals("hadoop", 
configuration.get("hadoop.kerberos.principal"));
+        Assertions.assertEquals("keytab", 
configuration.get("hadoop.kerberos.keytab"));
+    }
+
+    @Test
+    public void testBasicHdfsPropertiesCreateByConfigFile() throws 
UserException {
+        // Test 1: Check loading of config resources
+        Map<String, String> origProps = createBaseHdfsProperties();
+        URL hiveFileUrl = 
HdfsPropertiesTest.class.getClassLoader().getResource("plugins");
+        Config.hadoop_config_dir = hiveFileUrl.getPath().toString() + 
"/hadoop_conf/";
+        origProps.put("hadoop.config.resources", 
"hadoop/core-site.xml,hadoop/hdfs-site.xml");
+
+        // Test 2: Missing config resources (should throw exception)
+        assertConfigResourceException(origProps, "Config resource file does 
not exist");
+
+        // Test 3: Valid config resources (should succeed)
+        origProps.put("hadoop.config.resources", 
"hadoop1/core-site.xml,hadoop1/hdfs-site.xml");
+        List<StorageProperties> storageProperties = 
StorageProperties.createAll(origProps);
+        HdfsProperties hdfsProperties = (HdfsProperties) 
storageProperties.get(0);
+        Configuration conf = hdfsProperties.getHadoopConfiguration();
+        Assertions.assertEquals("hdfs://localhost:9000", 
conf.get("fs.defaultFS"));
+        Assertions.assertEquals("ns1", conf.get("dfs.nameservices"));
+
+        // Test 4: Kerberos without necessary configurations (should throw 
exception)
+        origProps.put("hdfs.authentication.type", "kerberos");
+        assertKerberosConfigException(origProps, "HDFS authentication type is 
kerberos, but principal or keytab is not set");
+
+        // Test 5: Kerberos with missing principal (should throw exception)
+        origProps.put("hdfs.authentication.kerberos.principal", "hadoop");
+        assertKerberosConfigException(origProps, "HDFS authentication type is 
kerberos, but principal or keytab is not set");
+
+        // Test 6: Kerberos with complete config (should succeed)
+        origProps.put("hdfs.authentication.kerberos.keytab", "keytab");
+        hdfsProperties = (HdfsProperties) 
StorageProperties.createAll(origProps).get(0);  // No exception expected
+        Configuration configuration = hdfsProperties.getHadoopConfiguration();
+        Assertions.assertEquals("kerberos", 
configuration.get("hdfs.security.authentication"));
+        Assertions.assertEquals("hadoop", 
configuration.get("hadoop.kerberos.principal"));
+        Assertions.assertEquals("keytab", 
configuration.get("hadoop.kerberos.keytab"));
+        Assertions.assertEquals("hdfs://localhost:9000", 
configuration.get("fs.defaultFS"));
+
+    }
+
+    @Test
+    public void testNonParamsException() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        Assertions.assertThrowsExactly(RuntimeException.class, () -> 
StorageProperties.createPrimary(origProps));
+        origProps.put("nonhdfs", "hdfs://localhost:9000");
+        Assertions.assertThrowsExactly(RuntimeException.class, () -> {
+            StorageProperties.createPrimary(origProps);
+        });
+        origProps.put(StorageProperties.FS_HDFS_SUPPORT, "true");
+        HdfsProperties hdfsProperties = (HdfsProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("HDFS", hdfsProperties.getStorageName());
+        Assertions.assertNotEquals(null, 
hdfsProperties.getHadoopConfiguration());
+        Assertions.assertNotEquals(null, 
hdfsProperties.getBackendConfigProperties());
+        Map<String, String> resourceNullVal = new HashMap<>();
+        resourceNullVal.put(StorageProperties.FS_HDFS_SUPPORT, "true");
+        
StorageProperties.createPrimary(resourceNullVal).getBackendConfigProperties();
+    }
+
+    @Test
+    public void testBasicCreateByOldProperties() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("hdfs.authentication.type", "simple");
+        origProps.put("fs.defaultFS", "hdfs://localhost:9000");
+        StorageProperties properties = 
StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals(properties.getClass(), HdfsProperties.class);
+        origProps.put("dfs.nameservices", "ns1");
+        origProps.put("dfs.ha.namenodes.ns1", "nn1,nn2");
+        origProps.put("dfs.namenode.rpc-address.ns1.nn1", "localhost:9000");
+        origProps.put("hadoop.async.threads.max", "10");
+        properties = StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals(properties.getClass(), HdfsProperties.class);
+        Map<String, String> beProperties = 
properties.getBackendConfigProperties();
+        Assertions.assertEquals("hdfs://localhost:9000", 
beProperties.get("fs.defaultFS"));
+        Assertions.assertEquals("ns1", beProperties.get("dfs.nameservices"));
+        Assertions.assertEquals("nn1,nn2", 
beProperties.get("dfs.ha.namenodes.ns1"));
+        Assertions.assertEquals("10", 
beProperties.get("hadoop.async.threads.max"));
+
+
+    }
+
+    // Helper methods to reduce code duplication
+    private Map<String, String> createBaseHdfsProperties() {
+        Map<String, String> origProps = Maps.newHashMap();
+        origProps.put(StorageProperties.FS_HDFS_SUPPORT, "true");
+        return origProps;
+    }
+
+    private void assertKerberosConfigException(Map<String, String> origProps, 
String expectedMessage) {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), expectedMessage);
+    }
+
+    private void assertConfigResourceException(Map<String, String> origProps, 
String expectedMessage) {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), expectedMessage);
+    }
+
+    @Test
+    public void checkUriParamsTests() throws UserException {
+        Map<String, String> origProps = createBaseHdfsProperties();
+        origProps.put("fs.defaultFS", "hdfs://localhost:9000");
+        origProps.put("uri", "s3://region/bucket/");
+        HdfsProperties hdfsProperties = (HdfsProperties) 
StorageProperties.createAll(origProps).get(0);
+        HdfsProperties finalHdfsProperties = hdfsProperties;
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
+            finalHdfsProperties.validateAndGetUri(origProps);
+        });
+
+        origProps.put("uri", "hdfs://localhost:9000/test");
+        origProps.put("hadoop.username", "test");
+        hdfsProperties = (HdfsProperties) 
StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals("test", 
hdfsProperties.getBackendConfigProperties().get("hadoop.username"));
+        Assertions.assertEquals("hdfs://localhost:9000/test", 
hdfsProperties.validateAndGetUri(origProps));
+        HdfsProperties finalHdfsProperties1 = hdfsProperties;
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
+            finalHdfsProperties1.validateAndNormalizeUri("");
+        });
+        Assertions.assertEquals("hdfs://localhost:9000/test", 
hdfsProperties.validateAndNormalizeUri("hdfs://localhost:9000/test"));
+
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtilsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtilsTest.java
new file mode 100644
index 00000000000..b01688cd264
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtilsTest.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HdfsPropertiesUtilsTest {
+
+    @Test
+    public void testCheckLoadPropsAndReturnUri_success() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "hdfs://localhost:9000/data/file.txt");
+
+        String result = HdfsPropertiesUtils.validateAndGetUri(props);
+        Assertions.assertEquals("hdfs://localhost:9000/data/file.txt", result);
+    }
+
+    @Test
+    public void testCheckLoadPropsAndReturnUri_emptyProps() {
+        Map<String, String> props = new HashMap<>();
+
+        Exception exception = Assertions.assertThrows(UserException.class, () 
-> {
+            HdfsPropertiesUtils.validateAndGetUri(props);
+        });
+        Assertions.assertEquals("errCode = 2, detailMessage = props is empty", 
exception.getMessage());
+    }
+
+    @Test
+    public void testCheckLoadPropsAndReturnUri_missingUriKey() {
+        Map<String, String> props = new HashMap<>();
+        props.put("path", "xxx");
+
+        Exception exception = Assertions.assertThrows(UserException.class, () 
-> {
+            HdfsPropertiesUtils.validateAndGetUri(props);
+        });
+        Assertions.assertEquals("errCode = 2, detailMessage = props must 
contain uri", exception.getMessage());
+    }
+
+    @Test
+    public void testConvertUrlToFilePath_valid() throws Exception {
+        String uri = "viewfs://cluster/user/test";
+        String result = HdfsPropertiesUtils.convertUrlToFilePath(uri);
+        Assertions.assertEquals("viewfs://cluster/user/test", result);
+    }
+
+    @Test
+    public void testConvertUrlToFilePath_invalidSchema() {
+        String uri = "s3://bucket/file.txt";
+
+        Exception exception = 
Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            HdfsPropertiesUtils.convertUrlToFilePath(uri);
+        });
+        Assertions.assertTrue(exception.getMessage().contains("Invalid export 
path"));
+    }
+
+    @Test
+    public void testConvertUrlToFilePath_blankUri() {
+        String uri = "   ";
+
+        Exception exception = 
Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            HdfsPropertiesUtils.convertUrlToFilePath(uri);
+        });
+        Assertions.assertTrue(exception.getMessage().contains("uri is null"));
+    }
+
+    @Test
+    public void testConstructDefaultFsFromUri_valid() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "hdfs://localhost:8020/data");
+
+        String result = HdfsPropertiesUtils.constructDefaultFsFromUri(props);
+        Assertions.assertEquals("hdfs://localhost:8020", result);
+    }
+
+    @Test
+    public void testConstructDefaultFsFromUri_viewfs() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "viewfs://cluster/path");
+
+        String result = HdfsPropertiesUtils.constructDefaultFsFromUri(props);
+        Assertions.assertEquals("viewfs://cluster", result);
+    }
+
+    @Test
+    public void testConstructDefaultFsFromUri_invalidSchema() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "obs://bucket/test");
+
+        Exception exception = 
Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            HdfsPropertiesUtils.constructDefaultFsFromUri(props);
+        });
+        Assertions.assertTrue(exception.getMessage().contains("Invalid export 
path"));
+    }
+
+    @Test
+    public void testConstructDefaultFsFromUri_emptyProps() {
+        Map<String, String> props = new HashMap<>();
+        String result = HdfsPropertiesUtils.constructDefaultFsFromUri(props);
+        Assertions.assertNull(result);
+    }
+
+    @Test
+    public void testConstructDefaultFsFromUri_missingUri() {
+        Map<String, String> props = new HashMap<>();
+        props.put("x", "y");
+
+        String result = HdfsPropertiesUtils.constructDefaultFsFromUri(props);
+        Assertions.assertNull(result);
+    }
+
+    @Test
+    public void testConstructDefaultFsFromUri_blankUri() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "  ");
+
+        String result = HdfsPropertiesUtils.constructDefaultFsFromUri(props);
+        Assertions.assertNull(result);
+    }
+
+    @Test
+    public void testConvertUrlToFilePath_uppercaseSchema() throws Exception {
+        String uri = "HDFS://localhost:9000/test";
+        String result = HdfsPropertiesUtils.convertUrlToFilePath(uri);
+        Assertions.assertEquals("HDFS://localhost:9000/test", result);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
new file mode 100644
index 00000000000..a33edb451a0
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OBSPropertyTest {
+    private Map<String, String> origProps = new HashMap<>();
+
+    @Test
+    public void testBasicCreateTest() {
+        //Map<String, String> origProps = new HashMap<>();
+        origProps.put("obs.endpoint", "https://obs.example.com";);
+        origProps.put("obs.access_key", "myOBSAccessKey");
+        origProps.put("obs.secret_key", "myOBSSecretKey");
+        origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
+        // Test creation without additional properties
+        origProps = new HashMap<>();
+        origProps.put("obs.endpoint", "https://obs.example.com";);
+        origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property obs.access_key is required.");
+        origProps.put("obs.access_key", "myOBSAccessKey");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property obs.secret_key is required.");
+        origProps.put("obs.secret_key", "myOBSSecretKey");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Invalid endpoint format: 
https://obs.example.com";);
+        origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
+        Assertions.assertDoesNotThrow(() -> 
StorageProperties.createAll(origProps));
+        origProps.put("obs.endpoint", 
"https://obs.cn-north-4.myhuaweicloud.com";);
+        Assertions.assertDoesNotThrow(() -> 
StorageProperties.createAll(origProps));
+    }
+
+    @Test
+    public void testToNativeS3Configuration() throws UserException {
+        origProps.put("obs.access_key", "myOBSAccessKey");
+        origProps.put("obs.secret_key", "myOBSSecretKey");
+        origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
+        origProps.put("connection.maximum", "88");
+        origProps.put("connection.request.timeout", "100");
+        origProps.put("connection.timeout", "1000");
+        origProps.put("use_path_style", "true");
+        origProps.put("test_non_storage_param", "test_non_storage_value");
+        origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
+        OBSProperties obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Map<String, String> s3Props = new HashMap<>();
+        Map<String, String> obsConfig = obsProperties.getMatchedProperties();
+        
Assertions.assertTrue(!obsConfig.containsKey("test_non_storage_param"));
+
+        origProps.forEach((k, v) -> {
+            if (!k.equals("test_non_storage_param") && 
!k.equals(StorageProperties.FS_OBS_SUPPORT)) {
+                Assertions.assertEquals(v, obsConfig.get(k));
+            }
+        });
+
+        s3Props = obsProperties.getBackendConfigProperties();
+        Assertions.assertEquals("obs.cn-north-4.myhuaweicloud.com", 
s3Props.get("AWS_ENDPOINT"));
+        Assertions.assertEquals("cn-north-4", s3Props.get("AWS_REGION"));
+        Assertions.assertEquals("myOBSAccessKey", 
s3Props.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("myOBSSecretKey", 
s3Props.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("88", s3Props.get("AWS_MAX_CONNECTIONS"));
+        Assertions.assertEquals("100", s3Props.get("AWS_REQUEST_TIMEOUT_MS"));
+        Assertions.assertEquals("1000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
+        Assertions.assertEquals("true", s3Props.get("use_path_style"));
+        origProps.remove("use_path_style");
+        obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(0);
+        s3Props = obsProperties.getBackendConfigProperties();
+        Assertions.assertEquals("false", s3Props.get("use_path_style"));
+    }
+
+
+    @Test
+    public void testGetRegion() throws UserException {
+        origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
+        origProps.put("obs.access_key", "myCOSAccessKey");
+        origProps.put("obs.secret_key", "myCOSSecretKey");
+        OBSProperties obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals("cn-north-4", obsProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
obsProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
obsProperties.getSecretKey());
+        Assertions.assertEquals("obs.cn-north-4.myhuaweicloud.com", 
obsProperties.getEndpoint());
+    }
+
+    @Test
+    public void testGetRegionWithDefault() throws UserException {
+        origProps.put("uri", 
"https://examplebucket-1250000000.obs.cn-north-4.myhuaweicloud.com/test/file.txt";);
+        origProps.put("obs.access_key", "myCOSAccessKey");
+        origProps.put("obs.secret_key", "myCOSSecretKey");
+        OBSProperties obsProperties = (OBSProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("cn-north-4", obsProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
obsProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
obsProperties.getSecretKey());
+        Assertions.assertEquals("obs.cn-north-4.myhuaweicloud.com", 
obsProperties.getEndpoint());
+        Map<String, String> cosNoEndpointProps = new HashMap<>();
+        cosNoEndpointProps.put("obs.access_key", "myCOSAccessKey");
+        cosNoEndpointProps.put("obs.secret_key", "myCOSSecretKey");
+        cosNoEndpointProps.put("obs.region", "ap-beijing");
+        cosNoEndpointProps.put("uri", 
"s3://examplebucket-1250000000/test/file.txt");
+        //not support
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
StorageProperties.createAll(cosNoEndpointProps), "Property cos.endpoint is 
required.");
+    }
+
+    private static String obsAccessKey = "";
+    private static String obsSecretKey = "";
+
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
new file mode 100644
index 00000000000..1b67ced5720
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OSSPropertiesTest {
+
+    private static String ossAccessKey = "";
+    private static String ossSecretKey = "";
+    private static String hdfsPath = "";
+
+    @Test
+    public void testBasicCreateTest() {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("oss.endpoint", "https://oss.aliyuncs.com";);
+        origProps.put("oss.access_key", "myOSSAccessKey");
+        origProps.put("oss.secret_key", "myOSSSecretKey");
+        origProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        Map<String, String> finalOrigProps = origProps;
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
StorageProperties.createPrimary(finalOrigProps), "Property oss.endpoint is 
required.");
+        origProps.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        Map<String, String> finalOrigProps1 = origProps;
+        Assertions.assertDoesNotThrow(() -> 
StorageProperties.createPrimary(finalOrigProps1));
+        origProps = new HashMap<>();
+        origProps.put("oss.endpoint", "https://oss.aliyuncs.com";);
+        Map<String, String> finalOrigProps2 = origProps;
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
StorageProperties.createPrimary(finalOrigProps2));
+
+    }
+
+
+    @Test
+    public void testToNativeS3Configuration() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("oss.access_key", "myOSSAccessKey");
+        origProps.put("oss.secret_key", "myOSSSecretKey");
+        origProps.put("oss.endpoint", "oss-cn-beijing-internal.aliyuncs.com");
+        origProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        origProps.put("connection.maximum", "88");
+        origProps.put("connection.request.timeout", "100");
+        origProps.put("connection.timeout", "1000");
+        origProps.put("use_path_style", "true");
+        origProps.put("test_non_storage_param", "6000");
+        OSSProperties ossProperties = (OSSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Map<String, String> s3Props;
+
+        Map<String, String> ossConfig = ossProperties.getMatchedProperties();
+        
Assertions.assertTrue(!ossConfig.containsKey("test_non_storage_param"));
+
+        origProps.forEach((k, v) -> {
+            if (!k.equals("test_non_storage_param") && 
!k.equals(StorageProperties.FS_OSS_SUPPORT)) {
+                Assertions.assertEquals(v, ossConfig.get(k));
+            }
+        });
+
+
+        s3Props = ossProperties.generateBackendS3Configuration();
+        Assertions.assertEquals("oss-cn-beijing-internal.aliyuncs.com", 
s3Props.get("AWS_ENDPOINT"));
+        Assertions.assertEquals("cn-beijing-internal", 
s3Props.get("AWS_REGION"));
+        Assertions.assertEquals("myOSSAccessKey", 
s3Props.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("myOSSSecretKey", 
s3Props.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("88", s3Props.get("AWS_MAX_CONNECTIONS"));
+        Assertions.assertEquals("100", s3Props.get("AWS_REQUEST_TIMEOUT_MS"));
+        Assertions.assertEquals("1000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
+        Assertions.assertEquals("true", s3Props.get("use_path_style"));
+        origProps.remove("use_path_style");
+        ossProperties = (OSSProperties) 
StorageProperties.createAll(origProps).get(0);
+        s3Props = ossProperties.generateBackendS3Configuration();
+        Assertions.assertEquals("false", s3Props.get("use_path_style"));
+    }
+
+    @Test
+    public void testGetRegion() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        origProps.put("oss.access_key", "myCOSAccessKey");
+        origProps.put("oss.secret_key", "myCOSSecretKey");
+        OSSProperties ossProperties = (OSSProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("cn-hangzhou", ossProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
ossProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
ossProperties.getSecretKey());
+        Assertions.assertEquals("oss-cn-hangzhou.aliyuncs.com", 
ossProperties.getEndpoint());
+    }
+
+    @Test
+    public void testGetRegionWithDefault() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("uri", 
"https://examplebucket-1250000000.oss-cn-hangzhou.aliyuncs.com/test/file.txt";);
+        origProps.put("oss.access_key", "myCOSAccessKey");
+        origProps.put("oss.secret_key", "myCOSSecretKey");
+        OSSProperties ossProperties = (OSSProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("cn-hangzhou", ossProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
ossProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
ossProperties.getSecretKey());
+        Assertions.assertEquals("oss-cn-hangzhou.aliyuncs.com", 
ossProperties.getEndpoint());
+        Map<String, String> cosNoEndpointProps = new HashMap<>();
+        cosNoEndpointProps.put("oss.access_key", "myCOSAccessKey");
+        cosNoEndpointProps.put("oss.secret_key", "myCOSSecretKey");
+        cosNoEndpointProps.put("oss.region", "cn-hangzhou");
+        origProps.put("uri", "s3://examplebucket-1250000000/test/file.txt");
+        // not support
+        Assertions.assertThrowsExactly(RuntimeException.class, () -> 
StorageProperties.createPrimary(cosNoEndpointProps), "Property cos.endpoint is 
required.");
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
new file mode 100644
index 00000000000..b5a8ce0d825
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class S3PropertiesTest {
+    private Map<String, String> origProps;
+
+    private static String secretKey = "";
+    private static String accessKey = "";
+    private static String hdfsPath = "";
+
+    @BeforeEach
+    public void setUp() {
+        origProps = new HashMap<>();
+    }
+
+    @Test
+    public void testS3Properties() {
+        origProps.put("s3.endpoint", "https://cos.example.com";);
+        origProps.put("s3.access_key", "myS3AccessKey");
+        origProps.put("s3.secret_key", "myS3SecretKey");
+        origProps.put("s3.region", "us-west-1");
+        origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
+        origProps = new HashMap<>();
+        origProps.put("s3.endpoint", "https://s3.example.com";);
+        origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property cos.access_key is required.");
+        origProps.put("s3.access_key", "myS3AccessKey");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Property cos.secret_key is required.");
+        origProps.put("s3.secret_key", "myS3SecretKey");
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
StorageProperties.createAll(origProps), "Invalid endpoint format: 
https://s3.example.com";);
+        origProps.put("s3.endpoint", "s3.us-west-1.amazonaws.com");
+        Assertions.assertDoesNotThrow(() -> 
StorageProperties.createAll(origProps));
+    }
+
+    @Test
+    public void testToNativeS3Configuration() throws UserException {
+        origProps.put("s3.endpoint", "https://cos.example.com";);
+        origProps.put("s3.access_key", "myS3AccessKey");
+        origProps.put("s3.secret_key", "myS3SecretKey");
+        origProps.put("s3.region", "us-west-1");
+        origProps.put(StorageProperties.FS_S3_SUPPORT, "true");
+        origProps.put("use_path_style", "true");
+        origProps.put("s3.connection.maximum", "88");
+        origProps.put("s3.connection.timeout", "6000");
+        origProps.put("test_non_storage_param", "6000");
+
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> {
+            StorageProperties.createAll(origProps).get(1);
+        }, "Invalid endpoint format: https://cos.example.com";);
+        origProps.put("s3.endpoint", "s3.us-west-1.amazonaws.com");
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createAll(origProps).get(0);
+        Map<String, String> s3Props = 
s3Properties.getBackendConfigProperties();
+        Map<String, String> s3Config = s3Properties.getMatchedProperties();
+        Assertions.assertTrue(!s3Config.containsKey("test_non_storage_param"));
+
+        origProps.forEach((k, v) -> {
+            if (!k.equals("test_non_storage_param") && 
!k.equals(StorageProperties.FS_S3_SUPPORT)) {
+                Assertions.assertEquals(v, s3Config.get(k));
+            }
+        });
+        // Validate the S3 properties
+        Assertions.assertEquals("s3.us-west-1.amazonaws.com", 
s3Props.get("AWS_ENDPOINT"));
+        Assertions.assertEquals("us-west-1", s3Props.get("AWS_REGION"));
+        Assertions.assertEquals("myS3AccessKey", 
s3Props.get("AWS_ACCESS_KEY"));
+        Assertions.assertEquals("myS3SecretKey", 
s3Props.get("AWS_SECRET_KEY"));
+        Assertions.assertEquals("88", s3Props.get("AWS_MAX_CONNECTIONS"));
+        Assertions.assertEquals("6000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
+        Assertions.assertEquals("true", s3Props.get("use_path_style"));
+        origProps.remove("use_path_style");
+        origProps.remove("s3.connection.maximum");
+        origProps.remove("s3.connection.timeout");
+        s3Props = s3Properties.getBackendConfigProperties();
+
+        Assertions.assertEquals("true", s3Props.get("use_path_style"));
+        Assertions.assertEquals("88", s3Props.get("AWS_MAX_CONNECTIONS"));
+        Assertions.assertEquals("6000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
+    }
+
+
+    @Test
+    public void testGetRegion() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("s3.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        origProps.put("s3.access_key", "myCOSAccessKey");
+        origProps.put("s3.secret_key", "myCOSSecretKey");
+        origProps.put("s3.region", "cn-hangzhou");
+        OSSProperties ossProperties = (OSSProperties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("cn-hangzhou", ossProperties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", 
ossProperties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", 
ossProperties.getSecretKey());
+        Assertions.assertEquals("oss-cn-hangzhou.aliyuncs.com", 
ossProperties.getEndpoint());
+        origProps = new HashMap<>();
+        origProps.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
+        origProps.put("s3.access_key", "myCOSAccessKey");
+        origProps.put("s3.secret_key", "myCOSSecretKey");
+        origProps.put("s3.region", "us-west-2");
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("us-west-2", s3Properties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", s3Properties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", s3Properties.getSecretKey());
+        Assertions.assertEquals("s3.us-west-2.amazonaws.com", 
s3Properties.getEndpoint());
+
+
+    }
+
+    @Test
+    public void testGetRegionWithDefault() throws UserException {
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put("uri", 
"https://example-bucket.s3.us-west-2.amazonaws.com/path/to/file.txt\n";);
+        origProps.put("s3.access_key", "myCOSAccessKey");
+        origProps.put("s3.secret_key", "myCOSSecretKey");
+        origProps.put("s3.region", "us-west-2");
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createPrimary(origProps);
+        Assertions.assertEquals("us-west-2", s3Properties.getRegion());
+        Assertions.assertEquals("myCOSAccessKey", s3Properties.getAccessKey());
+        Assertions.assertEquals("myCOSSecretKey", s3Properties.getSecretKey());
+        Assertions.assertEquals("s3.us-west-2.amazonaws.com", 
s3Properties.getEndpoint());
+        Map<String, String> s3EndpointProps = new HashMap<>();
+        s3EndpointProps.put("oss.access_key", "myCOSAccessKey");
+        s3EndpointProps.put("oss.secret_key", "myCOSSecretKey");
+        s3EndpointProps.put("oss.region", "cn-hangzhou");
+        origProps.put("uri", "s3://examplebucket-1250000000/test/file.txt");
+        //not support
+        Assertions.assertThrowsExactly(RuntimeException.class, () -> 
StorageProperties.createPrimary(s3EndpointProps), "Property cos.endpoint is 
required.");
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertyUtilsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertyUtilsTest.java
new file mode 100644
index 00000000000..64795761383
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertyUtilsTest.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class S3PropertyUtilsTest {
+
+    @Test
+    void testCheckLoadPropsAndReturnUri_success() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "https://bucket.s3.us-west-2.amazonaws.com/key";);
+        String result = S3PropertyUtils.validateAndGetUri(props);
+        
Assertions.assertEquals("https://bucket.s3.us-west-2.amazonaws.com/key";, 
result);
+    }
+
+    @Test
+    void testCheckLoadPropsAndReturnUri_missingKey() {
+        Map<String, String> props = new HashMap<>();
+        Executable executable = () -> S3PropertyUtils.validateAndGetUri(props);
+        UserException exception = Assertions.assertThrows(UserException.class, 
executable);
+        Assertions.assertEquals("errCode = 2, detailMessage = props is empty", 
exception.getMessage());
+
+        props.put("someKey", "value");
+        executable = () -> S3PropertyUtils.validateAndGetUri(props);
+        exception = Assertions.assertThrows(UserException.class, executable);
+        Assertions.assertEquals("errCode = 2, detailMessage = props must 
contain uri", exception.getMessage());
+    }
+
+    @Test
+    void testConstructEndpointFromUrl_success() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", 
"https://my-bucket.s3.us-east-1.amazonaws.com/some/file.txt";);
+        String endpoint = S3PropertyUtils.constructEndpointFromUrl(props, 
"false", "false");
+        Assertions.assertEquals("s3.us-east-1.amazonaws.com", endpoint);
+    }
+
+    @Test
+    void testConstructEndpointFromUrl_nullOrBlank() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        Assertions.assertNull(S3PropertyUtils.constructEndpointFromUrl(props, 
"true", "false"));
+
+        props.put("uri", "");
+        Assertions.assertNull(S3PropertyUtils.constructEndpointFromUrl(props, 
"false", "true"));
+
+        props.put("uri", "invalid uri without scheme");
+        Assertions.assertThrowsExactly(UserException.class, () -> 
S3PropertyUtils.constructEndpointFromUrl(props, "true", "true"));
+    }
+
+    @Test
+    void testConstructRegionFromUrl_success() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", 
"https://my-bucket.s3.us-west-1.amazonaws.com/test.txt";);
+        String region = S3PropertyUtils.constructRegionFromUrl(props, "false", 
"false");
+        Assertions.assertEquals("us-west-1", region);
+    }
+
+    @Test
+    void testConstructRegionFromUrl_nullOrInvalid() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        Assertions.assertNull(S3PropertyUtils.constructRegionFromUrl(props, 
"false", "false"));
+
+        props.put("uri", "");
+        Assertions.assertNull(S3PropertyUtils.constructRegionFromUrl(props, 
"false", "true"));
+
+        props.put("uri", "not a uri");
+        Assertions.assertThrowsExactly(UserException.class, () -> 
S3PropertyUtils.constructRegionFromUrl(props, "false", "true"));
+        props.put("uri", 
"https://my-bucket.s3.us-west-1.amazonaws.com/test.txt";);
+        Assertions.assertEquals("us-west-1", 
S3PropertyUtils.constructRegionFromUrl(props, "false", "true"));
+    }
+
+    @Test
+    void testConvertToS3Address_success() throws UserException {
+        String httpsUrl = 
"https://my-bucket.s3.us-east-1.amazonaws.com/test/key.txt";;
+        String s3Path = S3PropertyUtils.validateAndNormalizeUri(httpsUrl, 
"false", "false");
+        Assertions.assertEquals("s3://my-bucket/test/key.txt", s3Path);
+
+        String alreadyS3 = "s3://bucket-name/path/file.csv";
+        Assertions.assertEquals(alreadyS3, 
S3PropertyUtils.validateAndNormalizeUri(alreadyS3, "true", "true"));
+    }
+
+    @Test
+    void testConvertToS3Address_invalid() {
+        Assertions.assertThrows(UserException.class, () -> 
S3PropertyUtils.validateAndNormalizeUri(null, "false", "true"));
+        Assertions.assertThrows(UserException.class, () -> 
S3PropertyUtils.validateAndNormalizeUri("", "false", "false"));
+        Assertions.assertThrows(UserException.class, () -> 
S3PropertyUtils.validateAndNormalizeUri("not a uri", "true", "true"));
+    }
+}
diff --git 
a/fe/fe-core/src/test/resources/plugins/hadoop_conf/hadoop1/core-site.xml 
b/fe/fe-core/src/test/resources/plugins/hadoop_conf/hadoop1/core-site.xml
new file mode 100644
index 00000000000..7b912ff1dd2
--- /dev/null
+++ b/fe/fe-core/src/test/resources/plugins/hadoop_conf/hadoop1/core-site.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration>
+    <!-- Hadoop core-site configuration -->
+
+    <!-- Specify the URI for the NameNode -->
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://localhost:9000</value>
+    </property>
+
+    <!-- Hadoop temporary directory for intermediate data -->
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>/tmp/hadoop</value>
+    </property>
+
+    <!-- Define the Hadoop home directory -->
+    <property>
+        <name>hadoop.home.dir</name>
+        <value>/usr/local/hadoop</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git 
a/fe/fe-core/src/test/resources/plugins/hadoop_conf/hadoop1/hdfs-site.xml 
b/fe/fe-core/src/test/resources/plugins/hadoop_conf/hadoop1/hdfs-site.xml
new file mode 100644
index 00000000000..d2dee1aa899
--- /dev/null
+++ b/fe/fe-core/src/test/resources/plugins/hadoop_conf/hadoop1/hdfs-site.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration>
+    <!-- HDFS-specific configuration -->
+
+    <!-- Directory for storing HDFS data -->
+    <property>
+        <name>dfs.data.dir</name>
+        <value>/data/hdfs</value>
+    </property>
+
+    <!-- Directory for storing HDFS NameNode data -->
+    <property>
+        <name>dfs.name.dir</name>
+        <value>/data/hdfs/namenode</value>
+    </property>
+
+    <!-- Replication factor for HDFS -->
+    <property>
+        <name>dfs.replication</name>
+        <value>3</value>
+    </property>
+
+    <!-- HDFS block size -->
+    <property>
+        <name>dfs.blocksize</name>
+        <value>134217728</value> <!-- 128 MB -->
+    </property>
+    <property>
+        <name>dfs.nameservices</name>
+        <value>ns1</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git 
a/fe/fe-core/src/test/resources/plugins/hive-conf/hive1/hive-site.xml 
b/fe/fe-core/src/test/resources/plugins/hive-conf/hive1/hive-site.xml
new file mode 100644
index 00000000000..f602da5a3ba
--- /dev/null
+++ b/fe/fe-core/src/test/resources/plugins/hive-conf/hive1/hive-site.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>hive.metastore.warehouse.dir</name>
+        <value>/user/hive/default</value>
+    </property>
+</configuration> 
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to