This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7e7fcf1b337 [improvement](jdbc catalog) Optimize connection pool
parameter settings (#31177)
7e7fcf1b337 is described below
commit 7e7fcf1b3373026c622c590bccd7edb5533b56d1
Author: zy-kkk <[email protected]>
AuthorDate: Wed Feb 21 18:00:05 2024 +0800
[improvement](jdbc catalog) Optimize connection pool parameter settings
(#31177)
pick from (#30588)
---
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/runtime/descriptors.cpp | 27 +++-
be/src/runtime/descriptors.h | 12 ++
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 6 +
be/src/vec/exec/vjdbc_connector.cpp | 8 +
be/src/vec/exec/vjdbc_connector.h | 6 +
be/src/vec/sink/vjdbc_table_sink.cpp | 8 +
conf/be.conf | 4 +-
.../java/org/apache/doris/jdbc/JdbcDataSource.java | 51 ++++++-
.../apache/doris/jdbc/JdbcDataSourceConfig.java | 170 +++++++++++++++++++++
.../java/org/apache/doris/jdbc/JdbcExecutor.java | 116 +++++++-------
.../main/java/org/apache/doris/common/Config.java | 20 ---
.../org/apache/doris/catalog/JdbcResource.java | 100 ++++++++++--
.../java/org/apache/doris/catalog/JdbcTable.java | 54 +++++++
.../java/org/apache/doris/catalog/Resource.java | 2 +
.../java/org/apache/doris/catalog/ResourceMgr.java | 1 +
.../doris/catalog/external/JdbcExternalTable.java | 6 +
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 66 +++++---
.../doris/datasource/jdbc/client/JdbcClient.java | 37 ++---
.../datasource/jdbc/client/JdbcClientConfig.java | 72 +++++++++
.../doris/planner/external/jdbc/JdbcTableSink.java | 28 +---
.../org/apache/doris/catalog/JdbcResourceTest.java | 146 +++++++++++++++++-
.../datasource/jdbc/JdbcExternalCatalogTest.java | 46 ++++--
fe/pom.xml | 2 +-
gensrc/thrift/Descriptors.thrift | 7 +-
gensrc/thrift/Types.thrift | 8 +
27 files changed, 832 insertions(+), 177 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f881edaaba3..728f0e005d9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -763,6 +763,9 @@ DEFINE_mInt32(segment_compression_threshold_kb, "256");
// The connection timeout when connecting to external table such as odbc table.
DEFINE_mInt32(external_table_connect_timeout_sec, "30");
+// Time to clean up useless JDBC connection pool cache
+DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800");
+
// Global bitmap cache capacity for aggregation cache, size in bytes
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 822832cfeca..29d6d56fb80 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -817,6 +817,9 @@ DECLARE_mInt32(segment_compression_threshold_kb);
// The connection timeout when connecting to external table such as odbc table.
DECLARE_mInt32(external_table_connect_timeout_sec);
+// Time to clean up useless JDBC connection pool cache
+DECLARE_mInt32(jdbc_connection_pool_cache_clear_time_sec);
+
// Global bitmap cache capacity for aggregation cache, size in bytes
DECLARE_Int64(delete_bitmap_agg_cache_capacity);
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index f8125588ae5..a73f05f326b 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -251,6 +251,7 @@ std::string ODBCTableDescriptor::debug_string() const {
JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc),
+ _jdbc_catalog_id(tdesc.jdbcTable.catalog_id),
_jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name),
_jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url),
_jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class),
@@ -258,17 +259,27 @@ JdbcTableDescriptor::JdbcTableDescriptor(const
TTableDescriptor& tdesc)
_jdbc_url(tdesc.jdbcTable.jdbc_url),
_jdbc_table_name(tdesc.jdbcTable.jdbc_table_name),
_jdbc_user(tdesc.jdbcTable.jdbc_user),
- _jdbc_passwd(tdesc.jdbcTable.jdbc_password) {}
+ _jdbc_passwd(tdesc.jdbcTable.jdbc_password),
+ _connection_pool_min_size(tdesc.jdbcTable.connection_pool_min_size),
+ _connection_pool_max_size(tdesc.jdbcTable.connection_pool_max_size),
+
_connection_pool_max_wait_time(tdesc.jdbcTable.connection_pool_max_wait_time),
+
_connection_pool_max_life_time(tdesc.jdbcTable.connection_pool_max_life_time),
+
_connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive) {}
std::string JdbcTableDescriptor::debug_string() const {
fmt::memory_buffer buf;
- fmt::format_to(buf,
- "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} "
- ",_jdbc_driver_class={} ,_jdbc_driver_checksum={}
,_jdbc_url={} "
- ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={})",
- TableDescriptor::debug_string(), _jdbc_resource_name,
_jdbc_driver_url,
- _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url,
_jdbc_table_name,
- _jdbc_user, _jdbc_passwd);
+ fmt::format_to(
+ buf,
+ "JDBCTable({} ,_jdbc_catalog_id = {}, _jdbc_resource_name={}
,_jdbc_driver_url={} "
+ ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
+ ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={}
,_connection_pool_min_size={} "
+ ",_connection_pool_max_size={} ,_connection_pool_max_wait_time={} "
+ ",_connection_pool_max_life_time={}
,_connection_pool_keep_alive={})",
+ TableDescriptor::debug_string(), _jdbc_catalog_id,
_jdbc_resource_name,
+ _jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum,
_jdbc_url,
+ _jdbc_table_name, _jdbc_user, _jdbc_passwd,
_connection_pool_min_size,
+ _connection_pool_max_size, _connection_pool_max_wait_time,
+ _connection_pool_max_life_time, _connection_pool_keep_alive);
return fmt::to_string(buf);
}
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 6808523ab58..015e0e60607 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -307,6 +307,7 @@ class JdbcTableDescriptor : public TableDescriptor {
public:
JdbcTableDescriptor(const TTableDescriptor& tdesc);
std::string debug_string() const override;
+ int64_t jdbc_catalog_id() const { return _jdbc_catalog_id; }
const std::string& jdbc_resource_name() const { return
_jdbc_resource_name; }
const std::string& jdbc_driver_url() const { return _jdbc_driver_url; }
const std::string& jdbc_driver_class() const { return _jdbc_driver_class; }
@@ -315,8 +316,14 @@ public:
const std::string& jdbc_table_name() const { return _jdbc_table_name; }
const std::string& jdbc_user() const { return _jdbc_user; }
const std::string& jdbc_passwd() const { return _jdbc_passwd; }
+ int32_t connection_pool_min_size() const { return
_connection_pool_min_size; }
+ int32_t connection_pool_max_size() const { return
_connection_pool_max_size; }
+ int32_t connection_pool_max_wait_time() const { return
_connection_pool_max_wait_time; }
+ int32_t connection_pool_max_life_time() const { return
_connection_pool_max_life_time; }
+ bool connection_pool_keep_alive() const { return
_connection_pool_keep_alive; }
private:
+ int64_t _jdbc_catalog_id;
std::string _jdbc_resource_name;
std::string _jdbc_driver_url;
std::string _jdbc_driver_class;
@@ -325,6 +332,11 @@ private:
std::string _jdbc_table_name;
std::string _jdbc_user;
std::string _jdbc_passwd;
+ int32_t _connection_pool_min_size;
+ int32_t _connection_pool_max_size;
+ int32_t _connection_pool_max_wait_time;
+ int32_t _connection_pool_max_life_time;
+ bool _connection_pool_keep_alive;
};
class TupleDescriptor {
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 3d71aeb4f72..91ea57f0bbf 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -82,6 +82,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& con
if (jdbc_table == nullptr) {
return Status::InternalError("jdbc table pointer is NULL of
VJdbcScanNode::prepare.");
}
+ _jdbc_param.catalog_id = jdbc_table->jdbc_catalog_id();
_jdbc_param.driver_class = jdbc_table->jdbc_driver_class();
_jdbc_param.driver_path = jdbc_table->jdbc_driver_url();
_jdbc_param.resource_name = jdbc_table->jdbc_resource_name();
@@ -92,6 +93,11 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& con
_jdbc_param.tuple_desc = _tuple_desc;
_jdbc_param.query_string = std::move(_query_string);
_jdbc_param.table_type = _table_type;
+ _jdbc_param.connection_pool_min_size =
jdbc_table->connection_pool_min_size();
+ _jdbc_param.connection_pool_max_size =
jdbc_table->connection_pool_max_size();
+ _jdbc_param.connection_pool_max_life_time =
jdbc_table->connection_pool_max_life_time();
+ _jdbc_param.connection_pool_max_wait_time =
jdbc_table->connection_pool_max_wait_time();
+ _jdbc_param.connection_pool_keep_alive =
jdbc_table->connection_pool_keep_alive();
get_parent()->_scanner_profile->add_info_string("JdbcDriverClass",
_jdbc_param.driver_class);
get_parent()->_scanner_profile->add_info_string("JdbcDriverUrl",
_jdbc_param.driver_path);
diff --git a/be/src/vec/exec/vjdbc_connector.cpp
b/be/src/vec/exec/vjdbc_connector.cpp
index 20fcfd196b0..666a06531d2 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -150,6 +150,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
TJdbcExecutorCtorParams ctor_params;
ctor_params.__set_statement(_sql_str);
+ ctor_params.__set_catalog_id(_conn_param.catalog_id);
ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
ctor_params.__set_jdbc_user(_conn_param.user);
ctor_params.__set_jdbc_password(_conn_param.passwd);
@@ -158,6 +159,13 @@ Status JdbcConnector::open(RuntimeState* state, bool read)
{
ctor_params.__set_batch_size(read ? state->batch_size() : 0);
ctor_params.__set_op(read ? TJdbcOperation::READ :
TJdbcOperation::WRITE);
ctor_params.__set_table_type(_conn_param.table_type);
+
ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
+
ctor_params.__set_connection_pool_max_size(_conn_param.connection_pool_max_size);
+
ctor_params.__set_connection_pool_max_wait_time(_conn_param.connection_pool_max_wait_time);
+
ctor_params.__set_connection_pool_max_life_time(_conn_param.connection_pool_max_life_time);
+ ctor_params.__set_connection_pool_cache_clear_time(
+ config::jdbc_connection_pool_cache_clear_time_sec);
+
ctor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive);
jbyteArray ctor_params_bytes;
// Pushed frame will be popped when jni_frame goes out-of-scope.
diff --git a/be/src/vec/exec/vjdbc_connector.h
b/be/src/vec/exec/vjdbc_connector.h
index 361705ac4c2..55b39f0aec2 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -43,6 +43,7 @@ class IColumn;
class VExprContext;
struct JdbcConnectorParam {
+ int64_t catalog_id;
std::string driver_path;
std::string driver_class;
std::string resource_name;
@@ -52,6 +53,11 @@ struct JdbcConnectorParam {
std::string passwd;
std::string query_string;
TOdbcTableType::type table_type;
+ int32_t connection_pool_min_size;
+ int32_t connection_pool_max_size;
+ int32_t connection_pool_max_wait_time;
+ int32_t connection_pool_max_life_time;
+ bool connection_pool_keep_alive;
const TupleDescriptor* tuple_desc;
};
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp
b/be/src/vec/sink/vjdbc_table_sink.cpp
index e2eab4d7b0c..2663dc236a1 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -47,6 +47,7 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(VTableSink::init(t_sink));
const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
+ _jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id;
_jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url;
_jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user;
_jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password;
@@ -56,6 +57,13 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
_jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
_jdbc_param.table_type = t_jdbc_sink.table_type;
_jdbc_param.query_string = t_jdbc_sink.insert_sql;
+ _jdbc_param.connection_pool_min_size =
t_jdbc_sink.jdbc_table.connection_pool_min_size;
+ _jdbc_param.connection_pool_max_size =
t_jdbc_sink.jdbc_table.connection_pool_max_size;
+ _jdbc_param.connection_pool_max_wait_time =
+ t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
+ _jdbc_param.connection_pool_max_life_time =
+ t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
+ _jdbc_param.connection_pool_keep_alive =
t_jdbc_sink.jdbc_table.connection_pool_keep_alive;
_table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
_use_transaction = t_jdbc_sink.use_transaction;
diff --git a/conf/be.conf b/conf/be.conf
index 9c877901d59..15121111646 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -19,10 +19,10 @@ CUR_DATE=`date +%Y%m%d-%H%M%S`
PPROF_TMPDIR="$DORIS_HOME/log/"
-JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE
-XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100
-DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000 -DJDBC_KEEP_ALIVE=FALSE"
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE
-XX:-CriticalJNINatives"
# For jdk 9+, this JAVA_OPTS will be used as default JVM options
-JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE
-XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100
-DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000 -DJDBC_KEEP_ALIVE=FALSE"
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE
-XX:-CriticalJNINatives"
# since 1.2, the JAVA_HOME need to be set to run BE process.
# JAVA_HOME=/path/to/jdk/
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
index 6a6a022d29c..3c8ac38cf7d 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
@@ -18,32 +18,77 @@
package org.apache.doris.jdbc;
import com.alibaba.druid.pool.DruidDataSource;
+import org.apache.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
public class JdbcDataSource {
+ private static final Logger LOG = Logger.getLogger(JdbcDataSource.class);
private static final JdbcDataSource jdbcDataSource = new JdbcDataSource();
private final Map<String, DruidDataSource> sourcesMap = new
ConcurrentHashMap<>();
+ private final Map<String, Long> lastAccessTimeMap = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ private long cleanupInterval = 8 * 60 * 60 * 1000; // 8 hours
+ private ScheduledFuture<?> cleanupTask = null;
+
+ private JdbcDataSource() {
+ startCleanupTask();
+ }
public static JdbcDataSource getDataSource() {
return jdbcDataSource;
}
public DruidDataSource getSource(String cacheKey) {
+ lastAccessTimeMap.put(cacheKey, System.currentTimeMillis());
return sourcesMap.get(cacheKey);
}
public void putSource(String cacheKey, DruidDataSource ds) {
sourcesMap.put(cacheKey, ds);
+ lastAccessTimeMap.put(cacheKey, System.currentTimeMillis());
}
public Map<String, DruidDataSource> getSourcesMap() {
return sourcesMap;
}
- public String createCacheKey(String jdbcUrl, String jdbcUser, String
jdbcPassword, String jdbcDriverUrl,
- String jdbcDriverClass) {
- return jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl +
jdbcDriverClass;
+ public void setCleanupInterval(long interval) {
+ this.cleanupInterval = interval * 1000L;
+ restartCleanupTask();
+ }
+
+ private synchronized void restartCleanupTask() {
+ if (cleanupTask != null && !cleanupTask.isCancelled()) {
+ cleanupTask.cancel(false);
+ }
+ cleanupTask = executor.scheduleAtFixedRate(() -> {
+ try {
+ long now = System.currentTimeMillis();
+ lastAccessTimeMap.forEach((key, lastAccessTime) -> {
+ if (now - lastAccessTime > cleanupInterval) {
+ DruidDataSource ds = sourcesMap.remove(key);
+ if (ds != null) {
+ ds.close();
+ }
+ lastAccessTimeMap.remove(key);
+ LOG.info("remove jdbc data source: " +
key.split("jdbc")[0]);
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("failed to cleanup jdbc data source", e);
+ }
+ }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
+ }
+
+ private void startCleanupTask() {
+ if (cleanupTask == null || cleanupTask.isCancelled()) {
+ restartCleanupTask();
+ }
}
}
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
new file mode 100644
index 00000000000..dcf576986fe
--- /dev/null
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.thrift.TJdbcOperation;
+import org.apache.doris.thrift.TOdbcTableType;
+
+public class JdbcDataSourceConfig {
+ private Long catalogId;
+ private String jdbcUrl;
+ private String jdbcUser;
+ private String jdbcPassword;
+ private String jdbcDriverUrl;
+ private String jdbcDriverClass;
+ private int batchSize;
+ private TJdbcOperation op;
+ private TOdbcTableType tableType;
+ private int connectionPoolMinSize;
+ private int connectionPoolMaxSize;
+ private int connectionPoolMaxWaitTime;
+ private int connectionPoolMaxLifeTime;
+ private boolean connectionPoolKeepAlive;
+
+ public String createCacheKey() {
+ return catalogId + jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl +
jdbcDriverClass
+ + connectionPoolMinSize + connectionPoolMaxSize +
connectionPoolMaxLifeTime + connectionPoolMaxWaitTime
+ + connectionPoolKeepAlive;
+ }
+
+ public long getCatalogId() {
+ return catalogId;
+ }
+
+ public JdbcDataSourceConfig setCatalogId(long catalogId) {
+ this.catalogId = catalogId;
+ return this;
+ }
+
+ public String getJdbcUrl() {
+ return jdbcUrl;
+ }
+
+ public JdbcDataSourceConfig setJdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ return this;
+ }
+
+ public String getJdbcUser() {
+ return jdbcUser;
+ }
+
+ public JdbcDataSourceConfig setJdbcUser(String jdbcUser) {
+ this.jdbcUser = jdbcUser;
+ return this;
+ }
+
+ public String getJdbcPassword() {
+ return jdbcPassword;
+ }
+
+ public JdbcDataSourceConfig setJdbcPassword(String jdbcPassword) {
+ this.jdbcPassword = jdbcPassword;
+ return this;
+ }
+
+ public String getJdbcDriverUrl() {
+ return jdbcDriverUrl;
+ }
+
+ public JdbcDataSourceConfig setJdbcDriverUrl(String jdbcDriverUrl) {
+ this.jdbcDriverUrl = jdbcDriverUrl;
+ return this;
+ }
+
+ public String getJdbcDriverClass() {
+ return jdbcDriverClass;
+ }
+
+ public JdbcDataSourceConfig setJdbcDriverClass(String jdbcDriverClass) {
+ this.jdbcDriverClass = jdbcDriverClass;
+ return this;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public JdbcDataSourceConfig setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public TJdbcOperation getOp() {
+ return op;
+ }
+
+ public JdbcDataSourceConfig setOp(TJdbcOperation op) {
+ this.op = op;
+ return this;
+ }
+
+ public TOdbcTableType getTableType() {
+ return tableType;
+ }
+
+ public JdbcDataSourceConfig setTableType(TOdbcTableType tableType) {
+ this.tableType = tableType;
+ return this;
+ }
+
+ public int getConnectionPoolMinSize() {
+ return connectionPoolMinSize;
+ }
+
+ public JdbcDataSourceConfig setConnectionPoolMinSize(int
connectionPoolMinSize) {
+ this.connectionPoolMinSize = connectionPoolMinSize;
+ return this;
+ }
+
+ public int getConnectionPoolMaxSize() {
+ return connectionPoolMaxSize;
+ }
+
+ public JdbcDataSourceConfig setConnectionPoolMaxSize(int
connectionPoolMaxSize) {
+ this.connectionPoolMaxSize = connectionPoolMaxSize;
+ return this;
+ }
+
+ public int getConnectionPoolMaxWaitTime() {
+ return connectionPoolMaxWaitTime;
+ }
+
+ public JdbcDataSourceConfig setConnectionPoolMaxWaitTime(int
connectionPoolMaxWaitTime) {
+ this.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime;
+ return this;
+ }
+
+ public int getConnectionPoolMaxLifeTime() {
+ return connectionPoolMaxLifeTime;
+ }
+
+ public JdbcDataSourceConfig setConnectionPoolMaxLifeTime(int
connectionPoolMaxLifeTime) {
+ this.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime;
+ return this;
+ }
+
+ public boolean isConnectionPoolKeepAlive() {
+ return connectionPoolKeepAlive;
+ }
+
+ public JdbcDataSourceConfig setConnectionPoolKeepAlive(boolean
connectionPoolKeepAlive) {
+ this.connectionPoolKeepAlive = connectionPoolKeepAlive;
+ return this;
+ }
+}
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
index 2cd878dfc0d..dd6307ffe1a 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
@@ -85,14 +85,9 @@ public class JdbcExecutor {
private int curBlockRows = 0;
private static final byte[] emptyBytes = new byte[0];
private DruidDataSource druidDataSource = null;
- private byte[] druidDataSourceLock = new byte[0];
- private int minPoolSize;
- private int maxPoolSize;
- private int minIdleSize;
- private int maxIdleTime;
- private int maxWaitTime;
- private boolean isKeepAlive;
+ private final byte[] druidDataSourceLock = new byte[0];
private TOdbcTableType tableType;
+ private JdbcDataSourceConfig config;
public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -103,20 +98,23 @@ public class JdbcExecutor {
throw new InternalException(e.getMessage());
}
tableType = request.table_type;
- minPoolSize = Integer.valueOf(System.getProperty("JDBC_MIN_POOL",
"1"));
- maxPoolSize = Integer.valueOf(System.getProperty("JDBC_MAX_POOL",
"100"));
- maxIdleTime = Integer.valueOf(System.getProperty("JDBC_MAX_IDLE_TIME",
"300000"));
- maxWaitTime = Integer.valueOf(System.getProperty("JDBC_MAX_WAIT_TIME",
"5000"));
- isKeepAlive = Boolean.valueOf(System.getProperty("JDBC_KEEP_ALIVE",
"false"));
- minIdleSize = minPoolSize > 0 ? 1 : 0;
- LOG.info("JdbcExecutor set minPoolSize = " + minPoolSize
- + ", maxPoolSize = " + maxPoolSize
- + ", maxIdleTime = " + maxIdleTime
- + ", maxWaitTime = " + maxWaitTime
- + ", minIdleSize = " + minIdleSize
- + ", isKeepAlive = " + isKeepAlive);
- init(request.driver_path, request.statement, request.batch_size,
request.jdbc_driver_class,
- request.jdbc_url, request.jdbc_user, request.jdbc_password,
request.op, request.table_type);
+ this.config = new JdbcDataSourceConfig()
+ .setCatalogId(request.catalog_id)
+ .setJdbcUser(request.jdbc_user)
+ .setJdbcPassword(request.jdbc_password)
+ .setJdbcUrl(request.jdbc_url)
+ .setJdbcDriverUrl(request.driver_path)
+ .setJdbcDriverClass(request.jdbc_driver_class)
+ .setBatchSize(request.batch_size)
+ .setOp(request.op)
+ .setTableType(request.table_type)
+ .setConnectionPoolMinSize(request.connection_pool_min_size)
+ .setConnectionPoolMaxSize(request.connection_pool_max_size)
+
.setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time)
+
.setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time)
+
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive);
+
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
+ init(config, request.statement);
}
public boolean isNebula() {
@@ -155,11 +153,11 @@ public class JdbcExecutor {
}
}
- if (minIdleSize == 0) {
+ if (config.getConnectionPoolMinSize() == 0) {
// Close and remove the datasource if necessary
if (druidDataSource != null) {
druidDataSource.close();
- JdbcDataSource.getDataSource().getSourcesMap().clear();
+
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
druidDataSource = null;
}
}
@@ -441,19 +439,18 @@ public class JdbcExecutor {
}
}
- private void init(String driverUrl, String sql, int batchSize, String
driverClass, String jdbcUrl, String jdbcUser,
- String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType)
throws UdfRuntimeException {
- String druidDataSourceKey =
JdbcDataSource.getDataSource().createCacheKey(jdbcUrl, jdbcUser, jdbcPassword,
- driverUrl, driverClass);
+ private void init(JdbcDataSourceConfig config, String sql) throws
UdfRuntimeException {
+ String druidDataSourceKey = config.createCacheKey();
try {
if (isNebula()) {
- batchSizeNum = batchSize;
- Class.forName(driverClass);
- conn = DriverManager.getConnection(jdbcUrl, jdbcUser,
jdbcPassword);
+ batchSizeNum = config.getBatchSize();
+ Class.forName(config.getJdbcDriverClass());
+ conn =
DriverManager.getConnection(config.getJdbcDriverClass(), config.getJdbcUser(),
+ config.getJdbcPassword());
stmt = conn.prepareStatement(sql);
} else {
ClassLoader parent = getClass().getClassLoader();
- ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl,
parent);
+ ClassLoader classLoader =
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
druidDataSource =
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
if (druidDataSource == null) {
synchronized (druidDataSourceLock) {
@@ -462,53 +459,61 @@ public class JdbcExecutor {
long start = System.currentTimeMillis();
DruidDataSource ds = new DruidDataSource();
ds.setDriverClassLoader(classLoader);
- ds.setDriverClassName(driverClass);
- ds.setUrl(jdbcUrl);
- ds.setUsername(jdbcUser);
- ds.setPassword(jdbcPassword);
- ds.setMinIdle(minIdleSize);
- ds.setInitialSize(minPoolSize);
- ds.setMaxActive(maxPoolSize);
- ds.setMaxWait(maxWaitTime);
+ ds.setDriverClassName(config.getJdbcDriverClass());
+ ds.setUrl(config.getJdbcUrl());
+ ds.setUsername(config.getJdbcUser());
+ ds.setPassword(config.getJdbcPassword());
+ ds.setMinIdle(config.getConnectionPoolMinSize());
// default 1
+
ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1
+
ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10
+
ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000
ds.setTestWhileIdle(true);
ds.setTestOnBorrow(false);
- setValidationQuery(ds, tableType);
- ds.setTimeBetweenEvictionRunsMillis(maxIdleTime /
5);
- ds.setMinEvictableIdleTimeMillis(maxIdleTime);
- ds.setKeepAlive(isKeepAlive);
+ setValidationQuery(ds, config.getTableType());
+ // default 3 min
+
ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() /
10L);
+ // default 15 min
+
ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L);
+ // default 30 min
+
ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
+
ds.setKeepAlive(config.isConnectionPoolKeepAlive());
+ // default 6 min
+
ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L);
druidDataSource = ds;
- // here is a cache of datasource, which using the
string(jdbcUrl + jdbcUser +
- // jdbcPassword) as key.
- // and the default datasource init = 1, min = 1,
max = 100, if one of connection idle
- // time greater than 10 minutes. then connection
will be retrieved.
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds);
- LOG.info("init datasource [" + (jdbcUrl +
jdbcUser) + "] cost: " + (
+ LOG.info("JdbcClient set"
+ + " ConnectionPoolMinSize = " +
config.getConnectionPoolMinSize()
+ + ", ConnectionPoolMaxSize = " +
config.getConnectionPoolMaxSize()
+ + ", ConnectionPoolMaxWaitTime = " +
config.getConnectionPoolMaxWaitTime()
+ + ", ConnectionPoolMaxLifeTime = " +
config.getConnectionPoolMaxLifeTime()
+ + ", ConnectionPoolKeepAlive = " +
config.isConnectionPoolKeepAlive());
+ LOG.info("init datasource [" +
(config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
System.currentTimeMillis() - start) + "
ms");
}
}
}
-
long start = System.currentTimeMillis();
conn = druidDataSource.getConnection();
- LOG.info("get connection [" + (jdbcUrl + jdbcUser) + "] cost:
" + (System.currentTimeMillis() - start)
- + " ms");
- if (op == TJdbcOperation.READ) {
+ LOG.info("get connection [" + (config.getJdbcUrl() +
config.getJdbcUser()) + "] cost: " + (
+ System.currentTimeMillis() - start)
+ + " ms");
+ if (config.getOp() == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
stmt = conn.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (tableType == TOdbcTableType.MYSQL) {
stmt.setFetchSize(Integer.MIN_VALUE);
} else {
- stmt.setFetchSize(batchSize);
+ stmt.setFetchSize(config.getBatchSize());
}
- batchSizeNum = batchSize;
+ batchSizeNum = config.getBatchSize();
} else {
LOG.info("insert sql: " + sql);
preparedStatement = conn.prepareStatement(sql);
}
}
} catch (MalformedURLException e) {
- throw new UdfRuntimeException("MalformedURLException to load class
about " + driverUrl, e);
+ throw new UdfRuntimeException("MalformedURLException to load class
about " + config.getJdbcDriverUrl(), e);
} catch (SQLException e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
} catch (FileNotFoundException e) {
@@ -2213,4 +2218,3 @@ public class JdbcExecutor {
return i;
}
}
-
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index eacefe71549..7dda751e8f6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -140,26 +140,6 @@ public class Config extends ConfigBase {
+ "if the specified driver file path is not an absolute
path, Doris will find jars from this path"})
public static String jdbc_drivers_dir = System.getenv("DORIS_HOME") +
"/jdbc_drivers";
- @ConfField(description = {"JDBC Catalog FE 连接池的最小连接数",
- "The minimum number of JDBC Catalog FE connection pool"})
- public static int jdbc_min_pool_size = 1;
-
- @ConfField(description = {"JDBC Catalog FE 连接池的最大连接数",
- "The maximum number of JDBC Catalog FE connection pool"})
- public static int jdbc_max_pool_size = 100;
-
- @ConfField(description = {"JDBC Catalog FE 连接池的最大空闲连接时间",
- "The maximum idle time of JDBC Catalog FE connection pool"})
- public static int jdbc_max_idle_time = 300000;
-
- @ConfField(description = {"JDBC Catalog FE 连接池的最大等待时间",
- "The maximum wait time of JDBC Catalog FE connection pool"})
- public static int jdbc_max_wait_time = 5000;
-
- @ConfField(description = {"JDBC Catalog FE 连接池的 KeepAlive 策略",
- "The keep alive strategy of JDBC Catalog FE connection pool"})
- public static boolean jdbc_keep_alive = false;
-
@ConfField(mutable = true, masterOnly = true, description = {"broker load
时,单个节点上 load 执行计划的默认并行度",
"The default parallelism of the load execution plan on a single
node when the broker load is submitted"})
public static int default_load_parallelism = 1;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index 999b5c4d34f..c5b66e13f48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -40,12 +40,14 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Map;
/**
* External JDBC Catalog resource for external table query.
- *
+ * <p>
* create external resource jdbc_mysql
* properties (
* "type"="jdbc",
@@ -55,7 +57,7 @@ import java.util.Map;
* "driver_url"="http://127.0.0.1:8888/mysql-connector-java-5.1.47.jar",
* "driver_class"="com.mysql.jdbc.Driver"
* );
- *
+ * <p>
* DROP RESOURCE "jdbc_mysql";
*/
public class JdbcResource extends Resource {
@@ -94,7 +96,13 @@ public class JdbcResource extends Resource {
public static final String TYPE = "type";
public static final String ONLY_SPECIFIED_DATABASE =
"only_specified_database";
public static final String LOWER_CASE_TABLE_NAMES =
"lower_case_table_names";
+ public static final String CONNECTION_POOL_MIN_SIZE =
"connection_pool_min_size";
+ public static final String CONNECTION_POOL_MAX_SIZE =
"connection_pool_max_size";
+ public static final String CONNECTION_POOL_MAX_WAIT_TIME =
"connection_pool_max_wait_time";
+ public static final String CONNECTION_POOL_MAX_LIFE_TIME =
"connection_pool_max_life_time";
+ public static final String CONNECTION_POOL_KEEP_ALIVE =
"connection_pool_keep_alive";
public static final String CHECK_SUM = "checksum";
+ public static final String CREATE_TIME = "create_time";
private static final ImmutableList<String> ALL_PROPERTIES = new
ImmutableList.Builder<String>().add(
JDBC_URL,
USER,
@@ -102,16 +110,27 @@ public class JdbcResource extends Resource {
DRIVER_CLASS,
DRIVER_URL,
TYPE,
+ CREATE_TIME,
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES,
INCLUDE_DATABASE_LIST,
- EXCLUDE_DATABASE_LIST
+ EXCLUDE_DATABASE_LIST,
+ CONNECTION_POOL_MIN_SIZE,
+ CONNECTION_POOL_MAX_SIZE,
+ CONNECTION_POOL_MAX_LIFE_TIME,
+ CONNECTION_POOL_MAX_WAIT_TIME,
+ CONNECTION_POOL_KEEP_ALIVE
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new
ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES,
INCLUDE_DATABASE_LIST,
- EXCLUDE_DATABASE_LIST
+ EXCLUDE_DATABASE_LIST,
+ CONNECTION_POOL_MIN_SIZE,
+ CONNECTION_POOL_MAX_SIZE,
+ CONNECTION_POOL_MAX_LIFE_TIME,
+ CONNECTION_POOL_MAX_WAIT_TIME,
+ CONNECTION_POOL_KEEP_ALIVE
).build();
// The default value of optional properties
@@ -123,6 +142,11 @@ public class JdbcResource extends Resource {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
+ OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1");
+ OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "10");
+ OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME,
"1800000");
+ OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME,
"5000");
+ OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE,
"false");
}
// timeout for both connection and read. 10 seconds is long enough.
@@ -138,7 +162,7 @@ public class JdbcResource extends Resource {
this(name, Maps.newHashMap());
}
- private JdbcResource(String name, Map<String, String> configs) {
+ public JdbcResource(String name, Map<String, String> configs) {
super(name, ResourceType.JDBC);
this.configs = configs;
}
@@ -168,13 +192,11 @@ public class JdbcResource extends Resource {
@Override
protected void setProperties(Map<String, String> properties) throws
DdlException {
Preconditions.checkState(properties != null);
- for (String key : properties.keySet()) {
- if (!ALL_PROPERTIES.contains(key)) {
- throw new DdlException("JDBC resource Property of " + key + "
is unknown");
- }
- }
+ validateProperties(properties);
configs = properties;
- handleOptionalArguments();
+ applyDefaultProperties();
+ String currentDateTime =
LocalDateTime.now(ZoneId.systemDefault()).toString().replace("T", " ");
+ configs.put(CREATE_TIME, currentDateTime);
// check properties
for (String property : ALL_PROPERTIES) {
String value = configs.get(property);
@@ -190,7 +212,9 @@ public class JdbcResource extends Resource {
* This function used to handle optional arguments
* eg: only_specified_database、lower_case_table_names
*/
- private void handleOptionalArguments() {
+
+ @Override
+ public void applyDefaultProperties() {
for (String s : OPTIONAL_PROPERTIES) {
if (!configs.containsKey(s)) {
configs.put(s, OPTIONAL_PROPERTIES_DEFAULT_VALUE.get(s));
@@ -384,4 +408,56 @@ public class JdbcResource extends Resource {
}
}
+ public static String getDefaultPropertyValue(String propertyName) {
+ return OPTIONAL_PROPERTIES_DEFAULT_VALUE.getOrDefault(propertyName,
"");
+ }
+
+ public static void validateProperties(Map<String, String> properties)
throws DdlException {
+ for (String key : properties.keySet()) {
+ if (!ALL_PROPERTIES.contains(key)) {
+ throw new DdlException("JDBC resource Property of " + key + "
is unknown");
+ }
+ }
+ }
+
+ public static void checkBooleanProperty(String propertyName, String
propertyValue) throws DdlException {
+ if (!propertyValue.equalsIgnoreCase("true") &&
!propertyValue.equalsIgnoreCase("false")) {
+ throw new DdlException(propertyName + " must be true or false");
+ }
+ }
+
+ public static void checkDatabaseListProperties(String
onlySpecifiedDatabase,
+ Map<String, Boolean> includeDatabaseList, Map<String, Boolean>
excludeDatabaseList) throws DdlException {
+ if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) {
+ if ((includeDatabaseList != null &&
!includeDatabaseList.isEmpty()) || (excludeDatabaseList != null
+ && !excludeDatabaseList.isEmpty())) {
+ throw new DdlException(
+ "include_database_list and exclude_database_list "
+ + "cannot be set when only_specified_database
is false");
+ }
+ }
+ }
+
+ public static void checkConnectionPoolProperties(int minSize, int maxSize,
int maxWaitTime, int maxLifeTime)
+ throws DdlException {
+ if (minSize < 0) {
+ throw new DdlException("connection_pool_min_size must be greater
than or equal to 0");
+ }
+ if (maxSize < 1) {
+ throw new DdlException("connection_pool_max_size must be greater
than or equal to 1");
+ }
+ if (maxSize < minSize) {
+ throw new DdlException(
+ "connection_pool_max_size must be greater than or equal to
connection_pool_min_size");
+ }
+ if (maxWaitTime < 0) {
+ throw new DdlException("connection_pool_max_wait_time must be
greater than or equal to 0");
+ }
+ if (maxWaitTime > 30000) {
+ throw new DdlException("connection_pool_max_wait_time must be less
than or equal to 30000");
+ }
+ if (maxLifeTime < 150000) {
+ throw new DdlException("connection_pool_max_life_time must be
greater than or equal to 150000");
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
index 795f00e7bfe..511d67c91b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
public class JdbcTable extends Table {
private static final Logger LOG = LogManager.getLogger(JdbcTable.class);
+ private static final String CATALOG_ID = "catalog_id";
private static final String TABLE = "table";
private static final String REAL_DATABASE = "real_database";
private static final String REAL_TABLE = "real_table";
@@ -75,6 +76,14 @@ public class JdbcTable extends Table {
private String driverUrl;
private String checkSum;
+ private long catalogId = -1;
+
+ private int connectionPoolMinSize;
+ private int connectionPoolMaxSize;
+ private int connectionPoolMaxWaitTime;
+ private int connectionPoolMaxLifeTime;
+ private boolean connectionPoolKeepAlive;
+
static {
Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
tempMap.put("nebula", TOdbcTableType.NEBULA);
@@ -157,6 +166,35 @@ public class JdbcTable extends Table {
return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_URL,
driverUrl);
}
+ public long getCatalogId() {
+ return catalogId;
+ }
+
+ public int getConnectionPoolMinSize() {
+ return
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE,
+ String.valueOf(connectionPoolMinSize)));
+ }
+
+ public int getConnectionPoolMaxSize() {
+ return
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE,
+ String.valueOf(connectionPoolMaxSize)));
+ }
+
+ public int getConnectionPoolMaxWaitTime() {
+ return
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME,
+ String.valueOf(connectionPoolMaxWaitTime)));
+ }
+
+ public int getConnectionPoolMaxLifeTime() {
+ return
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME,
+ String.valueOf(connectionPoolMaxLifeTime)));
+ }
+
+ public boolean isConnectionPoolKeepAlive() {
+ return
Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
+ String.valueOf(connectionPoolKeepAlive)));
+ }
+
private String getFromJdbcResourceOrDefault(String key, String defaultVal)
{
if (Strings.isNullOrEmpty(resourceName)) {
return defaultVal;
@@ -171,6 +209,7 @@ public class JdbcTable extends Table {
@Override
public TTableDescriptor toThrift() {
TJdbcTable tJdbcTable = new TJdbcTable();
+ tJdbcTable.setCatalogId(catalogId);
tJdbcTable.setJdbcUrl(getJdbcUrl());
tJdbcTable.setJdbcUser(getJdbcUser());
tJdbcTable.setJdbcPassword(getJdbcPasswd());
@@ -179,6 +218,11 @@ public class JdbcTable extends Table {
tJdbcTable.setJdbcDriverUrl(getDriverUrl());
tJdbcTable.setJdbcResourceName(resourceName);
tJdbcTable.setJdbcDriverChecksum(checkSum);
+ tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize());
+ tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize());
+
tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime());
+
tJdbcTable.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime());
+ tJdbcTable.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.JDBC_TABLE, fullSchema.size(), 0,
getName(), "");
tTableDescriptor.setJdbcTable(tJdbcTable);
@@ -189,6 +233,7 @@ public class JdbcTable extends Table {
public void write(DataOutput out) throws IOException {
super.write(out);
Map<String, String> serializeMap = Maps.newHashMap();
+ serializeMap.put(CATALOG_ID, String.valueOf(catalogId));
serializeMap.put(TABLE, externalTableName);
serializeMap.put(RESOURCE, resourceName);
serializeMap.put(TABLE_TYPE, jdbcTypeName);
@@ -225,6 +270,7 @@ public class JdbcTable extends Table {
String value = Text.readString(in);
serializeMap.put(key, value);
}
+ catalogId = serializeMap.get(CATALOG_ID) != null ?
Long.parseLong(serializeMap.get(CATALOG_ID)) : -1;
externalTableName = serializeMap.get(TABLE);
resourceName = serializeMap.get(RESOURCE);
jdbcTypeName = serializeMap.get(TABLE_TYPE);
@@ -340,6 +386,14 @@ public class JdbcTable extends Table {
driverClass = jdbcResource.getProperty(DRIVER_CLASS);
driverUrl = jdbcResource.getProperty(DRIVER_URL);
checkSum = jdbcResource.getProperty(CHECK_SUM);
+ connectionPoolMinSize =
Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE));
+ connectionPoolMaxSize =
Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_SIZE));
+ connectionPoolMaxWaitTime = Integer.parseInt(
+
jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME));
+ connectionPoolMaxLifeTime = Integer.parseInt(
+
jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME));
+ connectionPoolKeepAlive = Boolean.parseBoolean(
+
jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE));
String urlType = jdbcUrl.split(":")[1];
if (!jdbcTypeName.equalsIgnoreCase(urlType)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index 781c5fb3f61..1a51d42f57a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -296,4 +296,6 @@ public abstract class Resource implements Writable,
GsonPostProcessable {
}
});
}
+
+ public void applyDefaultProperties() {}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 5696f1cd258..5718f8af07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -95,6 +95,7 @@ public class ResourceMgr implements Writable {
}
public void replayCreateResource(Resource resource) {
+ resource.applyDefaultProperties();
nameToResource.put(resource.getName(), resource);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
index a02c59080fc..05023591bfd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
@@ -87,6 +87,7 @@ public class JdbcExternalTable extends ExternalTable {
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
String fullDbName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema,
TableType.JDBC_EXTERNAL_TABLE);
+ jdbcTable.setCatalogId(jdbcCatalog.getId());
jdbcTable.setExternalTableName(fullDbName);
jdbcTable.setRealDatabaseName(((JdbcExternalCatalog)
catalog).getJdbcClient().getRealDatabaseName(this.dbName));
jdbcTable.setRealTableName(
@@ -99,6 +100,11 @@ public class JdbcExternalTable extends ExternalTable {
jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
jdbcTable.setResourceName(jdbcCatalog.getResource());
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
+
jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize());
+
jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize());
+
jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime());
+
jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime());
+
jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive());
return jdbcTable;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index daeb5feed73..c8dfb7dfe4d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -65,6 +65,17 @@ public class JdbcExternalCatalog extends ExternalCatalog {
throw new DdlException("Required property '" +
requiredProperty + "' is missing");
}
}
+
+ Map<String, String> propertiesWithoutCheckSum =
Maps.newHashMap(catalogProperty.getProperties());
+ propertiesWithoutCheckSum.remove(JdbcResource.CHECK_SUM);
+ JdbcResource.validateProperties(propertiesWithoutCheckSum);
+
+
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
getOnlySpecifiedDatabase());
+ JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES,
getLowerCaseTableNames());
+ JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(),
getIncludeDatabaseMap(),
+ getExcludeDatabaseMap());
+ JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(),
getConnectionPoolMaxSize(),
+ getConnectionPoolMaxWaitTime(),
getConnectionPoolMaxLifeTime());
}
@Override
@@ -130,7 +141,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
}
public String getOnlySpecifiedDatabase() {
- return
catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, "false");
+ return
catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE,
JdbcResource.getDefaultPropertyValue(
+ JdbcResource.ONLY_SPECIFIED_DATABASE));
}
public String getLowerCaseTableNames() {
@@ -140,7 +152,33 @@ public class JdbcExternalCatalog extends ExternalCatalog {
}
// Otherwise, it defaults to false
- return
catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
+ return
catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES,
JdbcResource.getDefaultPropertyValue(
+ JdbcResource.LOWER_CASE_TABLE_NAMES));
+ }
+
+ public int getConnectionPoolMinSize() {
+ return
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE,
JdbcResource
+
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE)));
+ }
+
+ public int getConnectionPoolMaxSize() {
+ return
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE,
JdbcResource
+
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE)));
+ }
+
+ public int getConnectionPoolMaxWaitTime() {
+ return
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME,
JdbcResource
+
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME)));
+ }
+
+ public int getConnectionPoolMaxLifeTime() {
+ return
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME,
JdbcResource
+
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME)));
+ }
+
+ public boolean isConnectionPoolKeepAlive() {
+ return
Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
JdbcResource
+
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)));
}
@Override
@@ -155,7 +193,12 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIsLowerCaseTableNames(getLowerCaseTableNames())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
- .setExcludeDatabaseMap(getExcludeDatabaseMap());
+ .setExcludeDatabaseMap(getExcludeDatabaseMap())
+ .setConnectionPoolMinSize(getConnectionPoolMinSize())
+ .setConnectionPoolMaxSize(getConnectionPoolMaxSize())
+ .setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime())
+ .setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime())
+ .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
}
@@ -193,22 +236,5 @@ public class JdbcExternalCatalog extends ExternalCatalog {
properties.put(JdbcResource.CHECK_SUM,
JdbcResource.computeObjectChecksum(properties.get(JdbcResource.DRIVER_URL)));
}
- String onlySpecifiedDatabase = getOnlySpecifiedDatabase();
- if (!onlySpecifiedDatabase.equalsIgnoreCase("true") &&
!onlySpecifiedDatabase.equalsIgnoreCase("false")) {
- throw new DdlException("only_specified_database must be true or
false");
- }
- String lowerCaseTableNames = getLowerCaseTableNames();
- if (!lowerCaseTableNames.equalsIgnoreCase("true") &&
!lowerCaseTableNames.equalsIgnoreCase("false")) {
- throw new DdlException("lower_case_table_names must be true or
false");
- }
- if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) {
- Map<String, Boolean> includeDatabaseList = getIncludeDatabaseMap();
- Map<String, Boolean> excludeDatabaseList = getExcludeDatabaseMap();
- if ((includeDatabaseList != null && !includeDatabaseList.isEmpty())
- || (excludeDatabaseList != null &&
!excludeDatabaseList.isEmpty())) {
- throw new DdlException("include_database_list and
exclude_database_list can not be set when "
- + "only_specified_database is false");
- }
- }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 796d0db64fb..60daac66b6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.Util;
@@ -109,17 +108,16 @@ public abstract class JdbcClient {
Optional.ofNullable(jdbcClientConfig.getExcludeDatabaseMap()).orElse(Collections.emptyMap());
String jdbcUrl = jdbcClientConfig.getJdbcUrl();
this.dbType = parseDbType(jdbcUrl);
- initializeDataSource(jdbcClientConfig.getPassword(), jdbcUrl,
jdbcClientConfig.getDriverUrl(),
- jdbcClientConfig.getDriverClass());
+ initializeDataSource(jdbcClientConfig);
}
// Initialize DruidDataSource
- private void initializeDataSource(String password, String jdbcUrl, String
driverUrl, String driverClass) {
+ private void initializeDataSource(JdbcClientConfig config) {
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
try {
// TODO(ftw): The problem here is that the jar package is handled
by FE
// and URLClassLoader may load the jar package directly into
memory
- URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))};
+ URL[] urls = {new
URL(JdbcResource.getFullDriverUrl(config.getDriverUrl()))};
// set parent ClassLoader to null, we can achieve class loading
isolation.
ClassLoader parent = getClass().getClassLoader();
ClassLoader classLoader = URLClassLoader.newInstance(urls, parent);
@@ -128,24 +126,29 @@ public abstract class JdbcClient {
Thread.currentThread().setContextClassLoader(classLoader);
dataSource = new DruidDataSource();
dataSource.setDriverClassLoader(classLoader);
- dataSource.setDriverClassName(driverClass);
- dataSource.setUrl(jdbcUrl);
- dataSource.setUsername(jdbcUser);
- dataSource.setPassword(password);
- dataSource.setMinIdle(Config.jdbc_min_pool_size > 0 ? 1 : 0);
- dataSource.setInitialSize(Config.jdbc_min_pool_size);
- dataSource.setMaxActive(Config.jdbc_max_pool_size);
-
dataSource.setTimeBetweenEvictionRunsMillis(Config.jdbc_max_idle_time * 2L);
-
dataSource.setMinEvictableIdleTimeMillis(Config.jdbc_max_idle_time);
+ dataSource.setDriverClassName(config.getDriverClass());
+ dataSource.setUrl(config.getJdbcUrl());
+ dataSource.setUsername(config.getUser());
+ dataSource.setPassword(config.getPassword());
+ dataSource.setMinIdle(config.getConnectionPoolMinSize()); //
default 1
+ dataSource.setInitialSize(config.getConnectionPoolMinSize()); //
default 1
+ dataSource.setMaxActive(config.getConnectionPoolMaxSize()); //
default 10
// set connection timeout to 5s.
// The default is 30s, which is too long.
// Because when querying information_schema db, BE will call
thrift rpc(default timeout is 30s)
// to FE to get schema info, and may create connection here, if we
set it too long and the url is invalid,
// it may cause the thrift rpc timeout.
- dataSource.setMaxWait(Config.jdbc_max_wait_time);
- dataSource.setKeepAlive(Config.jdbc_keep_alive);
+ dataSource.setMaxWait(config.getConnectionPoolMaxWaitTime()); //
default 5000
+
dataSource.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime()
/ 10L); // default 3 min
+
dataSource.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime()
/ 2L); // default 15 min
+
dataSource.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
// default 30 min
+ LOG.info("JdbcClient set"
+ + " ConnectionPoolMinSize = " +
config.getConnectionPoolMinSize()
+ + ", ConnectionPoolMaxSize = " +
config.getConnectionPoolMaxSize()
+ + ", ConnectionPoolMaxWaitTime = " +
config.getConnectionPoolMaxWaitTime()
+ + ", ConnectionPoolMaxLifeTime = " +
config.getConnectionPoolMaxLifeTime());
} catch (MalformedURLException e) {
- throw new JdbcClientException("MalformedURLException to load class
about " + driverUrl, e);
+ throw new JdbcClientException("MalformedURLException to load class
about " + config.getDriverUrl(), e);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
index 69dedd16d7b..4e8fa63ee33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
@@ -18,6 +18,11 @@
package org.apache.doris.datasource.jdbc.client;
+
+import org.apache.doris.catalog.JdbcResource;
+
+import com.google.common.collect.Maps;
+
import java.util.Map;
public class JdbcClientConfig {
@@ -31,6 +36,28 @@ public class JdbcClientConfig {
private String isLowerCaseTableNames;
private Map<String, Boolean> includeDatabaseMap;
private Map<String, Boolean> excludeDatabaseMap;
+ private int connectionPoolMinSize;
+ private int connectionPoolMaxSize;
+ private int connectionPoolMaxWaitTime;
+ private int connectionPoolMaxLifeTime;
+ private boolean connectionPoolKeepAlive;
+
+ public JdbcClientConfig() {
+ this.onlySpecifiedDatabase =
JdbcResource.getDefaultPropertyValue(JdbcResource.ONLY_SPECIFIED_DATABASE);
+ this.isLowerCaseTableNames =
JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_TABLE_NAMES);
+ this.connectionPoolMinSize = Integer.parseInt(
+
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE));
+ this.connectionPoolMaxSize = Integer.parseInt(
+
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE));
+ this.connectionPoolMaxWaitTime = Integer.parseInt(
+
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME));
+ this.connectionPoolMaxLifeTime = Integer.parseInt(
+
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME));
+ this.connectionPoolKeepAlive = Boolean.parseBoolean(
+
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE));
+ this.includeDatabaseMap = Maps.newHashMap();
+ this.excludeDatabaseMap = Maps.newHashMap();
+ }
public String getCatalog() {
return catalog;
@@ -104,6 +131,51 @@ public class JdbcClientConfig {
return this;
}
+ public int getConnectionPoolMinSize() {
+ return connectionPoolMinSize;
+ }
+
+ public JdbcClientConfig setConnectionPoolMinSize(int
connectionPoolMinSize) {
+ this.connectionPoolMinSize = connectionPoolMinSize;
+ return this;
+ }
+
+ public int getConnectionPoolMaxSize() {
+ return connectionPoolMaxSize;
+ }
+
+ public JdbcClientConfig setConnectionPoolMaxSize(int
connectionPoolMaxSize) {
+ this.connectionPoolMaxSize = connectionPoolMaxSize;
+ return this;
+ }
+
+ public int getConnectionPoolMaxLifeTime() {
+ return connectionPoolMaxLifeTime;
+ }
+
+ public JdbcClientConfig setConnectionPoolMaxLifeTime(int
connectionPoolMaxLifeTime) {
+ this.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime;
+ return this;
+ }
+
+ public int getConnectionPoolMaxWaitTime() {
+ return connectionPoolMaxWaitTime;
+ }
+
+ public JdbcClientConfig setConnectionPoolMaxWaitTime(int
connectionPoolMaxWaitTime) {
+ this.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime;
+ return this;
+ }
+
+ public boolean isConnectionPoolKeepAlive() {
+ return connectionPoolKeepAlive;
+ }
+
+ public JdbcClientConfig setConnectionPoolKeepAlive(boolean
connectionPoolKeepAlive) {
+ this.connectionPoolKeepAlive = connectionPoolKeepAlive;
+ return this;
+ }
+
public Map<String, Boolean> getIncludeDatabaseMap() {
return includeDatabaseMap;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
index 1f85ac3feff..f55821953e2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
@@ -37,30 +37,19 @@ import java.util.List;
public class JdbcTableSink extends DataSink {
private static final Logger LOG =
LogManager.getLogger(JdbcTableSink.class);
- private final String resourceName;
private final String externalTableName;
private final String dorisTableName;
- private final String jdbcUrl;
- private final String jdbcUser;
- private final String jdbcPasswd;
- private final String driverClass;
- private final String driverUrl;
- private final String checkSum;
private final TOdbcTableType jdbcType;
private final boolean useTransaction;
private String insertSql;
+ private JdbcTable jdbcTable;
+
public JdbcTableSink(JdbcTable jdbcTable, List<String> insertCols) {
- resourceName = jdbcTable.getResourceName();
+ this.jdbcTable = jdbcTable;
jdbcType = jdbcTable.getJdbcTableType();
externalTableName = jdbcTable.getProperRealFullTableName(jdbcType);
useTransaction =
ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
- jdbcUrl = jdbcTable.getJdbcUrl();
- jdbcUser = jdbcTable.getJdbcUser();
- jdbcPasswd = jdbcTable.getJdbcPasswd();
- driverClass = jdbcTable.getDriverClass();
- driverUrl = jdbcTable.getDriverUrl();
- checkSum = jdbcTable.getCheckSum();
dorisTableName = jdbcTable.getName();
insertSql = jdbcTable.getInsertSql(insertCols);
}
@@ -81,20 +70,11 @@ public class JdbcTableSink extends DataSink {
protected TDataSink toThrift() {
TDataSink tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK);
TJdbcTableSink jdbcTableSink = new TJdbcTableSink();
- TJdbcTable jdbcTable = new TJdbcTable();
+ TJdbcTable jdbcTable = this.jdbcTable.toThrift().getJdbcTable();
jdbcTableSink.setJdbcTable(jdbcTable);
- jdbcTableSink.jdbc_table.setJdbcUrl(jdbcUrl);
- jdbcTableSink.jdbc_table.setJdbcUser(jdbcUser);
- jdbcTableSink.jdbc_table.setJdbcPassword(jdbcPasswd);
- jdbcTableSink.jdbc_table.setJdbcTableName(externalTableName);
- jdbcTableSink.jdbc_table.setJdbcDriverUrl(driverUrl);
- jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass);
- jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum);
- jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName);
jdbcTableSink.setInsertSql(insertSql);
jdbcTableSink.setUseTransaction(useTransaction);
jdbcTableSink.setTableType(jdbcType);
-
tDataSink.setJdbcTableSink(jdbcTableSink);
return tDataSink;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
index e91898fb82d..ee0d1949be2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
@@ -17,13 +17,154 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.AccessTestUtil;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.AccessControllerManager;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Map;
+
public class JdbcResourceTest {
+ private final ResourceMgr resourceMgr = new ResourceMgr();
+
+ private Map<String, String> jdbcProperties;
+
+ private Analyzer analyzer;
+
+ @Before
+ public void setUp() {
+ FeConstants.runningUnitTest = true;
+ analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+ jdbcProperties = Maps.newHashMap();
+ jdbcProperties.put("type", "jdbc");
+ jdbcProperties.put("user", "postgres");
+ jdbcProperties.put("password", "");
+ jdbcProperties.put("jdbc_url",
"jdbc:postgresql://127.0.0.1:5432/postgres?currentSchema=doris_test");
+ jdbcProperties.put("driver_url", "postgresql-42.5.0.jar");
+ jdbcProperties.put("driver_class", "org.postgresql.Driver");
+ jdbcProperties.put("checksum", "20c8228267b6c9ce620fddb39467d3eb");
+ }
+
+ @Test
+ public void testJdbcResourceCreateWithDefaultProperties(@Mocked Env env,
+ @Injectable AccessControllerManager accessManager)
+ throws UserException {
+ new Expectations() {
+ {
+ env.getAccessManager();
+ result = accessManager;
+ accessManager.checkGlobalPriv((ConnectContext) any,
PrivPredicate.ADMIN);
+ result = true;
+ }
+ };
+
+ jdbcProperties.remove("checksum");
+
+ CreateResourceStmt stmt = new CreateResourceStmt(true, false,
"jdbc_resource_pg_14", jdbcProperties);
+
+ stmt.analyze(analyzer);
+
+ resourceMgr.createResource(stmt);
+
+ JdbcResource jdbcResource = (JdbcResource)
resourceMgr.getResource("jdbc_resource_pg_14");
+
+
+ // Verify the default properties were applied during the replay
+ Map<String, String> properties = jdbcResource.getCopiedProperties();
+ Assert.assertEquals("1", properties.get("connection_pool_min_size"));
+ Assert.assertEquals("10", properties.get("connection_pool_max_size"));
+ Assert.assertEquals("1800000",
properties.get("connection_pool_max_life_time"));
+ Assert.assertEquals("5000",
properties.get("connection_pool_max_wait_time"));
+ Assert.assertEquals("false",
properties.get("connection_pool_keep_alive"));
+ }
+
+ @Test
+ public void testJdbcResourceReplayWithDefaultProperties() {
+
+ JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14",
jdbcProperties);
+
+ // Replay the resource creation to simulate the edit log replay
+ resourceMgr.replayCreateResource(jdbcResource);
+
+ // Retrieve the replayed resource
+ Resource replayedResource =
resourceMgr.getResource("jdbc_resource_pg_14");
+
+ Assert.assertNotNull(replayedResource);
+ Assert.assertTrue(replayedResource instanceof JdbcResource);
+
+ // Verify the default properties were applied during the replay
+ Map<String, String> properties =
replayedResource.getCopiedProperties();
+ Assert.assertEquals("1", properties.get("connection_pool_min_size"));
+ Assert.assertEquals("10", properties.get("connection_pool_max_size"));
+ Assert.assertEquals("1800000",
properties.get("connection_pool_max_life_time"));
+ Assert.assertEquals("5000",
properties.get("connection_pool_max_wait_time"));
+ Assert.assertEquals("false",
properties.get("connection_pool_keep_alive"));
+ }
+
+ @Test
+ public void testJdbcResourceReplayWithSetProperties() {
+
+ // Add some properties to the JDBC properties
+ jdbcProperties.put("connection_pool_min_size", "2");
+ jdbcProperties.put("connection_pool_max_size", "20");
+ jdbcProperties.put("connection_pool_max_life_time", "3600000");
+ jdbcProperties.put("connection_pool_max_wait_time", "10000");
+ jdbcProperties.put("connection_pool_keep_alive", "true");
+
+ JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14",
jdbcProperties);
+
+ // Replay the resource creation to simulate the edit log replay
+ resourceMgr.replayCreateResource(jdbcResource);
+
+ // Retrieve the replayed resource
+ Resource replayedResource =
resourceMgr.getResource("jdbc_resource_pg_14");
+
+ Assert.assertNotNull(replayedResource);
+ Assert.assertTrue(replayedResource instanceof JdbcResource);
+
+ // Verify the modified properties were applied during the replay
+ Map<String, String> properties =
replayedResource.getCopiedProperties();
+ Assert.assertEquals("2", properties.get("connection_pool_min_size"));
+ Assert.assertEquals("20", properties.get("connection_pool_max_size"));
+ Assert.assertEquals("3600000",
properties.get("connection_pool_max_life_time"));
+ Assert.assertEquals("10000",
properties.get("connection_pool_max_wait_time"));
+ Assert.assertEquals("true",
properties.get("connection_pool_keep_alive"));
+ }
+
+ @Test
+ public void testJdbcResourceReplayWithModifiedAfterSetDefaultProperties()
throws DdlException {
+ JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14",
jdbcProperties);
+
+ // Replay the resource creation to simulate the edit log replay
+ resourceMgr.replayCreateResource(jdbcResource);
+
+ // Retrieve the replayed resource
+ Resource replayedResource =
resourceMgr.getResource("jdbc_resource_pg_14");
+ Map<String, String> newProperties = Maps.newHashMap();
+ newProperties.put(JdbcResource.CONNECTION_POOL_MIN_SIZE, "2");
+ replayedResource.modifyProperties(newProperties);
+ Map<String, String> properties =
replayedResource.getCopiedProperties();
+ Assert.assertEquals("2", properties.get("connection_pool_min_size"));
+ resourceMgr.replayCreateResource(replayedResource);
+ Resource replayedResource2 =
resourceMgr.getResource("jdbc_resource_pg_14");
+ Map<String, String> properties2 =
replayedResource2.getCopiedProperties();
+ Assert.assertEquals("2", properties2.get("connection_pool_min_size"));
+ }
+
@Test
public void testHandleJdbcUrlForMySql() throws DdlException {
String inputUrl = "jdbc:mysql://127.0.0.1:3306/test";
@@ -36,7 +177,7 @@ public class JdbcResourceTest {
@Test
public void testHandleJdbcUrlForSqlServerWithoutParams() throws
DdlException {
- String inputUrl =
"jdbc:sqlserver://43.129.237.12:1433;databaseName=doris_test";
+ String inputUrl =
"jdbc:sqlserver://127.0.0.1:1433;databaseName=doris_test";
String resultUrl = JdbcResource.handleJdbcUrl(inputUrl);
// Ensure that the result URL for SQL Server doesn't have '?' or '&'
@@ -49,7 +190,8 @@ public class JdbcResourceTest {
@Test
public void testHandleJdbcUrlForSqlServerWithParams() throws DdlException {
- String inputUrl =
"jdbc:sqlserver://43.129.237.12:1433;encrypt=false;databaseName=doris_test;trustServerCertificate=false";
+ String inputUrl
+ =
"jdbc:sqlserver://127.0.0.1:1433;encrypt=false;databaseName=doris_test;trustServerCertificate=false";
String resultUrl = JdbcResource.handleJdbcUrl(inputUrl);
// Ensure that the result URL for SQL Server doesn't have '?' or '&'
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
index 0f2977a9886..8394daf0682 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
@@ -20,10 +20,12 @@ package org.apache.doris.datasource.jdbc;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CatalogFactory;
+import com.google.common.collect.Maps;
import org.junit.Assert;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.Before;
+import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
@@ -31,32 +33,54 @@ import java.util.Map;
public class JdbcExternalCatalogTest {
private JdbcExternalCatalog jdbcExternalCatalog;
- @BeforeEach
+ @Before
public void setUp() throws DdlException {
FeConstants.runningUnitTest = true;
Map<String, String> properties = new HashMap<>();
+ properties.put("type", "jdbc");
properties.put(JdbcResource.DRIVER_URL, "ojdbc8.jar");
properties.put(JdbcResource.JDBC_URL,
"jdbc:oracle:thin:@127.0.0.1:1521:XE");
properties.put(JdbcResource.DRIVER_CLASS,
"oracle.jdbc.driver.OracleDriver");
- jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog",
"testResource", properties, "testComment");
+ jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null,
properties, "testComment");
}
@Test
- public void setDefaultPropsWhenCreatingTest() {
+ public void replayJdbcCatalogTest() throws DdlException {
+
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE,
"1");
+ JdbcExternalCatalog replayJdbcCatalog = (JdbcExternalCatalog)
CatalogFactory.createFromLog(
+ jdbcExternalCatalog.constructEditLog());
+ Map<String, String> properties = replayJdbcCatalog.getProperties();
+ Assert.assertEquals("1", properties.get("connection_pool_min_size"));
+ Map<String, String> newProperties = Maps.newHashMap();
+ newProperties.put(JdbcResource.CONNECTION_POOL_MIN_SIZE, "2");
+
jdbcExternalCatalog.getCatalogProperty().modifyCatalogProps(newProperties);
+ jdbcExternalCatalog.notifyPropertiesUpdated(newProperties);
+ JdbcExternalCatalog replayJdbcCatalog2 = (JdbcExternalCatalog)
CatalogFactory.createFromLog(
+ jdbcExternalCatalog.constructEditLog());
+ Map<String, String> properties2 = replayJdbcCatalog2.getProperties();
+ Assert.assertEquals("2", properties2.get("connection_pool_min_size"));
+ }
+
+ @Test
+ public void checkPropertiesTest() {
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
"1");
- Exception exception1 = Assert.assertThrows(DdlException.class, () ->
jdbcExternalCatalog.setDefaultPropsWhenCreating(false));
- Assert.assertEquals("errCode = 2, detailMessage =
only_specified_database must be true or false", exception1.getMessage());
+ Exception exception1 = Assert.assertThrows(DdlException.class, () ->
jdbcExternalCatalog.checkProperties());
+ Assert.assertEquals("errCode = 2, detailMessage =
only_specified_database must be true or false",
+ exception1.getMessage());
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
"true");
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES,
"1");
- Exception exception2 = Assert.assertThrows(DdlException.class, () ->
jdbcExternalCatalog.setDefaultPropsWhenCreating(false));
- Assert.assertEquals("errCode = 2, detailMessage =
lower_case_table_names must be true or false", exception2.getMessage());
+ Exception exception2 = Assert.assertThrows(DdlException.class, () ->
jdbcExternalCatalog.checkProperties());
+ Assert.assertEquals("errCode = 2, detailMessage =
lower_case_table_names must be true or false",
+ exception2.getMessage());
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
"false");
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES,
"false");
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.INCLUDE_DATABASE_LIST,
"db1,db2");
- DdlException exceptione3 = Assert.assertThrows(DdlException.class, ()
-> jdbcExternalCatalog.setDefaultPropsWhenCreating(false));
- Assert.assertEquals("errCode = 2, detailMessage =
include_database_list and exclude_database_list can not be set when
only_specified_database is false", exceptione3.getMessage());
+ DdlException exceptione3 = Assert.assertThrows(DdlException.class, ()
-> jdbcExternalCatalog.checkProperties());
+ Assert.assertEquals(
+ "errCode = 2, detailMessage = include_database_list and
exclude_database_list cannot be set when only_specified_database is false",
+ exceptione3.getMessage());
}
}
diff --git a/fe/pom.xml b/fe/pom.xml
index fa994bcadcb..695230bde6b 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -251,7 +251,7 @@ under the License.
<commons-io.version>2.7</commons-io.version>
<json-simple.version>1.1.1</json-simple.version>
<junit.version>5.8.2</junit.version>
- <druid.version>1.2.5</druid.version>
+ <druid.version>1.2.20</druid.version>
<clickhouse.version>0.4.6</clickhouse.version>
<thrift.version>0.16.0</thrift.version>
<tomcat-embed-core.version>8.5.86</tomcat-embed-core.version>
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 37adc3d4274..6507d04c2dd 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -318,7 +318,12 @@ struct TJdbcTable {
6: optional string jdbc_resource_name
7: optional string jdbc_driver_class
8: optional string jdbc_driver_checksum
-
+ 9: optional i32 connection_pool_min_size
+ 10: optional i32 connection_pool_max_size
+ 11: optional i32 connection_pool_max_wait_time
+ 12: optional i32 connection_pool_max_life_time
+ 13: optional bool connection_pool_keep_alive
+ 14: optional i64 catalog_id
}
struct TMCTable {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 25a26b575b6..fccf90e173d 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -421,6 +421,14 @@ struct TJdbcExecutorCtorParams {
8: optional string driver_path
9: optional TOdbcTableType table_type
+
+ 10: optional i32 connection_pool_min_size
+ 11: optional i32 connection_pool_max_size
+ 12: optional i32 connection_pool_max_wait_time
+ 13: optional i32 connection_pool_max_life_time
+ 14: optional i32 connection_pool_cache_clear_time
+ 15: optional bool connection_pool_keep_alive
+ 16: optional i64 catalog_id
}
struct TJavaUdfExecutorCtorParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]