github-actions[bot] commented on code in PR #61094: URL: https://github.com/apache/doris/pull/61094#discussion_r2900514641
########## fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java: ########## @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.metastore; + +import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.property.ConnectorProperty; +import org.apache.doris.datasource.property.storage.HdfsProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.jdbc.JdbcCatalogFactory; +import org.apache.paimon.options.CatalogOptions; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.ConcurrentHashMap; + +public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties { + private static final Logger LOG = LogManager.getLogger(PaimonJdbcMetaStoreProperties.class); + private static final String JDBC_PREFIX = "jdbc."; + private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); + + @ConnectorProperty( + names = {"uri", "paimon.jdbc.uri"}, + required = true, + description = "JDBC connection URI for the Paimon JDBC catalog." + ) + private String uri = ""; + + @ConnectorProperty( + names = {"paimon.jdbc.user", "jdbc.user"}, + required = false, + description = "Username for the Paimon JDBC catalog." + ) + private String jdbcUser; + + @ConnectorProperty( + names = {"paimon.jdbc.password", "jdbc.password"}, + required = false, + sensitive = true, + description = "Password for the Paimon JDBC catalog." + ) + private String jdbcPassword; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_url", "jdbc.driver_url"}, + required = false, + description = "JDBC driver JAR file path or URL. " + + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " + + "or a full URL (http://, https://, file://)." + ) + private String driverUrl; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_class", "jdbc.driver_class"}, + required = false, + description = "JDBC driver class name. If specified with paimon.jdbc.driver_url, " + + "the driver will be loaded dynamically." 
+ ) + private String driverClass; + + protected PaimonJdbcMetaStoreProperties(Map<String, String> props) { + super(props); + } + + @Override + public String getPaimonCatalogType() { + return PaimonExternalCatalog.PAIMON_JDBC; + } + + @Override + protected void checkRequiredProperties() { + super.checkRequiredProperties(); + if (StringUtils.isBlank(warehouse)) { + throw new IllegalArgumentException("Property warehouse is required."); + } + } + + @Override + public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) { + buildCatalogOptions(); + Configuration conf = new Configuration(); + for (StorageProperties storageProperties : storagePropertiesList) { + if (storageProperties.getHadoopStorageConfig() != null) { + conf.addResource(storageProperties.getHadoopStorageConfig()); + } + if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) { + this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties) + .getHadoopAuthenticator()); + } + } + appendUserHadoopConfig(conf); + if (StringUtils.isNotBlank(driverUrl)) { + registerJdbcDriver(driverUrl, driverClass); + LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: {}", driverUrl); + } + CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf); + try { + return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext)); + } catch (Exception e) { + throw new RuntimeException("Failed to create Paimon catalog with JDBC metastore: " + e.getMessage(), e); + } + } + + @Override + protected void appendCustomCatalogOptions() { + catalogOptions.set(CatalogOptions.URI.key(), uri); + addIfNotBlank("jdbc.user", jdbcUser); + addIfNotBlank("jdbc.password", jdbcPassword); + appendRawJdbcCatalogOptions(); + } + + @Override + protected String getMetastoreType() { + return JdbcCatalogFactory.IDENTIFIER; + } + + private void addIfNotBlank(String key, String value) { + if 
(StringUtils.isNotBlank(value)) { + catalogOptions.set(key, value); + } + } + + private void appendRawJdbcCatalogOptions() { + origProps.forEach((key, value) -> { + if (key != null && key.startsWith(JDBC_PREFIX) && !catalogOptions.keySet().contains(key)) { + catalogOptions.set(key, value); + } + }); + } + + /** + * Register JDBC driver with DriverManager. + * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader. + */ + private void registerJdbcDriver(String driverUrl, String driverClassName) { + try { + String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl); + URL url = new URL(fullDriverUrl); + + ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> { + ClassLoader parent = getClass().getClassLoader(); + return URLClassLoader.newInstance(new URL[] {u}, parent); + }); + + if (StringUtils.isBlank(driverClassName)) { + throw new IllegalArgumentException( + "jdbc.driver_class or paimon.jdbc.driver_class is required when jdbc.driver_url " + + "or paimon.jdbc.driver_url is specified"); + } + + Class<?> loadedDriverClass = Class.forName(driverClassName, true, classLoader); + java.sql.Driver driver = (java.sql.Driver) loadedDriverClass.getDeclaredConstructor().newInstance(); + java.sql.DriverManager.registerDriver(new DriverShim(driver)); Review Comment: **[Medium] Driver leak: `DriverManager.registerDriver` called without deduplication.** `registerJdbcDriver` is called from `initializeCatalog()` which can be invoked multiple times (catalog creation, refresh, etc.). Each call registers a **new** `DriverShim` with `DriverManager`. There is no corresponding `deregisterDriver` or deduplication guard. Over time, `DriverManager`'s internal driver list grows without bound. Each `DriverShim` holds a reference to the actual driver and its `ClassLoader`, preventing garbage collection. `DriverManager.getConnection()` iterates all registered drivers, so this also has a progressive performance impact. 
This is a pre-existing issue copied from `IcebergJdbcMetaStoreProperties`, but it would be good to fix it here rather than propagate it. Suggested fix: ```java private static final Set<URL> REGISTERED_DRIVER_URLS = ConcurrentHashMap.newKeySet(); // In registerJdbcDriver, before the registerDriver call: if (!REGISTERED_DRIVER_URLS.add(url)) { LOG.info("JDBC driver already registered for URL: {}", url); return; } ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java: ########## @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.property.metastore; + +import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.property.ConnectorProperty; +import org.apache.doris.datasource.property.storage.HdfsProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.jdbc.JdbcCatalogFactory; +import org.apache.paimon.options.CatalogOptions; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties { + private static final Logger LOG = LogManager.getLogger(PaimonJdbcMetaStoreProperties.class); + private static final String JDBC_PREFIX = "jdbc."; + private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); + + @ConnectorProperty( + names = {"uri", "paimon.jdbc.uri"}, + required = true, + description = "JDBC connection URI for the Paimon JDBC catalog." + ) + private String uri = ""; + + @ConnectorProperty( + names = {"paimon.jdbc.user", "jdbc.user"}, + required = false, + description = "Username for the Paimon JDBC catalog." + ) + private String jdbcUser; + + @ConnectorProperty( + names = {"paimon.jdbc.password", "jdbc.password"}, + required = false, + sensitive = true, + description = "Password for the Paimon JDBC catalog." 
+ ) + private String jdbcPassword; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_url", "jdbc.driver_url"}, + required = false, + description = "JDBC driver JAR file path or URL. " + + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " + + "or a full URL (http://, https://, file://)." + ) + private String driverUrl; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_class", "jdbc.driver_class"}, + required = false, + description = "JDBC driver class name. If specified with paimon.jdbc.driver_url, " + + "the driver will be loaded dynamically." + ) + private String driverClass; + + protected PaimonJdbcMetaStoreProperties(Map<String, String> props) { + super(props); + } + + @Override + public String getPaimonCatalogType() { + return PaimonExternalCatalog.PAIMON_JDBC; + } + + @Override + protected void checkRequiredProperties() { + super.checkRequiredProperties(); + if (StringUtils.isBlank(warehouse)) { + throw new IllegalArgumentException("Property warehouse is required."); + } + } + + @Override + public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) { + buildCatalogOptions(); + Configuration conf = new Configuration(); + for (StorageProperties storageProperties : storagePropertiesList) { + if (storageProperties.getHadoopStorageConfig() != null) { + conf.addResource(storageProperties.getHadoopStorageConfig()); + } + if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) { + this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties) + .getHadoopAuthenticator()); + } + } + appendUserHadoopConfig(conf); + if (StringUtils.isNotBlank(driverUrl)) { + registerJdbcDriver(driverUrl, driverClass); + LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: {}", driverUrl); + } + CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf); + try { + return this.executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext)); + } catch (Exception e) { + throw new RuntimeException("Failed to create Paimon catalog with JDBC metastore: " + e.getMessage(), e); + } + } + + @Override + protected void appendCustomCatalogOptions() { + catalogOptions.set(CatalogOptions.URI.key(), uri); + addIfNotBlank("jdbc.user", jdbcUser); + addIfNotBlank("jdbc.password", jdbcPassword); + appendRawJdbcCatalogOptions(); + } + + @Override + protected String getMetastoreType() { + return JdbcCatalogFactory.IDENTIFIER; + } + + private void addIfNotBlank(String key, String value) { + if (StringUtils.isNotBlank(value)) { + catalogOptions.set(key, value); + } + } + + private void appendRawJdbcCatalogOptions() { + origProps.forEach((key, value) -> { + if (key != null && key.startsWith(JDBC_PREFIX) && !catalogOptions.keySet().contains(key)) { + catalogOptions.set(key, value); + } + }); + } + + /** + * Register JDBC driver with DriverManager. + * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader. + */ + private void registerJdbcDriver(String driverUrl, String driverClassName) { + try { + String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl); + URL url = new URL(fullDriverUrl); + + ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> { + ClassLoader parent = getClass().getClassLoader(); + return URLClassLoader.newInstance(new URL[] {u}, parent); + }); + + if (StringUtils.isBlank(driverClassName)) { Review Comment: **[Low] Validation order: check `driverClassName` before creating ClassLoader.** The `StringUtils.isBlank(driverClassName)` check happens *after* `DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, ...)` which may create a new `URLClassLoader`. If `driverClassName` is blank, this ClassLoader is created for nothing (and cached permanently). 
Move this check before the `computeIfAbsent` call: ```java if (StringUtils.isBlank(driverClassName)) { throw new IllegalArgumentException(...); } ClassLoader classLoader = DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> { ... }); ``` ########## regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_paimon_jdbc_catalog", "p0,external") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("Paimon test is not enabled, skip this test") + return + } + + String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest") + if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) { + logger.info("Paimon JDBC catalog test requires enableJdbcTest, skip this test") + return + } + + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String minioPort = context.config.otherConfigs.get("paimon_jdbc_minio_port") + if (minioPort == null || minioPort.isEmpty()) { + minioPort = context.config.otherConfigs.get("iceberg_minio_port") + } + String jdbcPort = context.config.otherConfigs.get("pg_14_port") + if (externalEnvIp == null || externalEnvIp.isEmpty() + || minioPort == null || minioPort.isEmpty() + || jdbcPort == null || jdbcPort.isEmpty()) { + logger.info("Paimon JDBC catalog test environment is not fully configured, skip this test") + return + } + + String minioAk = context.config.otherConfigs.get("paimon_jdbc_minio_ak") + if (minioAk == null || minioAk.isEmpty()) { + minioAk = "admin" + } + String minioSk = context.config.otherConfigs.get("paimon_jdbc_minio_sk") + if (minioSk == null || minioSk.isEmpty()) { + minioSk = "password" + } + String warehouseBucket = context.config.otherConfigs.get("paimon_jdbc_warehouse_bucket") + if (warehouseBucket == null || warehouseBucket.isEmpty()) { + warehouseBucket = "warehouse" + } + + String catalogName = "test_paimon_jdbc_catalog" + String dbName = "paimon_jdbc_db" + String driverName = "postgresql-42.5.0.jar" + String driverDownloadUrl = "${getS3Url()}/regression/jdbc_driver/${driverName}" + String jdbcDriversDir = getFeConfig("jdbc_drivers_dir") + String localDriverDir = "${context.config.dataPath}/jdbc_driver" + String localDriverPath = "${localDriverDir}/${driverName}" + String 
sparkSeedCatalogName = "${catalogName}_seed" + + assertTrue(jdbcDriversDir != null && !jdbcDriversDir.isEmpty(), "jdbc_drivers_dir must be configured") + + def executeCommand = { String cmd, Boolean mustSuc -> + try { + logger.info("execute ${cmd}") + def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start() + int exitcode = proc.waitFor() + String output = proc.text + if (exitcode != 0) { + logger.info("exit code: ${exitcode}, output\n: ${output}") + if (mustSuc) { + assertTrue(false, "Execute failed: ${cmd}") + } + } + return output + } catch (IOException e) { + assertTrue(false, "Execute timeout: ${cmd}") + } + } + + executeCommand("mkdir -p ${localDriverDir}", false) + executeCommand("mkdir -p ${jdbcDriversDir}", true) + if (!new File(localDriverPath).exists()) { + executeCommand("/usr/bin/curl --max-time 600 ${driverDownloadUrl} --output ${localDriverPath}", true) + } + executeCommand("cp -f ${localDriverPath} ${jdbcDriversDir}/${driverName}", true) + + String sparkContainerName = executeCommand("docker ps --filter name=spark-iceberg --format {{.Names}}", false) + ?.trim() + if (sparkContainerName == null || sparkContainerName.isEmpty()) { + logger.info("spark-iceberg container not found, skip this test") + return + } + + def sparkPaimonJdbc = { String sqlText -> + String escapedSql = sqlText.replaceAll('"', '\\\\"') + String command = """docker exec ${sparkContainerName} spark-sql --master spark://${sparkContainerName}:7077 \ +--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \ +--conf spark.sql.catalog.${sparkSeedCatalogName}=org.apache.paimon.spark.SparkCatalog \ Review Comment: **[Medium] Spark seed catalog uses `metastore=filesystem` but Doris catalog uses JDBC metastore.** The Doris catalog is created with `paimon.catalog.type=jdbc` backed by PostgreSQL. But the Spark seed catalog here uses `metastore=filesystem`. 
When Spark writes data via the filesystem metastore, the table metadata is stored on S3 as filesystem-based metadata files. The Doris JDBC catalog looks for metadata in PostgreSQL, not on S3 filesystem metadata files. This means the `INSERT` done via Spark may not be visible when querying through the Doris JDBC catalog, because the JDBC metastore doesn't know about the data written by the filesystem metastore. The Spark seed catalog should use `metastore=jdbc` pointing to the same PostgreSQL instance, e.g.: ``` --conf spark.sql.catalog.${sparkSeedCatalogName}.metastore=jdbc \ --conf spark.sql.catalog.${sparkSeedCatalogName}.uri=jdbc:postgresql://${externalEnvIp}:${jdbcPort}/postgres \ --conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.user=postgres \ --conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.password=123456 \ ``` Alternatively, if the intent is to test that JDBC catalog can read data written on the same storage path by another tool, then the test design should be documented with a clear comment explaining this, and the table creation through Doris SQL (which registers in JDBC metastore) should be sufficient for metadata. But then Spark should NOT recreate/shadow the table metadata. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
