github-actions[bot] commented on code in PR #61094:
URL: https://github.com/apache/doris/pull/61094#discussion_r2900514641


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.catalog.JdbcResource;
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.jdbc.JdbcCatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
+    private static final Logger LOG = 
LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
+    private static final String JDBC_PREFIX = "jdbc.";
+    private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new 
ConcurrentHashMap<>();
+
+    @ConnectorProperty(
+            names = {"uri", "paimon.jdbc.uri"},
+            required = true,
+            description = "JDBC connection URI for the Paimon JDBC catalog."
+    )
+    private String uri = "";
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.user", "jdbc.user"},
+            required = false,
+            description = "Username for the Paimon JDBC catalog."
+    )
+    private String jdbcUser;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.password", "jdbc.password"},
+            required = false,
+            sensitive = true,
+            description = "Password for the Paimon JDBC catalog."
+    )
+    private String jdbcPassword;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.driver_url", "jdbc.driver_url"},
+            required = false,
+            description = "JDBC driver JAR file path or URL. "
+                    + "Can be a local file name (will look in 
$DORIS_HOME/plugins/jdbc_drivers/) "
+                    + "or a full URL (http://, https://, file://)."
+    )
+    private String driverUrl;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.driver_class", "jdbc.driver_class"},
+            required = false,
+            description = "JDBC driver class name. If specified with 
paimon.jdbc.driver_url, "
+                    + "the driver will be loaded dynamically."
+    )
+    private String driverClass;
+
+    protected PaimonJdbcMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_JDBC;
+    }
+
+    @Override
+    protected void checkRequiredProperties() {
+        super.checkRequiredProperties();
+        if (StringUtils.isBlank(warehouse)) {
+            throw new IllegalArgumentException("Property warehouse is 
required.");
+        }
+    }
+
+    @Override
+    public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        buildCatalogOptions();
+        Configuration conf = new Configuration();
+        for (StorageProperties storageProperties : storagePropertiesList) {
+            if (storageProperties.getHadoopStorageConfig() != null) {
+                conf.addResource(storageProperties.getHadoopStorageConfig());
+            }
+            if 
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+                this.executionAuthenticator = new 
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+                        .getHadoopAuthenticator());
+            }
+        }
+        appendUserHadoopConfig(conf);
+        if (StringUtils.isNotBlank(driverUrl)) {
+            registerJdbcDriver(driverUrl, driverClass);
+            LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: 
{}", driverUrl);
+        }
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
+        try {
+            return this.executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create Paimon catalog with 
JDBC metastore: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void appendCustomCatalogOptions() {
+        catalogOptions.set(CatalogOptions.URI.key(), uri);
+        addIfNotBlank("jdbc.user", jdbcUser);
+        addIfNotBlank("jdbc.password", jdbcPassword);
+        appendRawJdbcCatalogOptions();
+    }
+
+    @Override
+    protected String getMetastoreType() {
+        return JdbcCatalogFactory.IDENTIFIER;
+    }
+
+    private void addIfNotBlank(String key, String value) {
+        if (StringUtils.isNotBlank(value)) {
+            catalogOptions.set(key, value);
+        }
+    }
+
+    private void appendRawJdbcCatalogOptions() {
+        origProps.forEach((key, value) -> {
+            if (key != null && key.startsWith(JDBC_PREFIX) && 
!catalogOptions.keySet().contains(key)) {
+                catalogOptions.set(key, value);
+            }
+        });
+    }
+
+    /**
+     * Register JDBC driver with DriverManager.
+     * This is necessary because DriverManager.getConnection() doesn't use 
Thread.contextClassLoader.
+     */
+    private void registerJdbcDriver(String driverUrl, String driverClassName) {
+        try {
+            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
+            URL url = new URL(fullDriverUrl);
+
+            ClassLoader classLoader = 
DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
+                ClassLoader parent = getClass().getClassLoader();
+                return URLClassLoader.newInstance(new URL[] {u}, parent);
+            });
+
+            if (StringUtils.isBlank(driverClassName)) {
+                throw new IllegalArgumentException(
+                        "jdbc.driver_class or paimon.jdbc.driver_class is 
required when jdbc.driver_url "
+                                + "or paimon.jdbc.driver_url is specified");
+            }
+
+            Class<?> loadedDriverClass = Class.forName(driverClassName, true, 
classLoader);
+            java.sql.Driver driver = (java.sql.Driver) 
loadedDriverClass.getDeclaredConstructor().newInstance();
+            java.sql.DriverManager.registerDriver(new DriverShim(driver));

Review Comment:
   **[Medium] Driver leak: `DriverManager.registerDriver` called without 
deduplication.**
   
   `registerJdbcDriver` is called from `initializeCatalog()` which can be 
invoked multiple times (catalog creation, refresh, etc.). Each call registers a 
**new** `DriverShim` with `DriverManager`. There is no corresponding 
`deregisterDriver` or deduplication guard.
   
   Over time, `DriverManager`'s internal driver list grows without bound. Each 
`DriverShim` holds a reference to the actual driver and its `ClassLoader`, 
preventing garbage collection. Because `DriverManager.getConnection()` iterates 
over every registered driver, this also causes a progressive performance 
degradation.
   
   This is a pre-existing issue copied from `IcebergJdbcMetaStoreProperties`, 
but it would be good to fix it here rather than propagate it. Suggested fix:
   
   ```java
   private static final Set<URL> REGISTERED_DRIVER_URLS = 
ConcurrentHashMap.newKeySet();
   
   // In registerJdbcDriver, before the registerDriver call:
   if (!REGISTERED_DRIVER_URLS.add(url)) {
       LOG.info("JDBC driver already registered for URL: {}", url);
       return;
   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.catalog.JdbcResource;
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.jdbc.JdbcCatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
+    private static final Logger LOG = 
LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
+    private static final String JDBC_PREFIX = "jdbc.";
+    private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new 
ConcurrentHashMap<>();
+
+    @ConnectorProperty(
+            names = {"uri", "paimon.jdbc.uri"},
+            required = true,
+            description = "JDBC connection URI for the Paimon JDBC catalog."
+    )
+    private String uri = "";
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.user", "jdbc.user"},
+            required = false,
+            description = "Username for the Paimon JDBC catalog."
+    )
+    private String jdbcUser;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.password", "jdbc.password"},
+            required = false,
+            sensitive = true,
+            description = "Password for the Paimon JDBC catalog."
+    )
+    private String jdbcPassword;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.driver_url", "jdbc.driver_url"},
+            required = false,
+            description = "JDBC driver JAR file path or URL. "
+                    + "Can be a local file name (will look in 
$DORIS_HOME/plugins/jdbc_drivers/) "
+                    + "or a full URL (http://, https://, file://)."
+    )
+    private String driverUrl;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.driver_class", "jdbc.driver_class"},
+            required = false,
+            description = "JDBC driver class name. If specified with 
paimon.jdbc.driver_url, "
+                    + "the driver will be loaded dynamically."
+    )
+    private String driverClass;
+
+    protected PaimonJdbcMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_JDBC;
+    }
+
+    @Override
+    protected void checkRequiredProperties() {
+        super.checkRequiredProperties();
+        if (StringUtils.isBlank(warehouse)) {
+            throw new IllegalArgumentException("Property warehouse is 
required.");
+        }
+    }
+
+    @Override
+    public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        buildCatalogOptions();
+        Configuration conf = new Configuration();
+        for (StorageProperties storageProperties : storagePropertiesList) {
+            if (storageProperties.getHadoopStorageConfig() != null) {
+                conf.addResource(storageProperties.getHadoopStorageConfig());
+            }
+            if 
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+                this.executionAuthenticator = new 
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+                        .getHadoopAuthenticator());
+            }
+        }
+        appendUserHadoopConfig(conf);
+        if (StringUtils.isNotBlank(driverUrl)) {
+            registerJdbcDriver(driverUrl, driverClass);
+            LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: 
{}", driverUrl);
+        }
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
+        try {
+            return this.executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create Paimon catalog with 
JDBC metastore: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void appendCustomCatalogOptions() {
+        catalogOptions.set(CatalogOptions.URI.key(), uri);
+        addIfNotBlank("jdbc.user", jdbcUser);
+        addIfNotBlank("jdbc.password", jdbcPassword);
+        appendRawJdbcCatalogOptions();
+    }
+
+    @Override
+    protected String getMetastoreType() {
+        return JdbcCatalogFactory.IDENTIFIER;
+    }
+
+    private void addIfNotBlank(String key, String value) {
+        if (StringUtils.isNotBlank(value)) {
+            catalogOptions.set(key, value);
+        }
+    }
+
+    private void appendRawJdbcCatalogOptions() {
+        origProps.forEach((key, value) -> {
+            if (key != null && key.startsWith(JDBC_PREFIX) && 
!catalogOptions.keySet().contains(key)) {
+                catalogOptions.set(key, value);
+            }
+        });
+    }
+
+    /**
+     * Register JDBC driver with DriverManager.
+     * This is necessary because DriverManager.getConnection() doesn't use 
Thread.contextClassLoader.
+     */
+    private void registerJdbcDriver(String driverUrl, String driverClassName) {
+        try {
+            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
+            URL url = new URL(fullDriverUrl);
+
+            ClassLoader classLoader = 
DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
+                ClassLoader parent = getClass().getClassLoader();
+                return URLClassLoader.newInstance(new URL[] {u}, parent);
+            });
+
+            if (StringUtils.isBlank(driverClassName)) {

Review Comment:
   **[Low] Validation order: check `driverClassName` before creating 
ClassLoader.**
   
   The `StringUtils.isBlank(driverClassName)` check happens *after* 
`DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, ...)` which may create a new 
`URLClassLoader`. If `driverClassName` is blank, this ClassLoader is created 
for nothing (and cached permanently).
   
   Move this check before the `computeIfAbsent` call:
   ```java
   if (StringUtils.isBlank(driverClassName)) {
       throw new IllegalArgumentException(...);
   }
   ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u 
-> { ... });
   ```



##########
regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy:
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_jdbc_catalog", "p0,external") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Paimon test is not enabled, skip this test")
+        return
+    }
+
+    String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) {
+        logger.info("Paimon JDBC catalog test requires enableJdbcTest, skip 
this test")
+        return
+    }
+
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String minioPort = 
context.config.otherConfigs.get("paimon_jdbc_minio_port")
+    if (minioPort == null || minioPort.isEmpty()) {
+        minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+    }
+    String jdbcPort = context.config.otherConfigs.get("pg_14_port")
+    if (externalEnvIp == null || externalEnvIp.isEmpty()
+            || minioPort == null || minioPort.isEmpty()
+            || jdbcPort == null || jdbcPort.isEmpty()) {
+        logger.info("Paimon JDBC catalog test environment is not fully 
configured, skip this test")
+        return
+    }
+
+    String minioAk = context.config.otherConfigs.get("paimon_jdbc_minio_ak")
+    if (minioAk == null || minioAk.isEmpty()) {
+        minioAk = "admin"
+    }
+    String minioSk = context.config.otherConfigs.get("paimon_jdbc_minio_sk")
+    if (minioSk == null || minioSk.isEmpty()) {
+        minioSk = "password"
+    }
+    String warehouseBucket = 
context.config.otherConfigs.get("paimon_jdbc_warehouse_bucket")
+    if (warehouseBucket == null || warehouseBucket.isEmpty()) {
+        warehouseBucket = "warehouse"
+    }
+
+    String catalogName = "test_paimon_jdbc_catalog"
+    String dbName = "paimon_jdbc_db"
+    String driverName = "postgresql-42.5.0.jar"
+    String driverDownloadUrl = 
"${getS3Url()}/regression/jdbc_driver/${driverName}"
+    String jdbcDriversDir = getFeConfig("jdbc_drivers_dir")
+    String localDriverDir = "${context.config.dataPath}/jdbc_driver"
+    String localDriverPath = "${localDriverDir}/${driverName}"
+    String sparkSeedCatalogName = "${catalogName}_seed"
+
+    assertTrue(jdbcDriversDir != null && !jdbcDriversDir.isEmpty(), 
"jdbc_drivers_dir must be configured")
+
+    def executeCommand = { String cmd, Boolean mustSuc ->
+        try {
+            logger.info("execute ${cmd}")
+            def proc = new ProcessBuilder("/bin/bash", "-c", 
cmd).redirectErrorStream(true).start()
+            int exitcode = proc.waitFor()
+            String output = proc.text
+            if (exitcode != 0) {
+                logger.info("exit code: ${exitcode}, output\n: ${output}")
+                if (mustSuc) {
+                    assertTrue(false, "Execute failed: ${cmd}")
+                }
+            }
+            return output
+        } catch (IOException e) {
+            assertTrue(false, "Execute timeout: ${cmd}")
+        }
+    }
+
+    executeCommand("mkdir -p ${localDriverDir}", false)
+    executeCommand("mkdir -p ${jdbcDriversDir}", true)
+    if (!new File(localDriverPath).exists()) {
+        executeCommand("/usr/bin/curl --max-time 600 ${driverDownloadUrl} 
--output ${localDriverPath}", true)
+    }
+    executeCommand("cp -f ${localDriverPath} ${jdbcDriversDir}/${driverName}", 
true)
+
+    String sparkContainerName = executeCommand("docker ps --filter 
name=spark-iceberg --format {{.Names}}", false)
+            ?.trim()
+    if (sparkContainerName == null || sparkContainerName.isEmpty()) {
+        logger.info("spark-iceberg container not found, skip this test")
+        return
+    }
+
+    def sparkPaimonJdbc = { String sqlText ->
+        String escapedSql = sqlText.replaceAll('"', '\\\\"')
+        String command = """docker exec ${sparkContainerName} spark-sql 
--master spark://${sparkContainerName}:7077 \
+--conf 
spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
 \
+--conf 
spark.sql.catalog.${sparkSeedCatalogName}=org.apache.paimon.spark.SparkCatalog \

Review Comment:
   **[Medium] Spark seed catalog uses `metastore=filesystem` but Doris catalog 
uses JDBC metastore.**
   
   The Doris catalog is created with `paimon.catalog.type=jdbc` backed by 
PostgreSQL. But the Spark seed catalog here uses `metastore=filesystem`. When 
Spark writes data via the filesystem metastore, the table metadata is stored on 
S3 as filesystem-based metadata files. The Doris JDBC catalog looks for 
metadata in PostgreSQL, not on S3 filesystem metadata files.
   
   This means the `INSERT` done via Spark may not be visible when querying 
through the Doris JDBC catalog, because the JDBC metastore doesn't know about 
the data written by the filesystem metastore.
   
   The Spark seed catalog should use `metastore=jdbc` pointing to the same 
PostgreSQL instance, e.g.:
   ```
   --conf spark.sql.catalog.${sparkSeedCatalogName}.metastore=jdbc \
   --conf 
spark.sql.catalog.${sparkSeedCatalogName}.uri=jdbc:postgresql://${externalEnvIp}:${jdbcPort}/postgres
 \
   --conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.user=postgres \
   --conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.password=123456 \
   ```
   
   Alternatively, if the intent is to test that the JDBC catalog can read data 
written on the same storage path by another tool, then the test design should 
be documented with a clear comment explaining this, and the table creation 
through Doris SQL (which registers the table in the JDBC metastore) should be 
sufficient for metadata. In that case, Spark should NOT recreate or shadow the 
table metadata.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to