This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 375965596cb [fix](fe) Fix Paimon JDBC driver registration for JNI 
scans (#61513)
375965596cb is described below

commit 375965596cbe09b4bd36090326982b07b42d227b
Author: Chenjunwei <[email protected]>
AuthorDate: Tue Mar 31 10:47:52 2026 +0800

    [fix](fe) Fix Paimon JDBC driver registration for JNI scans (#61513)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: None
    
    Problem Summary: Paimon JDBC system tables that execute through the BE
    JNI scanner could not initialize the PostgreSQL JDBC driver, so queries
    such as snapshots/manifests/partitions failed with `No suitable driver
    found`. This change propagates JDBC driver metadata to BE and registers
    the driver in a DriverManager-visible classloader before Paimon table
    initialization. The regression case is also extended to cover all
    supported Paimon JDBC system tables, including `row_tracking` on a
    row-tracking-enabled table.
---
 be/src/format/table/paimon_cpp_reader.cpp          |   9 +-
 be/src/format/table/paimon_jni_reader.cpp          |  11 +-
 .../common/classloader/JniScannerClassLoader.java  |   9 ++
 .../apache/doris/common/jdbc/JdbcDriverUtils.java  | 114 +++++++++++++++++
 .../apache/doris/paimon/PaimonJdbcDriverUtils.java |  59 +++++++++
 .../org/apache/doris/paimon/PaimonJniScanner.java  |   2 +-
 .../doris/paimon/PaimonJdbcDriverUtilsTest.java    | 139 +++++++++++++++++++++
 .../datasource/paimon/source/PaimonScanNode.java   |  27 ++++
 .../metastore/PaimonJdbcMetaStoreProperties.java   |  18 +++
 .../paimon/source/PaimonScanNodeTest.java          |  94 ++++++++++++++
 .../PaimonJdbcMetaStorePropertiesTest.java         |  37 ++++++
 gensrc/thrift/PlanNodes.thrift                     |   3 +
 .../paimon/test_paimon_jdbc_catalog.groovy         |  66 +++++++++-
 13 files changed, 577 insertions(+), 11 deletions(-)

diff --git a/be/src/format/table/paimon_cpp_reader.cpp 
b/be/src/format/table/paimon_cpp_reader.cpp
index 9bc84f3bbb3..e4b182c41ed 100644
--- a/be/src/format/table/paimon_cpp_reader.cpp
+++ b/be/src/format/table/paimon_cpp_reader.cpp
@@ -269,8 +269,12 @@ std::vector<std::string> 
PaimonCppReader::_build_read_columns() const {
 
 std::map<std::string, std::string> PaimonCppReader::_build_options() const {
     std::map<std::string, std::string> options;
-    if (_range.__isset.table_format_params && 
_range.table_format_params.__isset.paimon_params &&
-        _range.table_format_params.paimon_params.__isset.paimon_options) {
+    if (_range_params && _range_params->__isset.paimon_options &&
+        !_range_params->paimon_options.empty()) {
+        options.insert(_range_params->paimon_options.begin(), 
_range_params->paimon_options.end());
+    } else if (_range.__isset.table_format_params &&
+               _range.table_format_params.__isset.paimon_params &&
+               
_range.table_format_params.paimon_params.__isset.paimon_options) {
         
options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
                        
_range.table_format_params.paimon_params.paimon_options.end());
     }
@@ -310,7 +314,6 @@ std::map<std::string, std::string> 
PaimonCppReader::_build_options() const {
     copy_if_missing("fs.s3a.region", "AWS_REGION");
     copy_if_missing("fs.s3a.path.style.access", "use_path_style");
 
-    // FE currently does not pass paimon_options in scan ranges.
     // Backfill file.format/manifest.format from split file_format to avoid
     // paimon-cpp falling back to default manifest.format=avro.
     if (_range.__isset.table_format_params && 
_range.table_format_params.__isset.paimon_params &&
diff --git a/be/src/format/table/paimon_jni_reader.cpp 
b/be/src/format/table/paimon_jni_reader.cpp
index 4fc9c76e4c4..12e6171b3a5 100644
--- a/be/src/format/table/paimon_jni_reader.cpp
+++ b/be/src/format/table/paimon_jni_reader.cpp
@@ -65,8 +65,15 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
                       if (range_params->__isset.serialized_table) {
                           params["serialized_table"] = 
range_params->serialized_table;
                       }
-                      for (const auto& kv : paimon_params.paimon_options) {
-                          params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
+                      if (range_params->__isset.paimon_options &&
+                          !range_params->paimon_options.empty()) {
+                          for (const auto& kv : range_params->paimon_options) {
+                              params[PAIMON_OPTION_PREFIX + kv.first] = 
kv.second;
+                          }
+                      } else if (paimon_params.__isset.paimon_options) {
+                          for (const auto& kv : paimon_params.paimon_options) {
+                              params[PAIMON_OPTION_PREFIX + kv.first] = 
kv.second;
+                          }
                       }
                       if (range_params->__isset.properties && 
!range_params->properties.empty()) {
                           for (const auto& kv : range_params->properties) {
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
index 1e6be07e097..a90366d7a78 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java
@@ -30,6 +30,15 @@ public class JniScannerClassLoader extends URLClassLoader {
         this.scannerName = scannerName;
     }
 
+    public synchronized void addURLIfAbsent(URL url) {
+        for (URL existingUrl : getURLs()) {
+            if (existingUrl.equals(url)) {
+                return;
+            }
+        }
+        super.addURL(url);
+    }
+
     @Override
     public String toString() {
         return "JniScannerClassLoader{"
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java
new file mode 100644
index 00000000000..67430d36108
--- /dev/null
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jdbc;
+
+import org.apache.doris.common.classloader.JniScannerClassLoader;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registers JDBC drivers loaded from external jars with {@link DriverManager}.
+ *
+ * <p>Each driver is wrapped in a {@link DriverShim} before registration, and a
+ * {@code driverUrl + "#" + driverClassName} key is remembered so that a driver is registered at most once.
+ */
+public final class JdbcDriverUtils {
+    // Keyed by URL#toExternalForm(): java.net.URL#hashCode()/equals() may resolve host names (blocking DNS).
+    private static final ConcurrentHashMap<String, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>();
+    // Holds only keys whose driver is already registered, so a hit means the driver is usable right now.
+    private static final Set<String> REGISTERED_DRIVER_KEYS = ConcurrentHashMap.newKeySet();
+    // Serializes first-time registration so concurrent callers wait for it instead of skipping it.
+    private static final Object REGISTRATION_LOCK = new Object();
+
+    private JdbcDriverUtils() {
+    }
+
+    /**
+     * Loads {@code driverClassName} from the jar at {@code driverUrl} and registers it with {@link DriverManager}.
+     *
+     * <p>Registration happens once per {@code driverUrl}/{@code driverClassName} pair. A caller that arrives while
+     * another thread is still registering the same driver blocks until that registration has finished, so the
+     * driver is available to {@link DriverManager} when this method returns normally.
+     *
+     * @param driverUrl URL of the driver jar
+     * @param driverClassName fully qualified name of the {@link Driver} implementation
+     * @param classLoader a {@link JniScannerClassLoader} to extend with the jar, or the parent for a new loader
+     * @throws IllegalArgumentException if {@code driverUrl} is not a valid URL
+     * @throws RuntimeException if the driver cannot be loaded, instantiated or registered
+     */
+    public static void registerDriver(String driverUrl, String driverClassName, ClassLoader classLoader) {
+        URL url;
+        try {
+            url = new URL(driverUrl);
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Invalid JDBC driver URL: " + driverUrl, e);
+        }
+        String driverKey = driverUrl + "#" + driverClassName;
+        if (REGISTERED_DRIVER_KEYS.contains(driverKey)) {
+            return;
+        }
+        synchronized (REGISTRATION_LOCK) {
+            if (REGISTERED_DRIVER_KEYS.contains(driverKey)) {
+                return;
+            }
+            try {
+                ClassLoader driverClassLoader = prepareDriverClassLoader(url, classLoader);
+                Class<?> loadedDriverClass = Class.forName(driverClassName, true, driverClassLoader);
+                Driver driver = (Driver) loadedDriverClass.getDeclaredConstructor().newInstance();
+                DriverManager.registerDriver(new DriverShim(driver));
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to register JDBC driver: " + driverClassName, e);
+            }
+            // Publish the key only after success: failed attempts stay retryable and readers never see a
+            // key for a driver that is not registered yet.
+            REGISTERED_DRIVER_KEYS.add(driverKey);
+        }
+    }
+
+    /**
+     * Returns the class loader to load the driver from.
+     *
+     * <p>A {@link JniScannerClassLoader} is extended in place with {@code driverUrl}; for any other loader a
+     * {@link URLClassLoader} is created once per URL and cached. The cached loader keeps the parent passed by
+     * the first caller for that URL.
+     */
+    private static ClassLoader prepareDriverClassLoader(URL driverUrl, ClassLoader classLoader) {
+        if (classLoader instanceof JniScannerClassLoader) {
+            JniScannerClassLoader scannerClassLoader = (JniScannerClassLoader) classLoader;
+            scannerClassLoader.addURLIfAbsent(driverUrl);
+            return scannerClassLoader;
+        }
+        return DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(driverUrl.toExternalForm(),
+                key -> URLClassLoader.newInstance(new URL[] {driverUrl}, classLoader));
+    }
+
+    /**
+     * {@link Driver} that forwards every call to a delegate driver instance.
+     *
+     * <p>NOTE(review): presumably needed because {@link DriverManager} only serves drivers whose class is
+     * visible to the caller's class loader, which a driver loaded from an external jar is not — confirm.
+     */
+    private static final class DriverShim implements Driver {
+        private final Driver delegate;
+
+        private DriverShim(Driver delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Connection connect(String url, java.util.Properties info) throws java.sql.SQLException {
+            return delegate.connect(url, info);
+        }
+
+        @Override
+        public boolean acceptsURL(String url) throws java.sql.SQLException {
+            return delegate.acceptsURL(url);
+        }
+
+        @Override
+        public DriverPropertyInfo[] getPropertyInfo(String url, java.util.Properties info)
+                throws java.sql.SQLException {
+            return delegate.getPropertyInfo(url, info);
+        }
+
+        @Override
+        public int getMajorVersion() {
+            return delegate.getMajorVersion();
+        }
+
+        @Override
+        public int getMinorVersion() {
+            return delegate.getMinorVersion();
+        }
+
+        @Override
+        public boolean jdbcCompliant() {
+            return delegate.jdbcCompliant();
+        }
+
+        @Override
+        public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+            return delegate.getParentLogger();
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java
new file mode 100644
index 00000000000..09d7818dc71
--- /dev/null
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import org.apache.doris.common.jdbc.JdbcDriverUtils;
+
+import java.util.Map;
+
+final class PaimonJdbcDriverUtils {
+    static final String PAIMON_JDBC_DRIVER_URL = "paimon.jdbc.driver_url";
+    static final String PAIMON_JDBC_DRIVER_CLASS = "paimon.jdbc.driver_class";
+    static final String JDBC_DRIVER_URL = "jdbc.driver_url";
+    static final String JDBC_DRIVER_CLASS = "jdbc.driver_class";
+
+    private PaimonJdbcDriverUtils() {
+    }
+
+    static void registerDriverIfNeeded(Map<String, String> params, ClassLoader 
parentClassLoader) {
+        String driverUrl = firstNonBlank(params.get(PAIMON_JDBC_DRIVER_URL), 
params.get(JDBC_DRIVER_URL));
+        if (driverUrl == null) {
+            return;
+        }
+        String driverClassName = 
firstNonBlank(params.get(PAIMON_JDBC_DRIVER_CLASS), 
params.get(JDBC_DRIVER_CLASS));
+        if (driverClassName == null) {
+            throw new IllegalArgumentException("paimon.jdbc.driver_class or 
jdbc.driver_class is required when "
+                    + "paimon.jdbc.driver_url or jdbc.driver_url is 
specified");
+        }
+        registerDriver(driverUrl, driverClassName, parentClassLoader);
+    }
+
+    static void registerDriver(String driverUrl, String driverClassName, 
ClassLoader parentClassLoader) {
+        JdbcDriverUtils.registerDriver(driverUrl, driverClassName, 
parentClassLoader);
+    }
+
+    private static String firstNonBlank(String first, String second) {
+        if (first != null && !first.trim().isEmpty()) {
+            return first;
+        }
+        if (second != null && !second.trim().isEmpty()) {
+            return second;
+        }
+        return null;
+    }
+}
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 5690b7f6505..8f64a51dc9b 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -92,6 +92,7 @@ public class PaimonJniScanner extends JniScanner {
             // so we need to provide a classloader, otherwise it will cause 
NPE.
             Thread.currentThread().setContextClassLoader(classLoader);
             preExecutionAuthenticator.execute(() -> {
+                PaimonJdbcDriverUtils.registerDriverIfNeeded(params, 
classLoader);
                 initTable();
                 initReader();
                 return null;
@@ -227,4 +228,3 @@ public class PaimonJniScanner extends JniScanner {
     }
 
 }
-
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java
 
b/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java
new file mode 100644
index 00000000000..1f0df2371dd
--- /dev/null
+++ 
b/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import org.apache.doris.common.classloader.JniScannerClassLoader;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.logging.Logger;
+
+public class PaimonJdbcDriverUtilsTest {
+    private final List<Driver> registeredDrivers = new ArrayList<>();
+    private final List<Path> tempJars = new ArrayList<>();
+
+    @After
+    public void tearDown() throws Exception {
+        for (Driver driver : registeredDrivers) {
+            DriverManager.deregisterDriver(driver);
+        }
+        registeredDrivers.clear();
+        for (Path tempJar : tempJars) {
+            Files.deleteIfExists(tempJar);
+        }
+        tempJars.clear();
+    }
+
+    @Test
+    public void testRegisterDriverIfNeeded() throws Exception {
+        Path driverJar = createDriverJar();
+        Map<String, String> params = new HashMap<>();
+        params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL, 
driverJar.toUri().toURL().toString());
+        params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_CLASS, 
DummyJdbcDriver.class.getName());
+
+        JniScannerClassLoader scannerClassLoader =
+                new JniScannerClassLoader("paimon-test", List.of(), 
ClassLoader.getPlatformClassLoader());
+        PaimonJdbcDriverUtils.registerDriverIfNeeded(params, 
scannerClassLoader);
+
+        Driver driver = DriverManager.getDriver("jdbc:dummy:test");
+        registeredDrivers.add(driver);
+        Assert.assertTrue(driver.acceptsURL("jdbc:dummy:test"));
+    }
+
+    @Test
+    public void testRegisterDriverIfNeededRequiresDriverClass() {
+        Map<String, String> params = new HashMap<>();
+        params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL, 
"file:///tmp/postgresql-42.5.0.jar");
+
+        IllegalArgumentException exception = 
Assert.assertThrows(IllegalArgumentException.class,
+                () -> PaimonJdbcDriverUtils.registerDriverIfNeeded(params, 
getClass().getClassLoader()));
+        Assert.assertTrue(exception.getMessage().contains("driver_class"));
+    }
+
+    private Path createDriverJar() throws IOException {
+        Path jarPath = Files.createTempFile("paimon-jdbc-driver", ".jar");
+        tempJars.add(jarPath);
+        String resourceName = DummyJdbcDriver.class.getName().replace('.', 
'/') + ".class";
+        try (JarOutputStream jarOutputStream = new 
JarOutputStream(Files.newOutputStream(jarPath));
+                InputStream inputStream = 
DummyJdbcDriver.class.getClassLoader().getResourceAsStream(resourceName)) {
+            Assert.assertNotNull(inputStream);
+            jarOutputStream.putNextEntry(new JarEntry(resourceName));
+            byte[] buffer = new byte[4096];
+            int bytesRead;
+            while ((bytesRead = inputStream.read(buffer)) >= 0) {
+                jarOutputStream.write(buffer, 0, bytesRead);
+            }
+            jarOutputStream.closeEntry();
+        }
+        return jarPath;
+    }
+
+    public static class DummyJdbcDriver implements Driver {
+        @Override
+        public java.sql.Connection connect(String url, Properties info) {
+            return null;
+        }
+
+        @Override
+        public boolean acceptsURL(String url) {
+            return url != null && url.startsWith("jdbc:dummy:");
+        }
+
+        @Override
+        public DriverPropertyInfo[] getPropertyInfo(String url, Properties 
info) {
+            return new DriverPropertyInfo[0];
+        }
+
+        @Override
+        public int getMajorVersion() {
+            return 1;
+        }
+
+        @Override
+        public int getMinorVersion() {
+            return 0;
+        }
+
+        @Override
+        public boolean jdbcCompliant() {
+            return false;
+        }
+
+        @Override
+        public Logger getParentLogger() throws SQLFeatureNotSupportedException 
{
+            throw new SQLFeatureNotSupportedException("not supported");
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index d54bd3be13a..b4fa0c5a6f0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -36,6 +36,7 @@ import org.apache.doris.datasource.paimon.PaimonUtil;
 import org.apache.doris.datasource.paimon.PaimonUtils;
 import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
 import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
+import 
org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStoreProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanContext;
@@ -144,6 +145,7 @@ public class PaimonScanNode extends FileQueryScanNode {
     // get them in doInitialize() to ensure internal consistency of ScanNode
     private Map<StorageProperties.Type, StorageProperties> 
storagePropertiesMap;
     private Map<String, String> backendStorageProperties;
+    private Map<String, String> backendPaimonOptions = Collections.emptyMap();
 
     // The schema information involved in the current query process (including 
historical schema).
     protected ConcurrentHashMap<Long, Boolean> currentQuerySchema = new 
ConcurrentHashMap<>();
@@ -170,6 +172,7 @@ public class PaimonScanNode extends FileQueryScanNode {
                 source.getPaimonTable()
         );
         backendStorageProperties = 
CredentialUtils.getBackendPropertiesFromStorageMap(storagePropertiesMap);
+        backendPaimonOptions = getBackendPaimonOptions();
     }
 
     @VisibleForTesting
@@ -202,6 +205,13 @@ public class PaimonScanNode extends FileQueryScanNode {
         // Set paimon_predicate at ScanNode level to avoid redundant 
serialization in each split
         String serializedPredicate = 
PaimonUtil.encodeObjectToString(predicates);
         params.setPaimonPredicate(serializedPredicate);
+        setScanLevelPaimonOptions();
+    }
+
+    private void setScanLevelPaimonOptions() {
+        if (!backendPaimonOptions.isEmpty()) {
+            params.setPaimonOptions(backendPaimonOptions);
+        }
     }
 
     private void putHistorySchemaInfo(Long schemaId) {
@@ -463,6 +473,23 @@ public class PaimonScanNode extends FileQueryScanNode {
         return splits;
     }
 
+    @VisibleForTesting
+    Map<String, String> getBackendPaimonOptions() {
+        if (source == null) {
+            return Collections.emptyMap();
+        }
+        if (!(source.getCatalog() instanceof PaimonExternalCatalog)) {
+            return Collections.emptyMap();
+        }
+        PaimonExternalCatalog catalog = (PaimonExternalCatalog) 
source.getCatalog();
+        if (!(catalog.getCatalogProperty().getMetastoreProperties() instanceof 
PaimonJdbcMetaStoreProperties)) {
+            return Collections.emptyMap();
+        }
+        PaimonJdbcMetaStoreProperties jdbcMetaStoreProperties =
+                (PaimonJdbcMetaStoreProperties) 
catalog.getCatalogProperty().getMetastoreProperties();
+        return jdbcMetaStoreProperties.getBackendPaimonOptions();
+    }
+
     @VisibleForTesting
     boolean shouldForceJniForSystemTable() {
         if (source == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
index 92bcf023ff2..7568d59c5fe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
@@ -37,6 +37,8 @@ import org.apache.paimon.options.CatalogOptions;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +47,8 @@ import java.util.concurrent.ConcurrentHashMap;
 public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
     private static final Logger LOG = 
LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
     private static final String JDBC_PREFIX = "jdbc.";
+    private static final String JDBC_DRIVER_URL = JDBC_PREFIX + 
JdbcResource.DRIVER_URL;
+    private static final String JDBC_DRIVER_CLASS = JDBC_PREFIX + 
JdbcResource.DRIVER_CLASS;
     private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new 
ConcurrentHashMap<>();
     private static final Set<String> REGISTERED_DRIVER_KEYS = 
ConcurrentHashMap.newKeySet();
 
@@ -157,6 +161,20 @@ public class PaimonJdbcMetaStoreProperties extends 
AbstractPaimonProperties {
         });
     }
 
+    public Map<String, String> getBackendPaimonOptions() {
+        if (StringUtils.isBlank(driverUrl)) {
+            return Collections.emptyMap();
+        }
+        if (StringUtils.isBlank(driverClass)) {
+            throw new IllegalArgumentException("jdbc.driver_class or 
paimon.jdbc.driver_class is required when "
+                    + "jdbc.driver_url or paimon.jdbc.driver_url is 
specified");
+        }
+        Map<String, String> backendPaimonOptions = new HashMap<>();
+        backendPaimonOptions.put(JDBC_DRIVER_URL, 
JdbcResource.getFullDriverUrl(driverUrl));
+        backendPaimonOptions.put(JDBC_DRIVER_CLASS, driverClass);
+        return backendPaimonOptions;
+    }
+
     /**
      * Register JDBC driver with DriverManager.
      * This is necessary because DriverManager.getConnection() doesn't use 
Thread.contextClassLoader.
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 03828278c94..f0e8a91d360 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -21,13 +21,19 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.FileSplitter;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties;
+import 
org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStoreProperties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRangeParams;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -475,6 +481,64 @@ public class PaimonScanNodeTest {
         Assert.assertEquals(100L * 1024L * 1024L, target);
     }
 
+    @Test
+    public void testGetBackendPaimonOptionsForJdbcCatalog() throws Exception {
+        String driverUrl = "file:///tmp/postgresql-42.5.0.jar";
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("paimon.jdbc.driver_url", driverUrl);
+        props.put("paimon.jdbc.driver_class", "org.postgresql.Driver");
+        PaimonJdbcMetaStoreProperties jdbcMetaStoreProperties =
+                (PaimonJdbcMetaStoreProperties) 
MetastoreProperties.create(props);
+
+        CatalogProperty catalogProperty = Mockito.mock(CatalogProperty.class);
+        
Mockito.when(catalogProperty.getMetastoreProperties()).thenReturn(jdbcMetaStoreProperties);
+
+        PaimonExternalCatalog catalog = 
Mockito.mock(PaimonExternalCatalog.class);
+        Mockito.when(catalog.getCatalogProperty()).thenReturn(catalogProperty);
+
+        PaimonSource source = Mockito.mock(PaimonSource.class);
+        Mockito.when(source.getCatalog()).thenReturn(catalog);
+
+        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new 
TupleDescriptor(new TupleId(0)),
+                false, sv, ScanContext.EMPTY);
+        node.setSource(source);
+
+        Map<String, String> backendOptions = node.getBackendPaimonOptions();
+        Assert.assertEquals("org.postgresql.Driver", 
backendOptions.get("jdbc.driver_class"));
+        Assert.assertEquals(driverUrl, backendOptions.get("jdbc.driver_url"));
+        Assert.assertEquals(2, backendOptions.size());
+    }
+
+    @Test
+    public void testApplyBackendPaimonOptionsAtScanNodeLevel() throws 
Exception {
+        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new 
TupleDescriptor(new TupleId(0)),
+                false, sv, ScanContext.EMPTY);
+        PaimonSource source = Mockito.mock(PaimonSource.class);
+        
Mockito.when(source.getTableLocation()).thenReturn("file:///warehouse");
+        node.setSource(source);
+
+        Map<String, String> backendOptions = new HashMap<>();
+        backendOptions.put("jdbc.driver_url", 
"file:///tmp/postgresql-42.5.0.jar");
+        backendOptions.put("jdbc.driver_class", "org.postgresql.Driver");
+        setField(FileQueryScanNode.class, node, "params", new 
TFileScanRangeParams());
+        setField(PaimonScanNode.class, node, "backendPaimonOptions", 
backendOptions);
+        setField(PaimonScanNode.class, node, "storagePropertiesMap", 
Collections.emptyMap());
+
+        invokePrivateMethod(node, "setScanLevelPaimonOptions");
+
+        Assert.assertEquals(backendOptions, 
node.getFileScanRangeParams().getPaimonOptions());
+
+        TFileRangeDesc rangeDesc = new TFileRangeDesc();
+        invokePrivateMethod(node, "setPaimonParams",
+                new Class<?>[] {TFileRangeDesc.class, PaimonSplit.class},
+                rangeDesc, new 
PaimonSplit(createDataSplit("scan_level.parquet")));
+        
Assert.assertFalse(rangeDesc.getTableFormatParams().getPaimonParams().isSetPaimonOptions());
+    }
+
     private void mockJniReader(PaimonScanNode spyNode) {
         
Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
     }
@@ -482,4 +546,34 @@ public class PaimonScanNodeTest {
     private void mockNativeReader(PaimonScanNode spyNode) {
         
Mockito.doReturn(true).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
     }
+
+    private void setField(Class<?> clazz, Object target, String fieldName, 
Object value) throws Exception {
+        java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    private Object invokePrivateMethod(Object target, String methodName, 
Class<?>[] parameterTypes, Object... args)
+            throws Exception {
+        Method method = target.getClass().getDeclaredMethod(methodName, 
parameterTypes);
+        method.setAccessible(true);
+        return method.invoke(target, args);
+    }
+
+    private Object invokePrivateMethod(Object target, String methodName) 
throws Exception {
+        return invokePrivateMethod(target, methodName, new Class<?>[0]);
+    }
+
+    private DataSplit createDataSplit(String fileName) {
+        DataFileMeta dataFileMeta = DataFileMeta.forAppend(fileName, 64L * 
1024 * 1024, 1L, SimpleStats.EMPTY_STATS,
+                1L, 1L, 1L, Collections.<String>emptyList(), null, 
FileSource.APPEND,
+                Collections.<String>emptyList(), null, null, 
Collections.<String>emptyList());
+        return DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(BinaryRow.singleColumn(1))
+                .withBucket(1)
+                .withBucketPath("file://b1")
+                .withDataFiles(Collections.singletonList(dataFileMeta))
+                .build();
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
index 471d2f05a95..cd430d8a631 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.property.metastore;
 
+import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 
 import org.apache.paimon.options.CatalogOptions;
@@ -152,4 +153,40 @@ public class PaimonJdbcMetaStorePropertiesTest {
         Assertions.assertThrows(IllegalArgumentException.class,
                 () -> jdbcProps.initializeCatalog("paimon_catalog", 
Collections.emptyList()));
     }
+
+    @Test
+    public void testGetBackendPaimonOptions() throws Exception {
+        String driverUrl = "file:///tmp/postgresql-42.5.0.jar";
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("paimon.jdbc.driver_url", driverUrl);
+        props.put("paimon.jdbc.driver_class", "org.postgresql.Driver");
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        Map<String, String> backendOptions = 
jdbcProps.getBackendPaimonOptions();
+
+        Assertions.assertEquals(
+                JdbcResource.getFullDriverUrl(driverUrl),
+                backendOptions.get("jdbc.driver_url"));
+        Assertions.assertEquals("org.postgresql.Driver", 
backendOptions.get("jdbc.driver_class"));
+        Assertions.assertEquals(2, backendOptions.size());
+    }
+
+    @Test
+    public void testGetBackendPaimonOptionsRequiresDriverClass() throws 
Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("paimon.jdbc.driver_url", 
"file:///tmp/postgresql-42.5.0.jar");
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        IllegalArgumentException exception = 
Assertions.assertThrows(IllegalArgumentException.class,
+                jdbcProps::getBackendPaimonOptions);
+        Assertions.assertTrue(exception.getMessage().contains("driver_class"));
+    }
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ebbf7987a9b..1f4ae4df04a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -498,6 +498,9 @@ struct TFileScanRangeParams {
     // enable mapping varbinary type for Doris external table and TVF
     28: optional bool enable_mapping_varbinary = false;
     29: optional bool enable_mapping_timestamp_tz = false;
+    // Paimon options from FE, used for jni/native scanner
+    // Set at ScanNode level to avoid redundant serialization in each split
+    30: optional map<string, string> paimon_options
 }
 
 struct TFileRangeDesc {
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
index 0f653063f14..82d8d5b0dfa 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
@@ -124,6 +124,22 @@ suite("test_paimon_jdbc_catalog", "p0,external") {
         executeCommand(command, true)
     }
 
+    def assertSystemTableReadable = { String tableExpr, List<String> 
expectedColumns = [], Integer minCount = null ->
+        def descRows = sql """DESC ${tableExpr}"""
+        assertTrue(descRows.size() > 0)
+        expectedColumns.each { col ->
+            assertTrue(descRows.toString().contains(col))
+        }
+
+        def countRows = sql """SELECT COUNT(*) FROM ${tableExpr}"""
+        assertEquals(1, countRows.size())
+        int countValue = countRows[0][0].toString().toInteger()
+        if (minCount != null) {
+            assertTrue(countValue >= minCount)
+        }
+        return countValue
+    }
+
     try {
         sql """switch internal"""
         sql """DROP CATALOG IF EXISTS ${catalogName}"""
@@ -189,15 +205,55 @@ suite("test_paimon_jdbc_catalog", "p0,external") {
         assertEquals(1, rowCount.size())
         assertEquals("2", rowCount[0][0].toString())
 
-        def schemaDesc = sql """DESC paimon_jdbc_tbl\$schemas"""
-        assertTrue(schemaDesc.toString().contains("schema_id"))
+        assertSystemTableReadable("paimon_jdbc_tbl\$schemas", ["schema_id"], 1)
+        assertSystemTableReadable("paimon_jdbc_tbl\$snapshots", 
["snapshot_id"], 1)
+        [
+            "paimon_jdbc_tbl\$options",
+            "paimon_jdbc_tbl\$audit_log",
+            "paimon_jdbc_tbl\$files",
+            "paimon_jdbc_tbl\$tags",
+            "paimon_jdbc_tbl\$branches",
+            "paimon_jdbc_tbl\$consumers",
+            "paimon_jdbc_tbl\$ro",
+            "paimon_jdbc_tbl\$aggregation_fields",
+            "paimon_jdbc_tbl\$binlog",
+            "paimon_jdbc_tbl\$manifests",
+            "paimon_jdbc_tbl\$partitions",
+            "paimon_jdbc_tbl\$buckets",
+            "paimon_jdbc_tbl\$statistics",
+            "paimon_jdbc_tbl\$table_indexes"
+        ].each { tableExpr ->
+            assertSystemTableReadable(tableExpr)
+        }
+
+        sql """DROP TABLE IF EXISTS paimon_jdbc_row_tracking_tbl"""
+        sql """
+            CREATE TABLE ${dbName}.paimon_jdbc_row_tracking_tbl (
+                id INT,
+                name STRING,
+                dt DATE
+            ) ENGINE=paimon
+            PROPERTIES (
+                'bucket' = '-1',
+                'row-tracking.enabled' = 'true'
+            )
+        """
+
+        sparkPaimonJdbc """
+            INSERT INTO 
${sparkSeedCatalogName}.${dbName}.paimon_jdbc_row_tracking_tbl VALUES
+            (3, 'carol', DATE '2025-01-03'),
+            (4, 'dave', DATE '2025-01-04')
+        """
 
-        def schemaCount = sql """SELECT COUNT(*) FROM 
paimon_jdbc_tbl\$schemas"""
-        assertEquals(1, schemaCount.size())
-        assertTrue(schemaCount[0][0].toString().toInteger() >= 1)
+        assertSystemTableReadable(
+            "paimon_jdbc_row_tracking_tbl\$row_tracking",
+            ["_row_id", "_sequence_number"],
+            1
+        )
     } finally {
         try {
             sql """SWITCH ${catalogName}"""
+            sql """DROP TABLE IF EXISTS 
${dbName}.paimon_jdbc_row_tracking_tbl"""
             sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_tbl"""
             sql """DROP DATABASE IF EXISTS ${dbName} FORCE"""
         } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to