This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7db9cb06ad3 branch-4.1: [Feature](iceberg) Support Iceberg JDBC 
Catalog #59502 (#61360)
7db9cb06ad3 is described below

commit 7db9cb06ad3461d6837df816f458bd0dc458b77e
Author: Chenjunwei <[email protected]>
AuthorDate: Tue Mar 17 00:34:21 2026 +0800

    branch-4.1: [Feature](iceberg) Support Iceberg JDBC Catalog #59502 (#61360)
    
    Cherry-pick #59502 to branch-4.1
---
 .../datasource/iceberg/IcebergExternalCatalog.java |   1 +
 .../iceberg/IcebergExternalCatalogFactory.java     |   2 +
 .../iceberg/IcebergJdbcExternalCatalog.java        |  31 ++
 .../datasource/iceberg/source/IcebergScanNode.java |   1 +
 .../metastore/IcebergJdbcMetaStoreProperties.java  | 310 ++++++++++++++++++++
 .../metastore/IcebergPropertiesFactory.java        |   1 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |   2 +
 .../IcebergJdbcMetaStorePropertiesTest.java        |  83 ++++++
 .../iceberg/test_iceberg_jdbc_catalog.out          |  42 +++
 .../iceberg/test_iceberg_jdbc_catalog.groovy       | 316 +++++++++++++++++++++
 10 files changed, 789 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index ee8ad8b4fc0..66aabd58a33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -50,6 +50,7 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String ICEBERG_HADOOP = "hadoop";
     public static final String ICEBERG_GLUE = "glue";
     public static final String ICEBERG_DLF = "dlf";
+    public static final String ICEBERG_JDBC = "jdbc";
     public static final String ICEBERG_S3_TABLES = "s3tables";
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     public static final String ICEBERG_TABLE_CACHE_ENABLE = 
"meta.cache.iceberg.table.enable";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
index 748c0805393..824d20e7000 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -39,6 +39,8 @@ public class IcebergExternalCatalogFactory {
                 return new IcebergGlueExternalCatalog(catalogId, name, 
resource, props, comment);
             case IcebergExternalCatalog.ICEBERG_DLF:
                 return new IcebergDLFExternalCatalog(catalogId, name, 
resource, props, comment);
+            case IcebergExternalCatalog.ICEBERG_JDBC:
+                return new IcebergJdbcExternalCatalog(catalogId, name, 
resource, props, comment);
             case IcebergExternalCatalog.ICEBERG_HADOOP:
                 return new IcebergHadoopExternalCatalog(catalogId, name, 
resource, props, comment);
             case IcebergExternalCatalog.ICEBERG_S3_TABLES:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java
new file mode 100644
index 00000000000..aeb2fd9deec
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergJdbcExternalCatalog.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.CatalogProperty;
+
+import java.util.Map;
+
+public class IcebergJdbcExternalCatalog extends IcebergExternalCatalog {
+
+    public IcebergJdbcExternalCatalog(long catalogId, String name, String 
resource, Map<String, String> props,
+            String comment) {
+        super(catalogId, name, comment);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index bcb89d3f221..624de1fde9d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -179,6 +179,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                 case IcebergExternalCatalog.ICEBERG_DLF:
                 case IcebergExternalCatalog.ICEBERG_GLUE:
                 case IcebergExternalCatalog.ICEBERG_HADOOP:
+                case IcebergExternalCatalog.ICEBERG_JDBC:
                 case IcebergExternalCatalog.ICEBERG_S3_TABLES:
                     source = new IcebergApiSource((IcebergExternalTable) 
table, desc, columnNameToRange);
                     break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
new file mode 100644
index 00000000000..5c81532edd4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.catalog.JdbcResource;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergJdbcMetaStoreProperties.class);
+
+    private static final String JDBC_PREFIX = "jdbc.";
+    private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new 
ConcurrentHashMap<>();
+
+    private Map<String, String> icebergJdbcCatalogProperties;
+
+    @ConnectorProperty(
+            names = {"uri", "iceberg.jdbc.uri"},
+            required = true,
+            description = "JDBC connection URI for the Iceberg JDBC catalog."
+    )
+    private String uri = "";
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.user"},
+            required = false,
+            description = "Username for the Iceberg JDBC catalog."
+    )
+    private String jdbcUser;
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.password"},
+            required = false,
+            sensitive = true,
+            description = "Password for the Iceberg JDBC catalog."
+    )
+    private String jdbcPassword;
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.init-catalog-tables"},
+            required = false,
+            description = "Whether to create catalog tables if they do not 
exist."
+    )
+    private String jdbcInitCatalogTables;
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.schema-version"},
+            required = false,
+            description = "Iceberg JDBC catalog schema version (V0/V1)."
+    )
+    private String jdbcSchemaVersion;
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.strict-mode"},
+            required = false,
+            description = "Whether to enforce strict JDBC catalog schema 
checks."
+    )
+    private String jdbcStrictMode;
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.driver_url"},
+            required = false,
+            description = "JDBC driver JAR file path or URL. "
+                    + "Can be a local file name (will look in 
$DORIS_HOME/plugins/jdbc_drivers/) "
+                    + "or a full URL (http://, https://, file://)."
+    )
+    private String driverUrl;
+
+    @ConnectorProperty(
+            names = {"iceberg.jdbc.driver_class"},
+            required = false,
+            description = "JDBC driver class name. If not specified, will be 
auto-detected from the JDBC URI."
+    )
+    private String driverClass;
+
+    public IcebergJdbcMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public String getIcebergCatalogType() {
+        return IcebergExternalCatalog.ICEBERG_JDBC;
+    }
+
+    @Override
+    public void initNormalizeAndCheckProps() {
+        super.initNormalizeAndCheckProps();
+        initIcebergJdbcCatalogProperties();
+    }
+
+    @Override
+    protected void checkRequiredProperties() {
+        super.checkRequiredProperties();
+        if (StringUtils.isBlank(warehouse)) {
+            throw new IllegalArgumentException("Property warehouse is 
required.");
+        }
+    }
+
+    @Override
+    public Catalog initCatalog(String catalogName, Map<String, String> 
catalogProps,
+            List<StorageProperties> storagePropertiesList) {
+        Map<String, String> fileIOProperties = Maps.newHashMap();
+        Configuration conf = new Configuration();
+        toFileIOProperties(storagePropertiesList, fileIOProperties, conf);
+
+        Map<String, String> options = 
Maps.newHashMap(getIcebergJdbcCatalogProperties());
+        options.putAll(fileIOProperties);
+
+        // Support dynamic JDBC driver loading
+        // We need to register the driver with DriverManager because Iceberg 
uses DriverManager.getConnection()
+        // which doesn't respect Thread.contextClassLoader
+        if (StringUtils.isNotBlank(driverUrl)) {
+            registerJdbcDriver(driverUrl, driverClass);
+            LOG.info("Using dynamic JDBC driver from: {}", driverUrl);
+        }
+        return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
+    }
+
+    /**
+     * Register JDBC driver with DriverManager.
+     * This is necessary because DriverManager.getConnection() doesn't use 
Thread.contextClassLoader;
+     * it checks visibility from the caller's ClassLoader instead. Registering 
the driver lets DriverManager find it.
+     *
+     * @param driverUrl Path or URL to the JDBC driver JAR
+     * @param driverClassName Driver class name to register
+     */
+    private void registerJdbcDriver(String driverUrl, String driverClassName) {
+        try {
+            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
+            URL url = new URL(fullDriverUrl);
+
+            ClassLoader classLoader = 
DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
+                ClassLoader parent = getClass().getClassLoader();
+                return URLClassLoader.newInstance(new URL[]{u}, parent);
+            });
+
+            if (StringUtils.isBlank(driverClassName)) {
+                throw new IllegalArgumentException("driver_class is required 
when driver_url is specified");
+            }
+
+            // Load the driver class and register it with DriverManager
+            Class<?> driverClass = Class.forName(driverClassName, true, 
classLoader);
+            java.sql.Driver driver = (java.sql.Driver) 
driverClass.getDeclaredConstructor().newInstance();
+
+            // Wrap with a shim driver because DriverManager only hands out 
drivers whose class is visible to the caller's classloader
+            java.sql.DriverManager.registerDriver(new DriverShim(driver));
+            LOG.info("Successfully registered JDBC driver: {} from {}", 
driverClassName, fullDriverUrl);
+
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Invalid driver URL: " + 
driverUrl, e);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Failed to load JDBC driver 
class: " + driverClassName, e);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to register JDBC driver: " + 
driverClassName, e);
+        }
+    }
+
+    /**
+     * A shim driver that wraps the actual driver loaded from a custom 
ClassLoader.
+     * This is needed because DriverManager refuses to use a driver that 
wasn't loaded by the system classloader.
+     */
+    private static class DriverShim implements java.sql.Driver {
+        private final java.sql.Driver delegate;
+
+        DriverShim(java.sql.Driver delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public java.sql.Connection connect(String url, java.util.Properties 
info) throws java.sql.SQLException {
+            return delegate.connect(url, info);
+        }
+
+        @Override
+        public boolean acceptsURL(String url) throws java.sql.SQLException {
+            return delegate.acceptsURL(url);
+        }
+
+        @Override
+        public java.sql.DriverPropertyInfo[] getPropertyInfo(String url, 
java.util.Properties info)
+                throws java.sql.SQLException {
+            return delegate.getPropertyInfo(url, info);
+        }
+
+        @Override
+        public int getMajorVersion() {
+            return delegate.getMajorVersion();
+        }
+
+        @Override
+        public int getMinorVersion() {
+            return delegate.getMinorVersion();
+        }
+
+        @Override
+        public boolean jdbcCompliant() {
+            return delegate.jdbcCompliant();
+        }
+
+        @Override
+        public java.util.logging.Logger getParentLogger() throws 
java.sql.SQLFeatureNotSupportedException {
+            return delegate.getParentLogger();
+        }
+    }
+
+    public Map<String, String> getIcebergJdbcCatalogProperties() {
+        return Collections.unmodifiableMap(icebergJdbcCatalogProperties);
+    }
+
+    private void initIcebergJdbcCatalogProperties() {
+        icebergJdbcCatalogProperties = new HashMap<>();
+        icebergJdbcCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC);
+        icebergJdbcCatalogProperties.put(CatalogProperties.URI, uri);
+        if (StringUtils.isNotBlank(warehouse)) {
+            
icebergJdbcCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, 
warehouse);
+        }
+        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.user", jdbcUser);
+        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.password", 
jdbcPassword);
+        addIfNotBlank(icebergJdbcCatalogProperties, 
"jdbc.init-catalog-tables", jdbcInitCatalogTables);
+        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.schema-version", 
jdbcSchemaVersion);
+        addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.strict-mode", 
jdbcStrictMode);
+
+        if (origProps != null) {
+            for (Map.Entry<String, String> entry : origProps.entrySet()) {
+                String key = entry.getKey();
+                if (key != null && key.startsWith(JDBC_PREFIX)
+                        && !icebergJdbcCatalogProperties.containsKey(key)) {
+                    icebergJdbcCatalogProperties.put(key, entry.getValue());
+                }
+            }
+        }
+    }
+
+    private static void addIfNotBlank(Map<String, String> props, String key, 
String value) {
+        if (StringUtils.isNotBlank(value)) {
+            props.put(key, value);
+        }
+    }
+
+    private static void toFileIOProperties(List<StorageProperties> 
storagePropertiesList,
+            Map<String, String> fileIOProperties, Configuration conf) {
+        for (StorageProperties storageProperties : storagePropertiesList) {
+            if (storageProperties instanceof AbstractS3CompatibleProperties) {
+                toS3FileIOProperties((AbstractS3CompatibleProperties) 
storageProperties, fileIOProperties);
+            }
+            if (storageProperties.getHadoopStorageConfig() != null) {
+                conf.addResource(storageProperties.getHadoopStorageConfig());
+            }
+        }
+    }
+
+    private static void toS3FileIOProperties(AbstractS3CompatibleProperties 
s3Properties,
+            Map<String, String> options) {
+        if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
+            options.put(S3FileIOProperties.ENDPOINT, 
s3Properties.getEndpoint());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
+            options.put(S3FileIOProperties.PATH_STYLE_ACCESS, 
s3Properties.getUsePathStyle());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getRegion())) {
+            options.put(AwsClientProperties.CLIENT_REGION, 
s3Properties.getRegion());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
+            options.put(S3FileIOProperties.ACCESS_KEY_ID, 
s3Properties.getAccessKey());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
+            options.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
s3Properties.getSecretKey());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+            options.put(S3FileIOProperties.SESSION_TOKEN, 
s3Properties.getSessionToken());
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
index 64fd28216cf..333c6c44806 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergPropertiesFactory.java
@@ -43,6 +43,7 @@ public class IcebergPropertiesFactory extends 
AbstractMetastorePropertiesFactory
         register("hadoop", IcebergFileSystemMetaStoreProperties::new);
         register("s3tables", IcebergS3TablesMetaStoreProperties::new);
         register("dlf", IcebergAliyunDLFMetaStoreProperties::new);
+        register("jdbc", IcebergJdbcMetaStoreProperties::new);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 6c495bc78c1..db06dada60d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -145,6 +145,7 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergJdbcExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergS3TablesExternalCatalog;
 import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
@@ -419,6 +420,7 @@ public class GsonUtils {
                 .registerSubtype(IcebergRestExternalCatalog.class, 
IcebergRestExternalCatalog.class.getSimpleName())
                 .registerSubtype(IcebergDLFExternalCatalog.class, 
IcebergDLFExternalCatalog.class.getSimpleName())
                 .registerSubtype(IcebergHadoopExternalCatalog.class, 
IcebergHadoopExternalCatalog.class.getSimpleName())
+                .registerSubtype(IcebergJdbcExternalCatalog.class, 
IcebergJdbcExternalCatalog.class.getSimpleName())
                 .registerSubtype(IcebergS3TablesExternalCatalog.class,
                         IcebergS3TablesExternalCatalog.class.getSimpleName())
                 .registerSubtype(PaimonExternalCatalog.class, 
PaimonExternalCatalog.class.getSimpleName())
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
new file mode 100644
index 00000000000..b35782b2033
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergJdbcMetaStorePropertiesTest {
+
+    @Test
+    public void testBasicJdbcProperties() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("jdbc.user", "iceberg");
+        props.put("jdbc.password", "secret");
+
+        IcebergJdbcMetaStoreProperties jdbcProps = new 
IcebergJdbcMetaStoreProperties(props);
+        jdbcProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
jdbcProps.getIcebergJdbcCatalogProperties();
+        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC,
+                catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+        Assertions.assertEquals("jdbc:mysql://localhost:3306/iceberg", 
catalogProps.get(CatalogProperties.URI));
+        Assertions.assertEquals("s3://warehouse/path", 
catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION));
+        Assertions.assertEquals("iceberg", catalogProps.get("jdbc.user"));
+        Assertions.assertEquals("secret", catalogProps.get("jdbc.password"));
+    }
+
+    @Test
+    public void testJdbcPrefixPassthrough() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("jdbc.useSSL", "true");
+        props.put("jdbc.verifyServerCertificate", "true");
+
+        IcebergJdbcMetaStoreProperties jdbcProps = new 
IcebergJdbcMetaStoreProperties(props);
+        jdbcProps.initNormalizeAndCheckProps();
+
+        Map<String, String> catalogProps = 
jdbcProps.getIcebergJdbcCatalogProperties();
+        Assertions.assertEquals("true", catalogProps.get("jdbc.useSSL"));
+        Assertions.assertEquals("true", 
catalogProps.get("jdbc.verifyServerCertificate"));
+    }
+
+    @Test
+    public void testMissingWarehouse() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
+
+        IcebergJdbcMetaStoreProperties jdbcProps = new 
IcebergJdbcMetaStoreProperties(props);
+        Assertions.assertThrows(IllegalArgumentException.class, 
jdbcProps::initNormalizeAndCheckProps);
+    }
+
+    @Test
+    public void testMissingUri() {
+        Map<String, String> props = new HashMap<>();
+        props.put("warehouse", "s3://warehouse/path");
+
+        IcebergJdbcMetaStoreProperties jdbcProps = new 
IcebergJdbcMetaStoreProperties(props);
+        Assertions.assertThrows(IllegalArgumentException.class, 
jdbcProps::initNormalizeAndCheckProps);
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out
new file mode 100644
index 00000000000..9e7a05f3757
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_jdbc_catalog.out
@@ -0,0 +1,42 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !datatypes_select --
+false  2       200000000000    2.5     3.5     234.56  world   2025-01-02      
2025-01-02T11:00
+true   1       100000000000    1.5     2.5     123.45  hello   2025-01-01      
2025-01-01T10:00
+true   3       300000000000    3.5     4.5     345.67  test    2025-01-03      
2025-01-03T12:00
+
+-- !datatypes_count --
+3
+
+-- !datatypes_filter --
+1      hello
+3      test
+
+-- !partition_select --
+1      Item1   A       2025-01-01
+2      Item2   A       2025-01-01
+3      Item3   B       2025-01-02
+4      Item4   B       2025-01-02
+5      Item5   A       2025-01-03
+
+-- !partition_filter --
+1      Item1   A       2025-01-01
+2      Item2   A       2025-01-01
+5      Item5   A       2025-01-03
+
+-- !sys_snapshots --
+1
+
+-- !sys_history --
+1
+
+-- !after_overwrite --
+1      Item1   A       2025-01-01
+2      Item2   A       2025-01-01
+3      Item3   B       2025-01-02
+4      Item4   B       2025-01-02
+5      Item5   A       2025-01-03
+
+-- !mysql_select --
+1      Alice   2025-01-01T10:00
+2      Bob     2025-01-02T11:00
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy
new file mode 100644
index 00000000000..412d305da1c
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_jdbc_catalog.groovy
@@ -0,0 +1,316 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test for the Iceberg JDBC catalog: creates a catalog backed by a
// PostgreSQL (and optionally MySQL) metadata store with MinIO as the
// warehouse, then exercises database/table DDL, typed inserts, partitioned
// tables, system tables ($snapshots/$history), DESCRIBE and INSERT OVERWRITE.
suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external_docker_iceberg") {
    String enabled = context.config.otherConfigs.get("enableIcebergTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("Iceberg test is not enabled, skip this test")
        return;
    }

    String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest")
    if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) {
        logger.info("Iceberg JDBC catalog test requires enableJdbcTest, skip this test")
        return;
    }

    // Get test environment configuration
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
    String jdbc_port = context.config.otherConfigs.get("pg_14_port")

    // JDBC Catalog specific test - uses PostgreSQL as the metadata store.
    // If PostgreSQL port is not configured, this test will be skipped.
    if (jdbc_port == null || jdbc_port.isEmpty()) {
        logger.info("Iceberg JDBC catalog PostgreSQL port not configured (pg_14_port), skip this test")
        return;
    }

    if (minio_port == null || minio_port.isEmpty() || externalEnvIp == null) {
        logger.info("Iceberg test environment not fully configured, skip this test")
        return;
    }

    String catalog_name = "test_iceberg_jdbc_catalog"
    String db_name = "jdbc_test_db"
    String driver_name = "postgresql-42.5.0.jar"
    String driver_download_url = "${getS3Url()}/regression/jdbc_driver/${driver_name}"
    String jdbc_drivers_dir = getFeConfig("jdbc_drivers_dir")
    String local_driver_dir = "${context.config.dataPath}/jdbc_driver"
    String local_driver_path = "${local_driver_dir}/${driver_name}"
    String pg_db = "postgres"
    String mysql_db = "iceberg_db"

    // MySQL driver config
    String mysql_driver_name = "mysql-connector-java-5.1.49-v2.jar"
    String mysql_driver_download_url = "${getS3Url()}/regression/jdbc_driver/mysql-connector-java-5.1.49.jar"
    String local_mysql_driver_path = "${local_driver_dir}/${mysql_driver_name}"

    // Run a shell command on the local host.
    // The child's combined stdout/stderr is consumed BEFORE waitFor(): calling
    // waitFor() first can deadlock when the process fills the pipe buffer.
    // When mustSuc is true, a non-zero exit code fails the suite.
    def executeCommand = { String cmd, Boolean mustSuc ->
        try {
            logger.info("execute ${cmd}")
            def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
            String output = proc.inputStream.text
            int exitcode = proc.waitFor()
            if (exitcode != 0) {
                logger.info("exit code: ${exitcode}, output\n: ${output}")
                if (mustSuc == true) {
                    assertTrue(false, "Execute failed: ${cmd}")
                }
            }
        } catch (IOException e) {
            assertTrue(false, "Execute timeout: ${cmd}")
        }
    }

    // Ensure the PostgreSQL JDBC driver is available on all FE/BE nodes.
    def host_ips = new ArrayList()
    String[][] backends = sql """ show backends """
    for (def b in backends) {
        host_ips.add(b[1])
    }
    String[][] frontends = sql """ show frontends """
    for (def f in frontends) {
        host_ips.add(f[1])
    }
    host_ips = host_ips.unique()

    executeCommand("mkdir -p ${local_driver_dir}", false)
    if (!new File(local_driver_path).exists()) {
        executeCommand("/usr/bin/curl --max-time 600 ${driver_download_url} --output ${local_driver_path}", true)
    }
    if (!new File(local_mysql_driver_path).exists()) {
        executeCommand("/usr/bin/curl --max-time 600 ${mysql_driver_download_url} --output ${local_mysql_driver_path}", true)
    }
    for (def ip in host_ips) {
        executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"mkdir -p ${jdbc_drivers_dir}\"", false)
        scpFiles("root", ip, local_driver_path, jdbc_drivers_dir, false)
        scpFiles("root", ip, local_mysql_driver_path, jdbc_drivers_dir, false)
    }

    try {
        // Clean up existing catalog
        sql """DROP CATALOG IF EXISTS ${catalog_name}"""

        // Create Iceberg JDBC Catalog with PostgreSQL backend and MinIO storage
        sql """
            CREATE CATALOG ${catalog_name} PROPERTIES (
                'type' = 'iceberg',
                'iceberg.catalog.type' = 'jdbc',
                'uri' = 'jdbc:postgresql://${externalEnvIp}:${jdbc_port}/${pg_db}',
                'warehouse' = 's3://warehouse/jdbc_wh/',
                'iceberg.jdbc.driver_url' = '${driver_name}',
                'iceberg.jdbc.driver_class' = 'org.postgresql.Driver',
                'iceberg.jdbc.user' = 'postgres',
                'iceberg.jdbc.password' = '123456',
                'iceberg.jdbc.init-catalog-tables' = 'true',
                'iceberg.jdbc.schema-version' = 'V1',
                's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
                's3.access_key' = 'admin',
                's3.secret_key' = 'password',
                's3.region' = 'us-east-1'
            )
        """

        // Switch to the catalog
        sql """SWITCH ${catalog_name}"""

        // Test: Show catalogs
        def catalogs = sql """SHOW CATALOGS"""
        assertTrue(catalogs.toString().contains(catalog_name))

        // Test: Create database
        sql """DROP DATABASE IF EXISTS ${db_name} FORCE"""
        sql """CREATE DATABASE ${db_name}"""

        def databases = sql """SHOW DATABASES"""
        assertTrue(databases.toString().contains(db_name))

        sql """USE ${db_name}"""

        // Test: Create non-partitioned table with various data types
        sql """DROP TABLE IF EXISTS test_datatypes"""
        sql """
            CREATE TABLE test_datatypes (
                c_boolean BOOLEAN,
                c_int INT,
                c_bigint BIGINT,
                c_float FLOAT,
                c_double DOUBLE,
                c_decimal DECIMAL(10, 2),
                c_string STRING,
                c_date DATE,
                c_datetime DATETIME
            ) PROPERTIES (
                'write-format' = 'parquet',
                'compression-codec' = 'zstd'
            )
        """

        def tables = sql """SHOW TABLES"""
        assertTrue(tables.toString().contains("test_datatypes"))

        // Test: Insert data with various types
        sql """
            INSERT INTO test_datatypes VALUES
            (true, 1, 100000000000, 1.5, 2.5, 123.45, 'hello', '2025-01-01', '2025-01-01 10:00:00'),
            (false, 2, 200000000000, 2.5, 3.5, 234.56, 'world', '2025-01-02', '2025-01-02 11:00:00'),
            (true, 3, 300000000000, 3.5, 4.5, 345.67, 'test', '2025-01-03', '2025-01-03 12:00:00')
        """

        // Test: Query data with different data types
        order_qt_datatypes_select """SELECT * FROM test_datatypes ORDER BY c_int"""
        order_qt_datatypes_count """SELECT count(*) FROM test_datatypes"""
        order_qt_datatypes_filter """SELECT c_int, c_string FROM test_datatypes WHERE c_boolean = true ORDER BY c_int"""

        // Test: Create partitioned table
        sql """DROP TABLE IF EXISTS test_partitioned"""
        sql """
            CREATE TABLE test_partitioned (
                id INT,
                name STRING,
                category STRING,
                event_date DATE
            )
            PARTITION BY LIST (category) ()
            PROPERTIES (
                'write-format' = 'parquet'
            )
        """

        // Test: Insert into partitioned table
        sql """
            INSERT INTO test_partitioned VALUES
            (1, 'Item1', 'A', '2025-01-01'),
            (2, 'Item2', 'A', '2025-01-01'),
            (3, 'Item3', 'B', '2025-01-02'),
            (4, 'Item4', 'B', '2025-01-02'),
            (5, 'Item5', 'A', '2025-01-03')
        """

        order_qt_partition_select """SELECT * FROM test_partitioned ORDER BY id"""
        order_qt_partition_filter """SELECT * FROM test_partitioned WHERE category = 'A' ORDER BY id"""

        // Test: System tables
        order_qt_sys_snapshots """SELECT count(*) FROM test_datatypes\$snapshots"""
        order_qt_sys_history """SELECT count(*) FROM test_datatypes\$history"""

        // Test: DESCRIBE TABLE
        def desc = sql """DESCRIBE test_datatypes"""
        assertTrue(desc.toString().contains("c_int"))
        assertTrue(desc.toString().contains("c_string"))

        // Test: INSERT OVERWRITE
        sql """
            INSERT OVERWRITE TABLE test_partitioned
            SELECT * FROM test_partitioned WHERE category = 'A'
        """
        order_qt_after_overwrite """SELECT * FROM test_partitioned ORDER BY id"""

        // Test: Drop table
        sql """DROP TABLE IF EXISTS test_datatypes"""
        sql """DROP TABLE IF EXISTS test_partitioned"""

        // Test: Drop database
        sql """DROP DATABASE IF EXISTS ${db_name} FORCE"""

        logger.info("Iceberg JDBC Catalog test completed successfully")

        // MySQL Catalog Test
        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
        if (mysql_port != null) {
            // Clean up MySQL database to remove old metadata.
            // This prevents issues where the database contains metadata pointing to invalid S3 locations.
            String cleanupCmd = "mysql -h ${externalEnvIp} -P ${mysql_port} -u root -p123456 -e 'DROP DATABASE IF EXISTS ${mysql_db}; CREATE DATABASE ${mysql_db};'"
            executeCommand(cleanupCmd, false)

            String mysql_catalog_name = "iceberg_jdbc_mysql"
            try {
                sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
                sql """
                    CREATE CATALOG ${mysql_catalog_name} PROPERTIES (
                        'type' = 'iceberg',
                        'iceberg.catalog.type' = 'jdbc',
                        'uri' = 'jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysql_db}',
                        'warehouse' = 's3://warehouse/jdbc_wh_mysql/',
                        'iceberg.jdbc.driver_url' = 'file://${jdbc_drivers_dir}/${mysql_driver_name}',
                        'iceberg.jdbc.driver_class' = 'com.mysql.jdbc.Driver',
                        'iceberg.jdbc.user' = 'root',
                        'iceberg.jdbc.password' = '123456',
                        'iceberg.jdbc.init-catalog-tables' = 'true',
                        'iceberg.jdbc.schema-version' = 'V1',
                        's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
                        's3.access_key' = 'admin',
                        's3.secret_key' = 'password',
                        's3.region' = 'us-east-1'
                    )
                """

                sql """SWITCH ${mysql_catalog_name}"""

                String mysql_db_name = "mysql_test_db"
                sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE"""
                sql """CREATE DATABASE ${mysql_db_name}"""
                sql """USE ${mysql_db_name}"""

                sql """DROP TABLE IF EXISTS test_mysql_catalog"""
                sql """
                    CREATE TABLE test_mysql_catalog (
                        id INT,
                        name STRING,
                        ts DATETIME
                    ) PROPERTIES (
                        'write-format' = 'parquet'
                    )
                """

                sql """
                    INSERT INTO test_mysql_catalog VALUES
                    (1, 'Alice', '2025-01-01 10:00:00'),
                    (2, 'Bob', '2025-01-02 11:00:00')
                """

                order_qt_mysql_select """SELECT * FROM test_mysql_catalog ORDER BY id"""

                sql """DROP TABLE IF EXISTS test_mysql_catalog"""
                sql """DROP DATABASE IF EXISTS ${mysql_db_name} FORCE"""

                logger.info("Iceberg JDBC Catalog (MySQL) test completed successfully")
            } catch (Exception e) {
                // MySQL backend coverage is expected to pass whenever
                // mysql_57_port is configured, so log and propagate the failure.
                logger.warn("MySQL Catalog test failed: ${e.message}")
                throw e
            } finally {
                try {
                    sql """SWITCH internal"""
                    sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
                } catch (Exception e) {
                    logger.warn("Failed to cleanup MySQL catalog: ${e.message}")
                }
            }
        }

    } finally {
        // Cleanup
        try {
            sql """SWITCH internal"""
            sql """DROP CATALOG IF EXISTS ${catalog_name}"""
        } catch (Exception e) {
            logger.warn("Failed to cleanup catalog: ${e.message}")
        }
    }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to