This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 469bc77a429 branch-2.1: [fix](jdbc catalog) Change BE jdbc Driver loading to Java code (#48002)
469bc77a429 is described below
commit 469bc77a429b920116e01135ab7f2f8946f8acff
Author: zy-kkk <[email protected]>
AuthorDate: Fri Feb 21 17:39:26 2025 +0800
branch-2.1: [fix](jdbc catalog) Change BE jdbc Driver loading to Java code (#48002)
cherry-pick from (#46912)
---
be/src/runtime/user_function_cache.cpp | 20 ++-----
be/src/runtime/user_function_cache.h | 1 -
be/src/vec/exec/vjdbc_connector.cpp | 29 ++++-----
be/src/vec/exec/vjdbc_connector.h | 2 +
.../org/apache/doris/jdbc/BaseJdbcExecutor.java | 70 +++++++++++++++++++++-
.../apache/doris/jdbc/JdbcDataSourceConfig.java | 10 ++++
.../org/apache/doris/catalog/JdbcResource.java | 7 ++-
.../org/apache/doris/catalog/JdbcResourceTest.java | 51 ++++++++++++++++
gensrc/thrift/Types.thrift | 1 +
9 files changed, 153 insertions(+), 38 deletions(-)
diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp
index ab9d90846ab..a5f354180b8 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -272,12 +272,10 @@ Status UserFunctionCache::_download_lib(const
std::string& url,
return Status::InternalError("fail to open file");
}
- std::string real_url = _get_real_url(url);
-
Md5Digest digest;
HttpClient client;
int64_t file_size = 0;
- RETURN_IF_ERROR(client.init(real_url));
+ RETURN_IF_ERROR(client.init(url));
Status status;
auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const
void* data,
size_t
length) {
@@ -297,11 +295,10 @@ Status UserFunctionCache::_download_lib(const
std::string& url,
digest.digest();
if (!iequal(digest.hex(), entry->checksum)) {
fmt::memory_buffer error_msg;
- fmt::format_to(
- error_msg,
- " The checksum is not equal of {} ({}). The init info of first
create entry is:"
- "{} But download file check_sum is: {}, file_size is: {}.",
- url, real_url, entry->debug_string(), digest.hex(), file_size);
+ fmt::format_to(error_msg,
+ " The checksum is not equal of {}. The init info of
first create entry is:"
+ "{} But download file check_sum is: {}, file_size is:
{}.",
+ url, entry->debug_string(), digest.hex(), file_size);
std::string error(fmt::to_string(error_msg));
LOG(WARNING) << error;
return Status::InternalError(error);
@@ -323,13 +320,6 @@ Status UserFunctionCache::_download_lib(const std::string&
url,
return Status::OK();
}
-std::string UserFunctionCache::_get_real_url(const std::string& url) {
- if (url.find(":/") == std::string::npos) {
- return "file://" + config::jdbc_drivers_dir + "/" + url;
- }
- return url;
-}
-
std::string UserFunctionCache::_get_file_name_from_url(const std::string& url)
const {
std::string file_name;
size_t last_slash_pos = url.find_last_of('/');
diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h
index 5d1bff8b866..93759c261e2 100644
--- a/be/src/runtime/user_function_cache.h
+++ b/be/src/runtime/user_function_cache.h
@@ -72,7 +72,6 @@ private:
const std::string& file_name);
void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry);
- std::string _get_real_url(const std::string& url);
std::string _get_file_name_from_url(const std::string& url) const;
std::vector<std::string> _split_string_by_checksum(const std::string&
file);
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 0fa33bfaad9..203e7934084 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -119,23 +119,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read)
{
// Add a scoped cleanup jni reference object. This cleans up local refs
made below.
JniLocalFrame jni_frame;
{
- std::string local_location;
- std::hash<std::string> hash_str;
- auto* function_cache = UserFunctionCache::instance();
- if (_conn_param.resource_name.empty()) {
- // for jdbcExternalTable, _conn_param.resource_name == ""
- // so, we use _conn_param.driver_path as key of jarpath
- SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
- RETURN_IF_ERROR(function_cache->get_jarpath(
- std::abs((int64_t)hash_str(_conn_param.driver_path)),
_conn_param.driver_path,
- _conn_param.driver_checksum, &local_location));
- } else {
- SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
- RETURN_IF_ERROR(function_cache->get_jarpath(
- std::abs((int64_t)hash_str(_conn_param.resource_name)),
_conn_param.driver_path,
- _conn_param.driver_checksum, &local_location));
- }
- VLOG_QUERY << "driver local path = " << local_location;
+ std::string driver_path = _get_real_url(_conn_param.driver_path);
TJdbcExecutorCtorParams ctor_params;
ctor_params.__set_statement(_sql_str);
@@ -144,7 +128,8 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
ctor_params.__set_jdbc_user(_conn_param.user);
ctor_params.__set_jdbc_password(_conn_param.passwd);
ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
- ctor_params.__set_driver_path(local_location);
+ ctor_params.__set_driver_path(driver_path);
+ ctor_params.__set_jdbc_driver_checksum(_conn_param.driver_checksum);
if (state == nullptr) {
ctor_params.__set_batch_size(read ? 1 : 0);
} else {
@@ -601,4 +586,12 @@ jobject JdbcConnector::_get_java_table_type(JNIEnv* env,
TOdbcTableType::type ta
env->CallStaticObjectMethod(enumClass, findByValueMethod,
static_cast<jint>(tableType));
return javaEnumObj;
}
+
+std::string JdbcConnector::_get_real_url(const std::string& url) {
+ if (url.find(":/") == std::string::npos) {
+ return "file://" + config::jdbc_drivers_dir + "/" + url;
+ }
+ return url;
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 954b0abfa78..c23dc11c865 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -134,6 +134,8 @@ private:
int rows);
jobject _get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType);
+ std::string _get_real_url(const std::string& url);
+
bool _closed = false;
jclass _executor_factory_clazz;
jclass _executor_clazz;
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
index 56b7865dbb1..383f1fe9aa1 100644
--- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
@@ -18,7 +18,6 @@
package org.apache.doris.jdbc;
import org.apache.doris.common.exception.InternalException;
-import org.apache.doris.common.jni.utils.UdfUtils;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValueConverter;
import org.apache.doris.common.jni.vec.VectorColumn;
@@ -27,7 +26,9 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams;
import org.apache.doris.thrift.TJdbcOperation;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.codec.binary.Hex;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -35,8 +36,15 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.semver4j.Semver;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Array;
import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLConnection;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
@@ -57,6 +65,7 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new
TBinaryProtocol.Factory();
private HikariDataSource hikariDataSource = null;
private final byte[] hikariDataSourceLock = new byte[0];
+ private ClassLoader classLoader = null;
private Connection conn = null;
protected JdbcDataSourceConfig config;
protected PreparedStatement preparedStatement = null;
@@ -68,6 +77,7 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
protected int batchSizeNum = 0;
protected int curBlockRows = 0;
protected String jdbcDriverVersion;
+ private static final Map<URL, ClassLoader> classLoaderMap =
Maps.newConcurrentMap();
public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
setJdbcDriverSystemProperties();
@@ -85,6 +95,7 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
.setJdbcUrl(request.jdbc_url)
.setJdbcDriverUrl(request.driver_path)
.setJdbcDriverClass(request.jdbc_driver_class)
+ .setJdbcDriverChecksum(request.jdbc_driver_checksum)
.setBatchSize(request.batch_size)
.setOp(request.op)
.setTableType(request.table_type)
@@ -298,8 +309,7 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
String hikariDataSourceKey = config.createCacheKey();
try {
- ClassLoader parent = getClass().getClassLoader();
- ClassLoader classLoader =
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
+ initializeClassLoader(config);
Thread.currentThread().setContextClassLoader(classLoader);
hikariDataSource =
JdbcDataSource.getDataSource().getSource(hikariDataSourceKey);
if (hikariDataSource == null) {
@@ -357,6 +367,60 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
}
}
+ private synchronized void initializeClassLoader(JdbcDataSourceConfig
config)
+ throws MalformedURLException, FileNotFoundException {
+ try {
+ URL[] urls = {new URL(config.getJdbcDriverUrl())};
+ if (classLoaderMap.containsKey(urls[0])) {
+ this.classLoader = classLoaderMap.get(urls[0]);
+ } else {
+ String expectedChecksum = config.getJdbcDriverChecksum();
+ String actualChecksum =
computeObjectChecksum(urls[0].toString(), null);
+ if (!expectedChecksum.equals(actualChecksum)) {
+ throw new RuntimeException("Checksum mismatch for JDBC
driver.");
+ }
+ ClassLoader parent = getClass().getClassLoader();
+ this.classLoader = URLClassLoader.newInstance(urls, parent);
+ classLoaderMap.put(urls[0], this.classLoader);
+ }
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Error loading JDBC driver.", e);
+ }
+ }
+
+ public static String computeObjectChecksum(String urlStr, String
encodedAuthInfo) {
+ try (InputStream inputStream = getInputStreamFromUrl(urlStr,
encodedAuthInfo, 10000, 10000)) {
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ byte[] buf = new byte[4096];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buf)) != -1) {
+ digest.update(buf, 0, bytesRead);
+ }
+ return Hex.encodeHexString(digest.digest());
+ } catch (IOException | NoSuchAlgorithmException e) {
+ throw new RuntimeException("Compute driver checksum from url: " +
urlStr
+ + " encountered an error: " + e.getMessage());
+ }
+ }
+
+ public static InputStream getInputStreamFromUrl(String urlStr, String
encodedAuthInfo, int connectTimeoutMs,
+ int readTimeoutMs) throws IOException {
+ try {
+ URL url = new URL(urlStr);
+ URLConnection conn = url.openConnection();
+
+ if (encodedAuthInfo != null) {
+ conn.setRequestProperty("Authorization", "Basic " +
encodedAuthInfo);
+ }
+
+ conn.setConnectTimeout(connectTimeoutMs);
+ conn.setReadTimeout(readTimeoutMs);
+ return conn.getInputStream();
+ } catch (Exception e) {
+ throw new IOException("Failed to open URL connection: " + urlStr,
e);
+ }
+ }
+
protected void setValidationQuery(HikariDataSource ds) {
ds.setConnectionTestQuery("SELECT 1");
}
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
index a99377add25..87bb9849fe0 100644
--- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
@@ -27,6 +27,7 @@ public class JdbcDataSourceConfig {
private String jdbcPassword;
private String jdbcDriverUrl;
private String jdbcDriverClass;
+ private String jdbcDriverChecksum;
private int batchSize;
private TJdbcOperation op;
private TOdbcTableType tableType;
@@ -96,6 +97,15 @@ public class JdbcDataSourceConfig {
return this;
}
+ public String getJdbcDriverChecksum() {
+ return jdbcDriverChecksum;
+ }
+
+ public JdbcDataSourceConfig setJdbcDriverChecksum(String
jdbcDriverChecksum) {
+ this.jdbcDriverChecksum = jdbcDriverChecksum;
+ return this;
+ }
+
public int getBatchSize() {
return batchSize;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index cd078b4376a..b5c61f908e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -277,6 +277,12 @@ public class JdbcResource extends Resource {
}
public static String getFullDriverUrl(String driverUrl) throws
IllegalArgumentException {
+ if (!(driverUrl.startsWith("file://") ||
driverUrl.startsWith("http://")
+ || driverUrl.startsWith("https://") ||
driverUrl.matches("^[^:/]+\\.jar$"))) {
+ throw new IllegalArgumentException("Invalid driver URL format.
Supported formats are: "
+ + "file://xxx.jar, http://xxx.jar, https://xxx.jar, or
xxx.jar (without prefix).");
+ }
+
try {
URI uri = new URI(driverUrl);
String schema = uri.getScheme();
@@ -481,4 +487,3 @@ public class JdbcResource extends Resource {
}
}
}
-
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
index 81c2157686a..3cd8f872ce9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
@@ -35,6 +35,7 @@ import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import java.util.Map;
@@ -216,4 +217,54 @@ public class JdbcResourceTest {
});
Assert.assertEquals("Driver URL does not match any allowed paths:
file:///postgresql-42.5.0.jar", exception.getMessage());
}
+
+ @Test
+ public void testValidDriverUrls() {
+ String fileUrl = "file://path/to/driver.jar";
+ Assertions.assertDoesNotThrow(() -> {
+ String result = JdbcResource.getFullDriverUrl(fileUrl);
+ Assert.assertEquals(fileUrl, result);
+ });
+
+ String httpUrl = "http://example.com/driver.jar";
+ Assertions.assertDoesNotThrow(() -> {
+ String result = JdbcResource.getFullDriverUrl(httpUrl);
+ Assert.assertEquals(httpUrl, result);
+ });
+
+ String httpsUrl = "https://example.com/driver.jar";
+ Assertions.assertDoesNotThrow(() -> {
+ String result = JdbcResource.getFullDriverUrl(httpsUrl);
+ Assert.assertEquals(httpsUrl, result);
+ });
+
+ String jarFile = "driver.jar";
+ Assertions.assertDoesNotThrow(() -> {
+ String result = JdbcResource.getFullDriverUrl(jarFile);
+ Assert.assertTrue(result.startsWith("file://"));
+ });
+ }
+
+ @Test
+ public void testInvalidDriverUrls() {
+ String invalidUrl1 = "/mnt/path/to/driver.jar";
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ JdbcResource.getFullDriverUrl(invalidUrl1);
+ });
+
+ String invalidUrl2 = "ftp://example.com/driver.jar";
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ JdbcResource.getFullDriverUrl(invalidUrl2);
+ });
+
+ String invalidUrl3 = "";
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ JdbcResource.getFullDriverUrl(invalidUrl3);
+ });
+
+ String invalidUrl4 = "example.com/driver";
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ JdbcResource.getFullDriverUrl(invalidUrl4);
+ });
+ }
}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 89a67fdba15..336988e4b88 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -436,6 +436,7 @@ struct TJdbcExecutorCtorParams {
14: optional i32 connection_pool_cache_clear_time
15: optional bool connection_pool_keep_alive
16: optional i64 catalog_id
+ 17: optional string jdbc_driver_checksum
}
struct TJavaUdfExecutorCtorParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]