Copilot commented on code in PR #61694: URL: https://github.com/apache/doris/pull/61694#discussion_r2985455644
########## fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java: ########## @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.metastore; + +import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.property.ConnectorProperty; +import org.apache.doris.datasource.property.storage.HdfsProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.jdbc.JdbcCatalogFactory; +import org.apache.paimon.options.CatalogOptions; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties { + private static final Logger LOG = LogManager.getLogger(PaimonJdbcMetaStoreProperties.class); + private static final String JDBC_PREFIX = "jdbc."; + private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); + private static final Set<String> REGISTERED_DRIVER_KEYS = ConcurrentHashMap.newKeySet(); + + @ConnectorProperty( + names = {"uri", "paimon.jdbc.uri"}, + required = true, + description = "JDBC connection URI for the Paimon JDBC catalog." + ) + private String uri = ""; + + @ConnectorProperty( + names = {"paimon.jdbc.user", "jdbc.user"}, + required = false, + description = "Username for the Paimon JDBC catalog." + ) + private String jdbcUser; + + @ConnectorProperty( + names = {"paimon.jdbc.password", "jdbc.password"}, + required = false, + sensitive = true, + description = "Password for the Paimon JDBC catalog." + ) + private String jdbcPassword; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_url", "jdbc.driver_url"}, + required = false, + description = "JDBC driver JAR file path or URL. " + + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " + + "or a full URL (http://, https://, file://)." + ) + private String driverUrl; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_class", "jdbc.driver_class"}, + required = false, + description = "JDBC driver class name. If specified with paimon.jdbc.driver_url, " + + "the driver will be loaded dynamically." + ) + private String driverClass; + + protected PaimonJdbcMetaStoreProperties(Map<String, String> props) { + super(props); + } + + @Override + public String getPaimonCatalogType() { + return PaimonExternalCatalog.PAIMON_JDBC; + } + + @Override + protected void checkRequiredProperties() { + super.checkRequiredProperties(); + if (StringUtils.isBlank(warehouse)) { + throw new IllegalArgumentException("Property warehouse is required."); + } + } + + @Override + public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) { + buildCatalogOptions(); + Configuration conf = new Configuration(); + for (StorageProperties storageProperties : storagePropertiesList) { + if (storageProperties.getHadoopStorageConfig() != null) { + conf.addResource(storageProperties.getHadoopStorageConfig()); + } + if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) { + this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties) + .getHadoopAuthenticator()); + } + } + appendUserHadoopConfig(conf); + if (StringUtils.isNotBlank(driverUrl)) { + registerJdbcDriver(driverUrl, driverClass); + LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: {}", driverUrl); + } + CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf); + try { + return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext)); + } catch (Exception e) { + throw new RuntimeException("Failed to create Paimon catalog with JDBC metastore: " + e.getMessage(), e); + } + } + + @Override + protected void appendCustomCatalogOptions() { + catalogOptions.set(CatalogOptions.URI.key(), uri); + addIfNotBlank("jdbc.user", jdbcUser); + addIfNotBlank("jdbc.password", jdbcPassword); + appendRawJdbcCatalogOptions(); + } + + @Override + protected String getMetastoreType() { + return JdbcCatalogFactory.IDENTIFIER; + } + + private void addIfNotBlank(String key, String value) { + if (StringUtils.isNotBlank(value)) { + catalogOptions.set(key, value); + } + } + + private void appendRawJdbcCatalogOptions() { + origProps.forEach((key, value) -> { + if (key != null && key.startsWith(JDBC_PREFIX) && !catalogOptions.keySet().contains(key)) { + catalogOptions.set(key, value); + } + }); + } + + /** + * Register JDBC driver with DriverManager. + * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader. + */ + private void registerJdbcDriver(String driverUrl, String driverClassName) { + try { + if (StringUtils.isBlank(driverClassName)) { + throw new IllegalArgumentException( + "jdbc.driver_class or paimon.jdbc.driver_class is required when jdbc.driver_url " + + "or paimon.jdbc.driver_url is specified"); + } + + String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl); + URL url = new URL(fullDriverUrl); + String driverKey = fullDriverUrl + "#" + driverClassName; + if (!REGISTERED_DRIVER_KEYS.add(driverKey)) { + LOG.info("JDBC driver already registered for Paimon catalog: {} from {}", + driverClassName, fullDriverUrl); + return; Review Comment: `REGISTERED_DRIVER_KEYS.add(driverKey)` happens before driver class loading/registration, but the key is only removed on caught `Exception`. If registration fails with an `Error` (e.g., `NoClassDefFoundError` / `LinkageError` from a bad JAR), the key can remain and future attempts will incorrectly short-circuit as “already registered”. Consider ensuring the key is removed on any failure (e.g., via `try`/`finally` that only keeps the key on success, or by handling broader failure types). ########## fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java: ########## @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.metastore; + +import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.property.ConnectorProperty; +import org.apache.doris.datasource.property.storage.HdfsProperties; +import org.apache.doris.datasource.property.storage.StorageProperties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.jdbc.JdbcCatalogFactory; +import org.apache.paimon.options.CatalogOptions; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties { + private static final Logger LOG = LogManager.getLogger(PaimonJdbcMetaStoreProperties.class); + private static final String JDBC_PREFIX = "jdbc."; + private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); + private static final Set<String> REGISTERED_DRIVER_KEYS = ConcurrentHashMap.newKeySet(); + + @ConnectorProperty( + names = {"uri", "paimon.jdbc.uri"}, + required = true, + description = "JDBC connection URI for the Paimon JDBC catalog." + ) + private String uri = ""; + + @ConnectorProperty( + names = {"paimon.jdbc.user", "jdbc.user"}, + required = false, + description = "Username for the Paimon JDBC catalog." + ) + private String jdbcUser; + + @ConnectorProperty( + names = {"paimon.jdbc.password", "jdbc.password"}, + required = false, + sensitive = true, + description = "Password for the Paimon JDBC catalog." + ) + private String jdbcPassword; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_url", "jdbc.driver_url"}, + required = false, + description = "JDBC driver JAR file path or URL. " + + "Can be a local file name (will look in $DORIS_HOME/plugins/jdbc_drivers/) " + + "or a full URL (http://, https://, file://)." + ) + private String driverUrl; + + @ConnectorProperty( + names = {"paimon.jdbc.driver_class", "jdbc.driver_class"}, + required = false, + description = "JDBC driver class name. If specified with paimon.jdbc.driver_url, " + + "the driver will be loaded dynamically." + ) + private String driverClass; + + protected PaimonJdbcMetaStoreProperties(Map<String, String> props) { + super(props); + } + + @Override + public String getPaimonCatalogType() { + return PaimonExternalCatalog.PAIMON_JDBC; + } + + @Override + protected void checkRequiredProperties() { + super.checkRequiredProperties(); + if (StringUtils.isBlank(warehouse)) { + throw new IllegalArgumentException("Property warehouse is required."); + } + } + + @Override + public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) { + buildCatalogOptions(); + Configuration conf = new Configuration(); + for (StorageProperties storageProperties : storagePropertiesList) { + if (storageProperties.getHadoopStorageConfig() != null) { + conf.addResource(storageProperties.getHadoopStorageConfig()); + } + if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) { + this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties) + .getHadoopAuthenticator()); + } + } + appendUserHadoopConfig(conf); + if (StringUtils.isNotBlank(driverUrl)) { + registerJdbcDriver(driverUrl, driverClass); + LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: {}", driverUrl); + } + CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf); + try { + return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext)); + } catch (Exception e) { + throw new RuntimeException("Failed to create Paimon catalog with JDBC metastore: " + e.getMessage(), e); + } + } + + @Override + protected void appendCustomCatalogOptions() { + catalogOptions.set(CatalogOptions.URI.key(), uri); + addIfNotBlank("jdbc.user", jdbcUser); + addIfNotBlank("jdbc.password", jdbcPassword); + appendRawJdbcCatalogOptions(); + } + + @Override + protected String getMetastoreType() { + return JdbcCatalogFactory.IDENTIFIER; + } + + private void addIfNotBlank(String key, String value) { + if (StringUtils.isNotBlank(value)) { + catalogOptions.set(key, value); + } + } + + private void appendRawJdbcCatalogOptions() { + origProps.forEach((key, value) -> { + if (key != null && key.startsWith(JDBC_PREFIX) && !catalogOptions.keySet().contains(key)) { + catalogOptions.set(key, value); + } + }); + } + + /** + * Register JDBC driver with DriverManager. + * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader. + */ + private void registerJdbcDriver(String driverUrl, String driverClassName) { + try { + if (StringUtils.isBlank(driverClassName)) { + throw new IllegalArgumentException( + "jdbc.driver_class or paimon.jdbc.driver_class is required when jdbc.driver_url " Review Comment: The dynamic JDBC driver registration implementation (`registerJdbcDriver`/`DriverShim` and related caches) is very similar to the existing implementation in `IcebergJdbcMetaStoreProperties`. Keeping two near-identical implementations tends to cause long-term drift and missed fixes in one path. Consider extracting a shared helper for driver URL normalization + classloader caching + `DriverManager` registration and reuse it from both classes. ########## regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_jdbc_catalog", "p0,external") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("Paimon test is not enabled, skip this test") + return + } + + String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest") + if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) { + logger.info("Paimon JDBC catalog test requires enableJdbcTest, skip this test") + return + } + + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String minioPort = context.config.otherConfigs.get("paimon_jdbc_minio_port") + if (minioPort == null || minioPort.isEmpty()) { + minioPort = context.config.otherConfigs.get("iceberg_minio_port") + } + String jdbcPort = context.config.otherConfigs.get("pg_14_port") + if (externalEnvIp == null || externalEnvIp.isEmpty() + || minioPort == null || minioPort.isEmpty() + || jdbcPort == null || jdbcPort.isEmpty()) { + logger.info("Paimon JDBC catalog test environment is not fully configured, skip this test") + return + } + + String minioAk = context.config.otherConfigs.get("paimon_jdbc_minio_ak") + if (minioAk == null || minioAk.isEmpty()) { + minioAk = "admin" + } + String minioSk = context.config.otherConfigs.get("paimon_jdbc_minio_sk") + if (minioSk == null || minioSk.isEmpty()) { + minioSk = "password" + } + String warehouseBucket = context.config.otherConfigs.get("paimon_jdbc_warehouse_bucket") + if (warehouseBucket == null || warehouseBucket.isEmpty()) { + warehouseBucket = "warehouse" + } + + String catalogName = "test_paimon_jdbc_catalog" + String dbName = "paimon_jdbc_db" + String driverName = "postgresql-42.5.0.jar" + String driverDownloadUrl = "${getS3Url()}/regression/jdbc_driver/${driverName}" + String jdbcDriversDir = getFeConfig("jdbc_drivers_dir") + String localDriverDir = "${context.config.dataPath}/jdbc_driver" + String localDriverPath = "${localDriverDir}/${driverName}" + String sparkDriverPath = "/tmp/${driverName}" + String sparkSeedCatalogName = "${catalogName}_seed" + + assertTrue(jdbcDriversDir != null && !jdbcDriversDir.isEmpty(), "jdbc_drivers_dir must be configured") + + def executeCommand = { String cmd, Boolean mustSuc -> + try { + logger.info("execute ${cmd}") + def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start() + int exitcode = proc.waitFor() + String output = proc.text + if (exitcode != 0) { + logger.info("exit code: ${exitcode}, output\n: ${output}") Review Comment: `executeCommand` uses `proc.waitFor()` with no timeout, so a hung `curl`/`docker`/`spark-sql` command can block the entire regression run indefinitely. Also the `catch (IOException)` path is labeled as a timeout, but `waitFor()` does not throw `IOException`. Consider using `waitFor(timeout, TimeUnit)` (or Groovy’s `waitForOrKill`) and handling `InterruptedException`, reporting the real exception message when command execution fails. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
