This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 375965596cb [fix](fe) Fix Paimon JDBC driver registration for JNI
scans (#61513)
375965596cb is described below
commit 375965596cbe09b4bd36090326982b07b42d227b
Author: Chenjunwei <[email protected]>
AuthorDate: Tue Mar 31 10:47:52 2026 +0800
[fix](fe) Fix Paimon JDBC driver registration for JNI scans (#61513)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Queries on Paimon JDBC catalog system tables that run
through the BE JNI scanner could not initialize the PostgreSQL JDBC
driver, so reads of system tables such as `snapshots`, `manifests`, and
`partitions` failed with `No suitable driver found`. This change
propagates the JDBC driver metadata (driver URL and driver class) from
FE to BE and registers the driver in a DriverManager-visible classloader
before Paimon table initialization. The regression case is also extended
to cover all supported Paimon JDBC system tables, including
`row_tracking` on a row-tracking-enabled table.
---
be/src/format/table/paimon_cpp_reader.cpp | 9 +-
be/src/format/table/paimon_jni_reader.cpp | 11 +-
.../common/classloader/JniScannerClassLoader.java | 9 ++
.../apache/doris/common/jdbc/JdbcDriverUtils.java | 114 +++++++++++++++++
.../apache/doris/paimon/PaimonJdbcDriverUtils.java | 59 +++++++++
.../org/apache/doris/paimon/PaimonJniScanner.java | 2 +-
.../doris/paimon/PaimonJdbcDriverUtilsTest.java | 139 +++++++++++++++++++++
.../datasource/paimon/source/PaimonScanNode.java | 27 ++++
.../metastore/PaimonJdbcMetaStoreProperties.java | 18 +++
.../paimon/source/PaimonScanNodeTest.java | 94 ++++++++++++++
.../PaimonJdbcMetaStorePropertiesTest.java | 37 ++++++
gensrc/thrift/PlanNodes.thrift | 3 +
.../paimon/test_paimon_jdbc_catalog.groovy | 66 +++++++++-
13 files changed, 577 insertions(+), 11 deletions(-)
diff --git a/be/src/format/table/paimon_cpp_reader.cpp
b/be/src/format/table/paimon_cpp_reader.cpp
index 9bc84f3bbb3..e4b182c41ed 100644
--- a/be/src/format/table/paimon_cpp_reader.cpp
+++ b/be/src/format/table/paimon_cpp_reader.cpp
@@ -269,8 +269,12 @@ std::vector<std::string>
PaimonCppReader::_build_read_columns() const {
std::map<std::string, std::string> PaimonCppReader::_build_options() const {
std::map<std::string, std::string> options;
- if (_range.__isset.table_format_params &&
_range.table_format_params.__isset.paimon_params &&
- _range.table_format_params.paimon_params.__isset.paimon_options) {
+ if (_range_params && _range_params->__isset.paimon_options &&
+ !_range_params->paimon_options.empty()) {
+ options.insert(_range_params->paimon_options.begin(),
_range_params->paimon_options.end());
+ } else if (_range.__isset.table_format_params &&
+ _range.table_format_params.__isset.paimon_params &&
+
_range.table_format_params.paimon_params.__isset.paimon_options) {
options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
_range.table_format_params.paimon_params.paimon_options.end());
}
@@ -310,7 +314,6 @@ std::map<std::string, std::string>
PaimonCppReader::_build_options() const {
copy_if_missing("fs.s3a.region", "AWS_REGION");
copy_if_missing("fs.s3a.path.style.access", "use_path_style");
- // FE currently does not pass paimon_options in scan ranges.
// Backfill file.format/manifest.format from split file_format to avoid
// paimon-cpp falling back to default manifest.format=avro.
if (_range.__isset.table_format_params &&
_range.table_format_params.__isset.paimon_params &&
diff --git a/be/src/format/table/paimon_jni_reader.cpp
b/be/src/format/table/paimon_jni_reader.cpp
index 4fc9c76e4c4..12e6171b3a5 100644
--- a/be/src/format/table/paimon_jni_reader.cpp
+++ b/be/src/format/table/paimon_jni_reader.cpp
@@ -65,8 +65,15 @@ PaimonJniReader::PaimonJniReader(const
std::vector<SlotDescriptor*>& file_slot_d
if (range_params->__isset.serialized_table) {
params["serialized_table"] =
range_params->serialized_table;
}
- for (const auto& kv : paimon_params.paimon_options) {
- params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
+ if (range_params->__isset.paimon_options &&
+ !range_params->paimon_options.empty()) {
+ for (const auto& kv : range_params->paimon_options) {
+ params[PAIMON_OPTION_PREFIX + kv.first] =
kv.second;
+ }
+ } else if (paimon_params.__isset.paimon_options) {
+ for (const auto& kv : paimon_params.paimon_options) {
+ params[PAIMON_OPTION_PREFIX + kv.first] =
kv.second;
+ }
}
if (range_params->__isset.properties &&
!range_params->properties.empty()) {
for (const auto& kv : range_params->properties) {
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
index 1e6be07e097..a90366d7a78 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
@@ -30,6 +30,15 @@ public class JniScannerClassLoader extends URLClassLoader {
this.scannerName = scannerName;
}
+ public synchronized void addURLIfAbsent(URL url) {
+ for (URL existingUrl : getURLs()) {
+ if (existingUrl.equals(url)) {
+ return;
+ }
+ }
+ super.addURL(url);
+ }
+
@Override
public String toString() {
return "JniScannerClassLoader{"
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java
new file mode 100644
index 00000000000..67430d36108
--- /dev/null
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jdbc;
+
+import org.apache.doris.common.classloader.JniScannerClassLoader;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public final class JdbcDriverUtils {
+ private static final ConcurrentHashMap<URL, ClassLoader>
DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>();
+ private static final Set<String> REGISTERED_DRIVER_KEYS =
ConcurrentHashMap.newKeySet();
+
+ private JdbcDriverUtils() {
+ }
+
+ public static void registerDriver(String driverUrl, String
driverClassName, ClassLoader classLoader) {
+ try {
+ URL url = new URL(driverUrl);
+ String driverKey = driverUrl + "#" + driverClassName;
+ if (!REGISTERED_DRIVER_KEYS.add(driverKey)) {
+ return;
+ }
+ try {
+ ClassLoader driverClassLoader = prepareDriverClassLoader(url,
classLoader);
+ Class<?> loadedDriverClass = Class.forName(driverClassName,
true, driverClassLoader);
+ Driver driver = (Driver)
loadedDriverClass.getDeclaredConstructor().newInstance();
+ DriverManager.registerDriver(new DriverShim(driver));
+ } catch (Exception e) {
+ REGISTERED_DRIVER_KEYS.remove(driverKey);
+ throw new RuntimeException("Failed to register JDBC driver: "
+ driverClassName, e);
+ }
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Invalid JDBC driver URL: " +
driverUrl, e);
+ }
+ }
+
+ private static ClassLoader prepareDriverClassLoader(URL driverUrl,
ClassLoader classLoader) {
+ if (classLoader instanceof JniScannerClassLoader) {
+ JniScannerClassLoader scannerClassLoader = (JniScannerClassLoader)
classLoader;
+ scannerClassLoader.addURLIfAbsent(driverUrl);
+ return scannerClassLoader;
+ }
+ return DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(driverUrl,
+ url -> URLClassLoader.newInstance(new URL[] {url},
classLoader));
+ }
+
+ private static final class DriverShim implements Driver {
+ private final Driver delegate;
+
+ private DriverShim(Driver delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Connection connect(String url, java.util.Properties info)
throws java.sql.SQLException {
+ return delegate.connect(url, info);
+ }
+
+ @Override
+ public boolean acceptsURL(String url) throws java.sql.SQLException {
+ return delegate.acceptsURL(url);
+ }
+
+ @Override
+ public DriverPropertyInfo[] getPropertyInfo(String url,
java.util.Properties info)
+ throws java.sql.SQLException {
+ return delegate.getPropertyInfo(url, info);
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return delegate.getMajorVersion();
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return delegate.getMinorVersion();
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ return delegate.jdbcCompliant();
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws
SQLFeatureNotSupportedException {
+ return delegate.getParentLogger();
+ }
+ }
+}
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java
new file mode 100644
index 00000000000..09d7818dc71
--- /dev/null
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import org.apache.doris.common.jdbc.JdbcDriverUtils;
+
+import java.util.Map;
+
+final class PaimonJdbcDriverUtils {
+ static final String PAIMON_JDBC_DRIVER_URL = "paimon.jdbc.driver_url";
+ static final String PAIMON_JDBC_DRIVER_CLASS = "paimon.jdbc.driver_class";
+ static final String JDBC_DRIVER_URL = "jdbc.driver_url";
+ static final String JDBC_DRIVER_CLASS = "jdbc.driver_class";
+
+ private PaimonJdbcDriverUtils() {
+ }
+
+ static void registerDriverIfNeeded(Map<String, String> params, ClassLoader
parentClassLoader) {
+ String driverUrl = firstNonBlank(params.get(PAIMON_JDBC_DRIVER_URL),
params.get(JDBC_DRIVER_URL));
+ if (driverUrl == null) {
+ return;
+ }
+ String driverClassName =
firstNonBlank(params.get(PAIMON_JDBC_DRIVER_CLASS),
params.get(JDBC_DRIVER_CLASS));
+ if (driverClassName == null) {
+ throw new IllegalArgumentException("paimon.jdbc.driver_class or
jdbc.driver_class is required when "
+ + "paimon.jdbc.driver_url or jdbc.driver_url is
specified");
+ }
+ registerDriver(driverUrl, driverClassName, parentClassLoader);
+ }
+
+ static void registerDriver(String driverUrl, String driverClassName,
ClassLoader parentClassLoader) {
+ JdbcDriverUtils.registerDriver(driverUrl, driverClassName,
parentClassLoader);
+ }
+
+ private static String firstNonBlank(String first, String second) {
+ if (first != null && !first.trim().isEmpty()) {
+ return first;
+ }
+ if (second != null && !second.trim().isEmpty()) {
+ return second;
+ }
+ return null;
+ }
+}
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 5690b7f6505..8f64a51dc9b 100644
---
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -92,6 +92,7 @@ public class PaimonJniScanner extends JniScanner {
// so we need to provide a classloader, otherwise it will cause
NPE.
Thread.currentThread().setContextClassLoader(classLoader);
preExecutionAuthenticator.execute(() -> {
+ PaimonJdbcDriverUtils.registerDriverIfNeeded(params,
classLoader);
initTable();
initReader();
return null;
@@ -227,4 +228,3 @@ public class PaimonJniScanner extends JniScanner {
}
}
-
diff --git
a/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java
b/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java
new file mode 100644
index 00000000000..1f0df2371dd
--- /dev/null
+++
b/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import org.apache.doris.common.classloader.JniScannerClassLoader;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.logging.Logger;
+
+public class PaimonJdbcDriverUtilsTest {
+ private final List<Driver> registeredDrivers = new ArrayList<>();
+ private final List<Path> tempJars = new ArrayList<>();
+
+ @After
+ public void tearDown() throws Exception {
+ for (Driver driver : registeredDrivers) {
+ DriverManager.deregisterDriver(driver);
+ }
+ registeredDrivers.clear();
+ for (Path tempJar : tempJars) {
+ Files.deleteIfExists(tempJar);
+ }
+ tempJars.clear();
+ }
+
+ @Test
+ public void testRegisterDriverIfNeeded() throws Exception {
+ Path driverJar = createDriverJar();
+ Map<String, String> params = new HashMap<>();
+ params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL,
driverJar.toUri().toURL().toString());
+ params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_CLASS,
DummyJdbcDriver.class.getName());
+
+ JniScannerClassLoader scannerClassLoader =
+ new JniScannerClassLoader("paimon-test", List.of(),
ClassLoader.getPlatformClassLoader());
+ PaimonJdbcDriverUtils.registerDriverIfNeeded(params,
scannerClassLoader);
+
+ Driver driver = DriverManager.getDriver("jdbc:dummy:test");
+ registeredDrivers.add(driver);
+ Assert.assertTrue(driver.acceptsURL("jdbc:dummy:test"));
+ }
+
+ @Test
+ public void testRegisterDriverIfNeededRequiresDriverClass() {
+ Map<String, String> params = new HashMap<>();
+ params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL,
"file:///tmp/postgresql-42.5.0.jar");
+
+ IllegalArgumentException exception =
Assert.assertThrows(IllegalArgumentException.class,
+ () -> PaimonJdbcDriverUtils.registerDriverIfNeeded(params,
getClass().getClassLoader()));
+ Assert.assertTrue(exception.getMessage().contains("driver_class"));
+ }
+
+ private Path createDriverJar() throws IOException {
+ Path jarPath = Files.createTempFile("paimon-jdbc-driver", ".jar");
+ tempJars.add(jarPath);
+ String resourceName = DummyJdbcDriver.class.getName().replace('.',
'/') + ".class";
+ try (JarOutputStream jarOutputStream = new
JarOutputStream(Files.newOutputStream(jarPath));
+ InputStream inputStream =
DummyJdbcDriver.class.getClassLoader().getResourceAsStream(resourceName)) {
+ Assert.assertNotNull(inputStream);
+ jarOutputStream.putNextEntry(new JarEntry(resourceName));
+ byte[] buffer = new byte[4096];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buffer)) >= 0) {
+ jarOutputStream.write(buffer, 0, bytesRead);
+ }
+ jarOutputStream.closeEntry();
+ }
+ return jarPath;
+ }
+
+ public static class DummyJdbcDriver implements Driver {
+ @Override
+ public java.sql.Connection connect(String url, Properties info) {
+ return null;
+ }
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url != null && url.startsWith("jdbc:dummy:");
+ }
+
+ @Override
+ public DriverPropertyInfo[] getPropertyInfo(String url, Properties
info) {
+ return new DriverPropertyInfo[0];
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return 1;
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return 0;
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ return false;
+ }
+
+ @Override
+ public Logger getParentLogger() throws SQLFeatureNotSupportedException
{
+ throw new SQLFeatureNotSupportedException("not supported");
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index d54bd3be13a..b4fa0c5a6f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -36,6 +36,7 @@ import org.apache.doris.datasource.paimon.PaimonUtil;
import org.apache.doris.datasource.paimon.PaimonUtils;
import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
+import
org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStoreProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanContext;
@@ -144,6 +145,7 @@ public class PaimonScanNode extends FileQueryScanNode {
// get them in doInitialize() to ensure internal consistency of ScanNode
private Map<StorageProperties.Type, StorageProperties>
storagePropertiesMap;
private Map<String, String> backendStorageProperties;
+ private Map<String, String> backendPaimonOptions = Collections.emptyMap();
// The schema information involved in the current query process (including
historical schema).
protected ConcurrentHashMap<Long, Boolean> currentQuerySchema = new
ConcurrentHashMap<>();
@@ -170,6 +172,7 @@ public class PaimonScanNode extends FileQueryScanNode {
source.getPaimonTable()
);
backendStorageProperties =
CredentialUtils.getBackendPropertiesFromStorageMap(storagePropertiesMap);
+ backendPaimonOptions = getBackendPaimonOptions();
}
@VisibleForTesting
@@ -202,6 +205,13 @@ public class PaimonScanNode extends FileQueryScanNode {
// Set paimon_predicate at ScanNode level to avoid redundant
serialization in each split
String serializedPredicate =
PaimonUtil.encodeObjectToString(predicates);
params.setPaimonPredicate(serializedPredicate);
+ setScanLevelPaimonOptions();
+ }
+
+ private void setScanLevelPaimonOptions() {
+ if (!backendPaimonOptions.isEmpty()) {
+ params.setPaimonOptions(backendPaimonOptions);
+ }
}
private void putHistorySchemaInfo(Long schemaId) {
@@ -463,6 +473,23 @@ public class PaimonScanNode extends FileQueryScanNode {
return splits;
}
+ @VisibleForTesting
+ Map<String, String> getBackendPaimonOptions() {
+ if (source == null) {
+ return Collections.emptyMap();
+ }
+ if (!(source.getCatalog() instanceof PaimonExternalCatalog)) {
+ return Collections.emptyMap();
+ }
+ PaimonExternalCatalog catalog = (PaimonExternalCatalog)
source.getCatalog();
+ if (!(catalog.getCatalogProperty().getMetastoreProperties() instanceof
PaimonJdbcMetaStoreProperties)) {
+ return Collections.emptyMap();
+ }
+ PaimonJdbcMetaStoreProperties jdbcMetaStoreProperties =
+ (PaimonJdbcMetaStoreProperties)
catalog.getCatalogProperty().getMetastoreProperties();
+ return jdbcMetaStoreProperties.getBackendPaimonOptions();
+ }
+
@VisibleForTesting
boolean shouldForceJniForSystemTable() {
if (source == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
index 92bcf023ff2..7568d59c5fe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
@@ -37,6 +37,8 @@ import org.apache.paimon.options.CatalogOptions;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,6 +47,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
private static final Logger LOG =
LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
private static final String JDBC_PREFIX = "jdbc.";
+ private static final String JDBC_DRIVER_URL = JDBC_PREFIX +
JdbcResource.DRIVER_URL;
+ private static final String JDBC_DRIVER_CLASS = JDBC_PREFIX +
JdbcResource.DRIVER_CLASS;
private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new
ConcurrentHashMap<>();
private static final Set<String> REGISTERED_DRIVER_KEYS =
ConcurrentHashMap.newKeySet();
@@ -157,6 +161,20 @@ public class PaimonJdbcMetaStoreProperties extends
AbstractPaimonProperties {
});
}
+ public Map<String, String> getBackendPaimonOptions() {
+ if (StringUtils.isBlank(driverUrl)) {
+ return Collections.emptyMap();
+ }
+ if (StringUtils.isBlank(driverClass)) {
+ throw new IllegalArgumentException("jdbc.driver_class or
paimon.jdbc.driver_class is required when "
+ + "jdbc.driver_url or paimon.jdbc.driver_url is
specified");
+ }
+ Map<String, String> backendPaimonOptions = new HashMap<>();
+ backendPaimonOptions.put(JDBC_DRIVER_URL,
JdbcResource.getFullDriverUrl(driverUrl));
+ backendPaimonOptions.put(JDBC_DRIVER_CLASS, driverClass);
+ return backendPaimonOptions;
+ }
+
/**
* Register JDBC driver with DriverManager.
* This is necessary because DriverManager.getConnection() doesn't use
Thread.contextClassLoader.
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 03828278c94..f0e8a91d360 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -21,13 +21,19 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplitter;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+import
org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStoreProperties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
@@ -475,6 +481,64 @@ public class PaimonScanNodeTest {
Assert.assertEquals(100L * 1024L * 1024L, target);
}
+ @Test
+ public void testGetBackendPaimonOptionsForJdbcCatalog() throws Exception {
+ String driverUrl = "file:///tmp/postgresql-42.5.0.jar";
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("paimon.jdbc.driver_url", driverUrl);
+ props.put("paimon.jdbc.driver_class", "org.postgresql.Driver");
+ PaimonJdbcMetaStoreProperties jdbcMetaStoreProperties =
+ (PaimonJdbcMetaStoreProperties)
MetastoreProperties.create(props);
+
+ CatalogProperty catalogProperty = Mockito.mock(CatalogProperty.class);
+
Mockito.when(catalogProperty.getMetastoreProperties()).thenReturn(jdbcMetaStoreProperties);
+
+ PaimonExternalCatalog catalog =
Mockito.mock(PaimonExternalCatalog.class);
+ Mockito.when(catalog.getCatalogProperty()).thenReturn(catalogProperty);
+
+ PaimonSource source = Mockito.mock(PaimonSource.class);
+ Mockito.when(source.getCatalog()).thenReturn(catalog);
+
+ PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new
TupleDescriptor(new TupleId(0)),
+ false, sv, ScanContext.EMPTY);
+ node.setSource(source);
+
+ Map<String, String> backendOptions = node.getBackendPaimonOptions();
+ Assert.assertEquals("org.postgresql.Driver",
backendOptions.get("jdbc.driver_class"));
+ Assert.assertEquals(driverUrl, backendOptions.get("jdbc.driver_url"));
+ Assert.assertEquals(2, backendOptions.size());
+ }
+
+ @Test
+ public void testApplyBackendPaimonOptionsAtScanNodeLevel() throws
Exception {
+ PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new
TupleDescriptor(new TupleId(0)),
+ false, sv, ScanContext.EMPTY);
+ PaimonSource source = Mockito.mock(PaimonSource.class);
+
Mockito.when(source.getTableLocation()).thenReturn("file:///warehouse");
+ node.setSource(source);
+
+ Map<String, String> backendOptions = new HashMap<>();
+ backendOptions.put("jdbc.driver_url",
"file:///tmp/postgresql-42.5.0.jar");
+ backendOptions.put("jdbc.driver_class", "org.postgresql.Driver");
+ setField(FileQueryScanNode.class, node, "params", new
TFileScanRangeParams());
+ setField(PaimonScanNode.class, node, "backendPaimonOptions",
backendOptions);
+ setField(PaimonScanNode.class, node, "storagePropertiesMap",
Collections.emptyMap());
+
+ invokePrivateMethod(node, "setScanLevelPaimonOptions");
+
+ Assert.assertEquals(backendOptions,
node.getFileScanRangeParams().getPaimonOptions());
+
+ TFileRangeDesc rangeDesc = new TFileRangeDesc();
+ invokePrivateMethod(node, "setPaimonParams",
+ new Class<?>[] {TFileRangeDesc.class, PaimonSplit.class},
+ rangeDesc, new
PaimonSplit(createDataSplit("scan_level.parquet")));
+
Assert.assertFalse(rangeDesc.getTableFormatParams().getPaimonParams().isSetPaimonOptions());
+ }
+
private void mockJniReader(PaimonScanNode spyNode) {
Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
}
@@ -482,4 +546,34 @@ public class PaimonScanNodeTest {
private void mockNativeReader(PaimonScanNode spyNode) {
Mockito.doReturn(true).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
}
+
+ private void setField(Class<?> clazz, Object target, String fieldName,
Object value) throws Exception {
+ java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private Object invokePrivateMethod(Object target, String methodName,
Class<?>[] parameterTypes, Object... args)
+ throws Exception {
+ Method method = target.getClass().getDeclaredMethod(methodName,
parameterTypes);
+ method.setAccessible(true);
+ return method.invoke(target, args);
+ }
+
+ private Object invokePrivateMethod(Object target, String methodName)
throws Exception {
+ return invokePrivateMethod(target, methodName, new Class<?>[0]);
+ }
+
+ private DataSplit createDataSplit(String fileName) {
+ DataFileMeta dataFileMeta = DataFileMeta.forAppend(fileName, 64L *
1024 * 1024, 1L, SimpleStats.EMPTY_STATS,
+ 1L, 1L, 1L, Collections.<String>emptyList(), null,
FileSource.APPEND,
+ Collections.<String>emptyList(), null, null,
Collections.<String>emptyList());
+ return DataSplit.builder()
+ .rawConvertible(true)
+ .withPartition(BinaryRow.singleColumn(1))
+ .withBucket(1)
+ .withBucketPath("file://b1")
+ .withDataFiles(Collections.singletonList(dataFileMeta))
+ .build();
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
index 471d2f05a95..cd430d8a631 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.property.metastore;
+import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.paimon.options.CatalogOptions;
@@ -152,4 +153,40 @@ public class PaimonJdbcMetaStorePropertiesTest {
Assertions.assertThrows(IllegalArgumentException.class,
() -> jdbcProps.initializeCatalog("paimon_catalog",
Collections.emptyList()));
}
+
+ @Test
+ public void testGetBackendPaimonOptions() throws Exception {
+ String driverUrl = "file:///tmp/postgresql-42.5.0.jar";
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("paimon.jdbc.driver_url", driverUrl);
+ props.put("paimon.jdbc.driver_class", "org.postgresql.Driver");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ Map<String, String> backendOptions =
jdbcProps.getBackendPaimonOptions();
+
+ Assertions.assertEquals(
+ JdbcResource.getFullDriverUrl(driverUrl),
+ backendOptions.get("jdbc.driver_url"));
+ Assertions.assertEquals("org.postgresql.Driver",
backendOptions.get("jdbc.driver_class"));
+ Assertions.assertEquals(2, backendOptions.size());
+ }
+
+ @Test
+ public void testGetBackendPaimonOptionsRequiresDriverClass() throws
Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("paimon.jdbc.driver_url",
"file:///tmp/postgresql-42.5.0.jar");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ IllegalArgumentException exception =
Assertions.assertThrows(IllegalArgumentException.class,
+ jdbcProps::getBackendPaimonOptions);
+ Assertions.assertTrue(exception.getMessage().contains("driver_class"));
+ }
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ebbf7987a9b..1f4ae4df04a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -498,6 +498,9 @@ struct TFileScanRangeParams {
// enable mapping varbinary type for Doris external table and TVF
28: optional bool enable_mapping_varbinary = false;
29: optional bool enable_mapping_timestamp_tz = false;
+ // Paimon options from FE, used for jni/native scanner
+ // Set at ScanNode level to avoid redundant serialization in each split
+ 30: optional map<string, string> paimon_options
}
struct TFileRangeDesc {
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
index 0f653063f14..82d8d5b0dfa 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
@@ -124,6 +124,22 @@ suite("test_paimon_jdbc_catalog", "p0,external") {
executeCommand(command, true)
}
+ def assertSystemTableReadable = { String tableExpr, List<String> expectedColumns = [], Integer minCount = null ->
+ def descRows = sql """DESC ${tableExpr}"""
+ assertTrue(descRows.size() > 0)
+ expectedColumns.each { col ->
+ assertTrue(descRows.toString().contains(col))
+ }
+
+ def countRows = sql """SELECT COUNT(*) FROM ${tableExpr}"""
+ assertEquals(1, countRows.size())
+ int countValue = countRows[0][0].toString().toInteger()
+ if (minCount != null) {
+ assertTrue(countValue >= minCount)
+ }
+ return countValue
+ }
+
try {
sql """switch internal"""
sql """DROP CATALOG IF EXISTS ${catalogName}"""
@@ -189,15 +205,55 @@ suite("test_paimon_jdbc_catalog", "p0,external") {
assertEquals(1, rowCount.size())
assertEquals("2", rowCount[0][0].toString())
- def schemaDesc = sql """DESC paimon_jdbc_tbl\$schemas"""
- assertTrue(schemaDesc.toString().contains("schema_id"))
+ assertSystemTableReadable("paimon_jdbc_tbl\$schemas", ["schema_id"], 1)
+ assertSystemTableReadable("paimon_jdbc_tbl\$snapshots", ["snapshot_id"], 1)
+ [
+ "paimon_jdbc_tbl\$options",
+ "paimon_jdbc_tbl\$audit_log",
+ "paimon_jdbc_tbl\$files",
+ "paimon_jdbc_tbl\$tags",
+ "paimon_jdbc_tbl\$branches",
+ "paimon_jdbc_tbl\$consumers",
+ "paimon_jdbc_tbl\$ro",
+ "paimon_jdbc_tbl\$aggregation_fields",
+ "paimon_jdbc_tbl\$binlog",
+ "paimon_jdbc_tbl\$manifests",
+ "paimon_jdbc_tbl\$partitions",
+ "paimon_jdbc_tbl\$buckets",
+ "paimon_jdbc_tbl\$statistics",
+ "paimon_jdbc_tbl\$table_indexes"
+ ].each { tableExpr ->
+ assertSystemTableReadable(tableExpr)
+ }
+
+ sql """DROP TABLE IF EXISTS paimon_jdbc_row_tracking_tbl"""
+ sql """
+ CREATE TABLE ${dbName}.paimon_jdbc_row_tracking_tbl (
+ id INT,
+ name STRING,
+ dt DATE
+ ) ENGINE=paimon
+ PROPERTIES (
+ 'bucket' = '-1',
+ 'row-tracking.enabled' = 'true'
+ )
+ """
+
+ sparkPaimonJdbc """
+ INSERT INTO ${sparkSeedCatalogName}.${dbName}.paimon_jdbc_row_tracking_tbl VALUES
+ (3, 'carol', DATE '2025-01-03'),
+ (4, 'dave', DATE '2025-01-04')
+ """
- def schemaCount = sql """SELECT COUNT(*) FROM paimon_jdbc_tbl\$schemas"""
- assertEquals(1, schemaCount.size())
- assertTrue(schemaCount[0][0].toString().toInteger() >= 1)
+ assertSystemTableReadable(
+ "paimon_jdbc_row_tracking_tbl\$row_tracking",
+ ["_row_id", "_sequence_number"],
+ 1
+ )
} finally {
try {
sql """SWITCH ${catalogName}"""
+ sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_row_tracking_tbl"""
sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_tbl"""
sql """DROP DATABASE IF EXISTS ${dbName} FORCE"""
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]