This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7e7fcf1b337 [improvement](jdbc catalog) Optimize connection pool parameter settings (#31177)
7e7fcf1b337 is described below

commit 7e7fcf1b3373026c622c590bccd7edb5533b56d1
Author: zy-kkk <[email protected]>
AuthorDate: Wed Feb 21 18:00:05 2024 +0800

    [improvement](jdbc catalog) Optimize connection pool parameter settings (#31177)
    
    pick from (#30588)
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/runtime/descriptors.cpp                     |  27 +++-
 be/src/runtime/descriptors.h                       |  12 ++
 be/src/vec/exec/scan/new_jdbc_scanner.cpp          |   6 +
 be/src/vec/exec/vjdbc_connector.cpp                |   8 +
 be/src/vec/exec/vjdbc_connector.h                  |   6 +
 be/src/vec/sink/vjdbc_table_sink.cpp               |   8 +
 conf/be.conf                                       |   4 +-
 .../java/org/apache/doris/jdbc/JdbcDataSource.java |  51 ++++++-
 .../apache/doris/jdbc/JdbcDataSourceConfig.java    | 170 +++++++++++++++++++++
 .../java/org/apache/doris/jdbc/JdbcExecutor.java   | 116 +++++++-------
 .../main/java/org/apache/doris/common/Config.java  |  20 ---
 .../org/apache/doris/catalog/JdbcResource.java     | 100 ++++++++++--
 .../java/org/apache/doris/catalog/JdbcTable.java   |  54 +++++++
 .../java/org/apache/doris/catalog/Resource.java    |   2 +
 .../java/org/apache/doris/catalog/ResourceMgr.java |   1 +
 .../doris/catalog/external/JdbcExternalTable.java  |   6 +
 .../doris/datasource/jdbc/JdbcExternalCatalog.java |  66 +++++---
 .../doris/datasource/jdbc/client/JdbcClient.java   |  37 ++---
 .../datasource/jdbc/client/JdbcClientConfig.java   |  72 +++++++++
 .../doris/planner/external/jdbc/JdbcTableSink.java |  28 +---
 .../org/apache/doris/catalog/JdbcResourceTest.java | 146 +++++++++++++++++-
 .../datasource/jdbc/JdbcExternalCatalogTest.java   |  46 ++++--
 fe/pom.xml                                         |   2 +-
 gensrc/thrift/Descriptors.thrift                   |   7 +-
 gensrc/thrift/Types.thrift                         |   8 +
 27 files changed, 832 insertions(+), 177 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f881edaaba3..728f0e005d9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -763,6 +763,9 @@ DEFINE_mInt32(segment_compression_threshold_kb, "256");
 // The connection timeout when connecting to external table such as odbc table.
 DEFINE_mInt32(external_table_connect_timeout_sec, "30");
 
+// Time to clean up useless JDBC connection pool cache
+DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800");
+
 // Global bitmap cache capacity for aggregation cache, size in bytes
 DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 822832cfeca..29d6d56fb80 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -817,6 +817,9 @@ DECLARE_mInt32(segment_compression_threshold_kb);
 // The connection timeout when connecting to external table such as odbc table.
 DECLARE_mInt32(external_table_connect_timeout_sec);
 
+// Time to clean up useless JDBC connection pool cache
+DECLARE_mInt32(jdbc_connection_pool_cache_clear_time_sec);
+
 // Global bitmap cache capacity for aggregation cache, size in bytes
 DECLARE_Int64(delete_bitmap_agg_cache_capacity);
 
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index f8125588ae5..a73f05f326b 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -251,6 +251,7 @@ std::string ODBCTableDescriptor::debug_string() const {
 
 JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
         : TableDescriptor(tdesc),
+          _jdbc_catalog_id(tdesc.jdbcTable.catalog_id),
           _jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name),
           _jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url),
           _jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class),
@@ -258,17 +259,27 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
           _jdbc_url(tdesc.jdbcTable.jdbc_url),
           _jdbc_table_name(tdesc.jdbcTable.jdbc_table_name),
           _jdbc_user(tdesc.jdbcTable.jdbc_user),
-          _jdbc_passwd(tdesc.jdbcTable.jdbc_password) {}
+          _jdbc_passwd(tdesc.jdbcTable.jdbc_password),
+          _connection_pool_min_size(tdesc.jdbcTable.connection_pool_min_size),
+          _connection_pool_max_size(tdesc.jdbcTable.connection_pool_max_size),
+          
_connection_pool_max_wait_time(tdesc.jdbcTable.connection_pool_max_wait_time),
+          
_connection_pool_max_life_time(tdesc.jdbcTable.connection_pool_max_life_time),
+          
_connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive) {}
 
 std::string JdbcTableDescriptor::debug_string() const {
     fmt::memory_buffer buf;
-    fmt::format_to(buf,
-                   "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} "
-                   ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} 
,_jdbc_url={} "
-                   ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={})",
-                   TableDescriptor::debug_string(), _jdbc_resource_name, 
_jdbc_driver_url,
-                   _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, 
_jdbc_table_name,
-                   _jdbc_user, _jdbc_passwd);
+    fmt::format_to(
+            buf,
+            "JDBCTable({} ,_jdbc_catalog_id = {}, _jdbc_resource_name={} 
,_jdbc_driver_url={} "
+            ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
+            ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} 
,_connection_pool_min_size={} "
+            ",_connection_pool_max_size={} ,_connection_pool_max_wait_time={} "
+            ",_connection_pool_max_life_time={} 
,_connection_pool_keep_alive={})",
+            TableDescriptor::debug_string(), _jdbc_catalog_id, 
_jdbc_resource_name,
+            _jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, 
_jdbc_url,
+            _jdbc_table_name, _jdbc_user, _jdbc_passwd, 
_connection_pool_min_size,
+            _connection_pool_max_size, _connection_pool_max_wait_time,
+            _connection_pool_max_life_time, _connection_pool_keep_alive);
     return fmt::to_string(buf);
 }
 
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 6808523ab58..015e0e60607 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -307,6 +307,7 @@ class JdbcTableDescriptor : public TableDescriptor {
 public:
     JdbcTableDescriptor(const TTableDescriptor& tdesc);
     std::string debug_string() const override;
+    int64_t jdbc_catalog_id() const { return _jdbc_catalog_id; }
     const std::string& jdbc_resource_name() const { return 
_jdbc_resource_name; }
     const std::string& jdbc_driver_url() const { return _jdbc_driver_url; }
     const std::string& jdbc_driver_class() const { return _jdbc_driver_class; }
@@ -315,8 +316,14 @@ public:
     const std::string& jdbc_table_name() const { return _jdbc_table_name; }
     const std::string& jdbc_user() const { return _jdbc_user; }
     const std::string& jdbc_passwd() const { return _jdbc_passwd; }
+    int32_t connection_pool_min_size() const { return 
_connection_pool_min_size; }
+    int32_t connection_pool_max_size() const { return 
_connection_pool_max_size; }
+    int32_t connection_pool_max_wait_time() const { return 
_connection_pool_max_wait_time; }
+    int32_t connection_pool_max_life_time() const { return 
_connection_pool_max_life_time; }
+    bool connection_pool_keep_alive() const { return 
_connection_pool_keep_alive; }
 
 private:
+    int64_t _jdbc_catalog_id;
     std::string _jdbc_resource_name;
     std::string _jdbc_driver_url;
     std::string _jdbc_driver_class;
@@ -325,6 +332,11 @@ private:
     std::string _jdbc_table_name;
     std::string _jdbc_user;
     std::string _jdbc_passwd;
+    int32_t _connection_pool_min_size;
+    int32_t _connection_pool_max_size;
+    int32_t _connection_pool_max_wait_time;
+    int32_t _connection_pool_max_life_time;
+    bool _connection_pool_keep_alive;
 };
 
 class TupleDescriptor {
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 3d71aeb4f72..91ea57f0bbf 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -82,6 +82,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con
     if (jdbc_table == nullptr) {
         return Status::InternalError("jdbc table pointer is NULL of 
VJdbcScanNode::prepare.");
     }
+    _jdbc_param.catalog_id = jdbc_table->jdbc_catalog_id();
     _jdbc_param.driver_class = jdbc_table->jdbc_driver_class();
     _jdbc_param.driver_path = jdbc_table->jdbc_driver_url();
     _jdbc_param.resource_name = jdbc_table->jdbc_resource_name();
@@ -92,6 +93,11 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con
     _jdbc_param.tuple_desc = _tuple_desc;
     _jdbc_param.query_string = std::move(_query_string);
     _jdbc_param.table_type = _table_type;
+    _jdbc_param.connection_pool_min_size = 
jdbc_table->connection_pool_min_size();
+    _jdbc_param.connection_pool_max_size = 
jdbc_table->connection_pool_max_size();
+    _jdbc_param.connection_pool_max_life_time = 
jdbc_table->connection_pool_max_life_time();
+    _jdbc_param.connection_pool_max_wait_time = 
jdbc_table->connection_pool_max_wait_time();
+    _jdbc_param.connection_pool_keep_alive = 
jdbc_table->connection_pool_keep_alive();
 
     get_parent()->_scanner_profile->add_info_string("JdbcDriverClass", 
_jdbc_param.driver_class);
     get_parent()->_scanner_profile->add_info_string("JdbcDriverUrl", 
_jdbc_param.driver_path);
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 20fcfd196b0..666a06531d2 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -150,6 +150,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
 
         TJdbcExecutorCtorParams ctor_params;
         ctor_params.__set_statement(_sql_str);
+        ctor_params.__set_catalog_id(_conn_param.catalog_id);
         ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
         ctor_params.__set_jdbc_user(_conn_param.user);
         ctor_params.__set_jdbc_password(_conn_param.passwd);
@@ -158,6 +159,13 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
         ctor_params.__set_batch_size(read ? state->batch_size() : 0);
         ctor_params.__set_op(read ? TJdbcOperation::READ : 
TJdbcOperation::WRITE);
         ctor_params.__set_table_type(_conn_param.table_type);
+        
ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
+        
ctor_params.__set_connection_pool_max_size(_conn_param.connection_pool_max_size);
+        
ctor_params.__set_connection_pool_max_wait_time(_conn_param.connection_pool_max_wait_time);
+        
ctor_params.__set_connection_pool_max_life_time(_conn_param.connection_pool_max_life_time);
+        ctor_params.__set_connection_pool_cache_clear_time(
+                config::jdbc_connection_pool_cache_clear_time_sec);
+        
ctor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive);
 
         jbyteArray ctor_params_bytes;
         // Pushed frame will be popped when jni_frame goes out-of-scope.
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 361705ac4c2..55b39f0aec2 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -43,6 +43,7 @@ class IColumn;
 class VExprContext;
 
 struct JdbcConnectorParam {
+    int64_t catalog_id;
     std::string driver_path;
     std::string driver_class;
     std::string resource_name;
@@ -52,6 +53,11 @@ struct JdbcConnectorParam {
     std::string passwd;
     std::string query_string;
     TOdbcTableType::type table_type;
+    int32_t connection_pool_min_size;
+    int32_t connection_pool_max_size;
+    int32_t connection_pool_max_wait_time;
+    int32_t connection_pool_max_life_time;
+    bool connection_pool_keep_alive;
 
     const TupleDescriptor* tuple_desc;
 };
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp
index e2eab4d7b0c..2663dc236a1 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -47,6 +47,7 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
     RETURN_IF_ERROR(VTableSink::init(t_sink));
     const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
 
+    _jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id;
     _jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url;
     _jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user;
     _jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password;
@@ -56,6 +57,13 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
     _jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
     _jdbc_param.table_type = t_jdbc_sink.table_type;
     _jdbc_param.query_string = t_jdbc_sink.insert_sql;
+    _jdbc_param.connection_pool_min_size = 
t_jdbc_sink.jdbc_table.connection_pool_min_size;
+    _jdbc_param.connection_pool_max_size = 
t_jdbc_sink.jdbc_table.connection_pool_max_size;
+    _jdbc_param.connection_pool_max_wait_time =
+            t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
+    _jdbc_param.connection_pool_max_life_time =
+            t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
+    _jdbc_param.connection_pool_keep_alive = 
t_jdbc_sink.jdbc_table.connection_pool_keep_alive;
     _table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
     _use_transaction = t_jdbc_sink.use_transaction;
 
diff --git a/conf/be.conf b/conf/be.conf
index 9c877901d59..15121111646 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -19,10 +19,10 @@ CUR_DATE=`date +%Y%m%d-%H%M%S`
 
 PPROF_TMPDIR="$DORIS_HOME/log/"
 
-JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 
-DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000 -DJDBC_KEEP_ALIVE=FALSE"
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives"
 
 # For jdk 9+, this JAVA_OPTS will be used as default JVM options
-JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 
-DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000 -DJDBC_KEEP_ALIVE=FALSE"
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.java.command=DorisBE 
-XX:-CriticalJNINatives"
 
 # since 1.2, the JAVA_HOME need to be set to run BE process.
 # JAVA_HOME=/path/to/jdk/
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
index 6a6a022d29c..3c8ac38cf7d 100644
--- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java
@@ -18,32 +18,77 @@
 package org.apache.doris.jdbc;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import org.apache.log4j.Logger;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 public class JdbcDataSource {
+    private static final Logger LOG = Logger.getLogger(JdbcDataSource.class);
     private static final JdbcDataSource jdbcDataSource = new JdbcDataSource();
     private final Map<String, DruidDataSource> sourcesMap = new 
ConcurrentHashMap<>();
+    private final Map<String, Long> lastAccessTimeMap = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+    private long cleanupInterval = 8 * 60 * 60 * 1000; // 8 hours
+    private ScheduledFuture<?> cleanupTask = null;
+
+    private JdbcDataSource() {
+        startCleanupTask();
+    }
 
     public static JdbcDataSource getDataSource() {
         return jdbcDataSource;
     }
 
     public DruidDataSource getSource(String cacheKey) {
+        lastAccessTimeMap.put(cacheKey, System.currentTimeMillis());
         return sourcesMap.get(cacheKey);
     }
 
     public void putSource(String cacheKey, DruidDataSource ds) {
         sourcesMap.put(cacheKey, ds);
+        lastAccessTimeMap.put(cacheKey, System.currentTimeMillis());
     }
 
     public Map<String, DruidDataSource> getSourcesMap() {
         return sourcesMap;
     }
 
-    public String createCacheKey(String jdbcUrl, String jdbcUser, String 
jdbcPassword, String jdbcDriverUrl,
-            String jdbcDriverClass) {
-        return jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + 
jdbcDriverClass;
+    public void setCleanupInterval(long interval) {
+        this.cleanupInterval = interval * 1000L;
+        restartCleanupTask();
+    }
+
+    private synchronized void restartCleanupTask() {
+        if (cleanupTask != null && !cleanupTask.isCancelled()) {
+            cleanupTask.cancel(false);
+        }
+        cleanupTask = executor.scheduleAtFixedRate(() -> {
+            try {
+                long now = System.currentTimeMillis();
+                lastAccessTimeMap.forEach((key, lastAccessTime) -> {
+                    if (now - lastAccessTime > cleanupInterval) {
+                        DruidDataSource ds = sourcesMap.remove(key);
+                        if (ds != null) {
+                            ds.close();
+                        }
+                        lastAccessTimeMap.remove(key);
+                        LOG.info("remove jdbc data source: " + 
key.split("jdbc")[0]);
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("failed to cleanup jdbc data source", e);
+            }
+        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private void startCleanupTask() {
+        if (cleanupTask == null || cleanupTask.isCancelled()) {
+            restartCleanupTask();
+        }
     }
 }
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
new file mode 100644
index 00000000000..dcf576986fe
--- /dev/null
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.thrift.TJdbcOperation;
+import org.apache.doris.thrift.TOdbcTableType;
+
+public class JdbcDataSourceConfig {
+    private Long catalogId;
+    private String jdbcUrl;
+    private String jdbcUser;
+    private String jdbcPassword;
+    private String jdbcDriverUrl;
+    private String jdbcDriverClass;
+    private int batchSize;
+    private TJdbcOperation op;
+    private TOdbcTableType tableType;
+    private int connectionPoolMinSize;
+    private int connectionPoolMaxSize;
+    private int connectionPoolMaxWaitTime;
+    private int connectionPoolMaxLifeTime;
+    private boolean connectionPoolKeepAlive;
+
+    public String createCacheKey() {
+        return catalogId + jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + 
jdbcDriverClass
+                + connectionPoolMinSize + connectionPoolMaxSize + 
connectionPoolMaxLifeTime + connectionPoolMaxWaitTime
+                + connectionPoolKeepAlive;
+    }
+
+    public long getCatalogId() {
+        return catalogId;
+    }
+
+    public JdbcDataSourceConfig setCatalogId(long catalogId) {
+        this.catalogId = catalogId;
+        return this;
+    }
+
+    public String getJdbcUrl() {
+        return jdbcUrl;
+    }
+
+    public JdbcDataSourceConfig setJdbcUrl(String jdbcUrl) {
+        this.jdbcUrl = jdbcUrl;
+        return this;
+    }
+
+    public String getJdbcUser() {
+        return jdbcUser;
+    }
+
+    public JdbcDataSourceConfig setJdbcUser(String jdbcUser) {
+        this.jdbcUser = jdbcUser;
+        return this;
+    }
+
+    public String getJdbcPassword() {
+        return jdbcPassword;
+    }
+
+    public JdbcDataSourceConfig setJdbcPassword(String jdbcPassword) {
+        this.jdbcPassword = jdbcPassword;
+        return this;
+    }
+
+    public String getJdbcDriverUrl() {
+        return jdbcDriverUrl;
+    }
+
+    public JdbcDataSourceConfig setJdbcDriverUrl(String jdbcDriverUrl) {
+        this.jdbcDriverUrl = jdbcDriverUrl;
+        return this;
+    }
+
+    public String getJdbcDriverClass() {
+        return jdbcDriverClass;
+    }
+
+    public JdbcDataSourceConfig setJdbcDriverClass(String jdbcDriverClass) {
+        this.jdbcDriverClass = jdbcDriverClass;
+        return this;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public JdbcDataSourceConfig setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public TJdbcOperation getOp() {
+        return op;
+    }
+
+    public JdbcDataSourceConfig setOp(TJdbcOperation op) {
+        this.op = op;
+        return this;
+    }
+
+    public TOdbcTableType getTableType() {
+        return tableType;
+    }
+
+    public JdbcDataSourceConfig setTableType(TOdbcTableType tableType) {
+        this.tableType = tableType;
+        return this;
+    }
+
+    public int getConnectionPoolMinSize() {
+        return connectionPoolMinSize;
+    }
+
+    public JdbcDataSourceConfig setConnectionPoolMinSize(int 
connectionPoolMinSize) {
+        this.connectionPoolMinSize = connectionPoolMinSize;
+        return this;
+    }
+
+    public int getConnectionPoolMaxSize() {
+        return connectionPoolMaxSize;
+    }
+
+    public JdbcDataSourceConfig setConnectionPoolMaxSize(int 
connectionPoolMaxSize) {
+        this.connectionPoolMaxSize = connectionPoolMaxSize;
+        return this;
+    }
+
+    public int getConnectionPoolMaxWaitTime() {
+        return connectionPoolMaxWaitTime;
+    }
+
+    public JdbcDataSourceConfig setConnectionPoolMaxWaitTime(int 
connectionPoolMaxWaitTime) {
+        this.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime;
+        return this;
+    }
+
+    public int getConnectionPoolMaxLifeTime() {
+        return connectionPoolMaxLifeTime;
+    }
+
+    public JdbcDataSourceConfig setConnectionPoolMaxLifeTime(int 
connectionPoolMaxLifeTime) {
+        this.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime;
+        return this;
+    }
+
+    public boolean isConnectionPoolKeepAlive() {
+        return connectionPoolKeepAlive;
+    }
+
+    public JdbcDataSourceConfig setConnectionPoolKeepAlive(boolean 
connectionPoolKeepAlive) {
+        this.connectionPoolKeepAlive = connectionPoolKeepAlive;
+        return this;
+    }
+}
diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
index 2cd878dfc0d..dd6307ffe1a 100644
--- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
@@ -85,14 +85,9 @@ public class JdbcExecutor {
     private int curBlockRows = 0;
     private static final byte[] emptyBytes = new byte[0];
     private DruidDataSource druidDataSource = null;
-    private byte[] druidDataSourceLock = new byte[0];
-    private int minPoolSize;
-    private int maxPoolSize;
-    private int minIdleSize;
-    private int maxIdleTime;
-    private int maxWaitTime;
-    private boolean isKeepAlive;
+    private final byte[] druidDataSourceLock = new byte[0];
     private TOdbcTableType tableType;
+    private JdbcDataSourceConfig config;
 
     public JdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -103,20 +98,23 @@ public class JdbcExecutor {
             throw new InternalException(e.getMessage());
         }
         tableType = request.table_type;
-        minPoolSize = Integer.valueOf(System.getProperty("JDBC_MIN_POOL", 
"1"));
-        maxPoolSize = Integer.valueOf(System.getProperty("JDBC_MAX_POOL", 
"100"));
-        maxIdleTime = Integer.valueOf(System.getProperty("JDBC_MAX_IDLE_TIME", 
"300000"));
-        maxWaitTime = Integer.valueOf(System.getProperty("JDBC_MAX_WAIT_TIME", 
"5000"));
-        isKeepAlive = Boolean.valueOf(System.getProperty("JDBC_KEEP_ALIVE", 
"false"));
-        minIdleSize = minPoolSize > 0 ? 1 : 0;
-        LOG.info("JdbcExecutor set minPoolSize = " + minPoolSize
-                + ", maxPoolSize = " + maxPoolSize
-                + ", maxIdleTime = " + maxIdleTime
-                + ", maxWaitTime = " + maxWaitTime
-                + ", minIdleSize = " + minIdleSize
-                + ", isKeepAlive = " + isKeepAlive);
-        init(request.driver_path, request.statement, request.batch_size, 
request.jdbc_driver_class,
-                request.jdbc_url, request.jdbc_user, request.jdbc_password, 
request.op, request.table_type);
+        this.config = new JdbcDataSourceConfig()
+                .setCatalogId(request.catalog_id)
+                .setJdbcUser(request.jdbc_user)
+                .setJdbcPassword(request.jdbc_password)
+                .setJdbcUrl(request.jdbc_url)
+                .setJdbcDriverUrl(request.driver_path)
+                .setJdbcDriverClass(request.jdbc_driver_class)
+                .setBatchSize(request.batch_size)
+                .setOp(request.op)
+                .setTableType(request.table_type)
+                .setConnectionPoolMinSize(request.connection_pool_min_size)
+                .setConnectionPoolMaxSize(request.connection_pool_max_size)
+                
.setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time)
+                
.setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time)
+                
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive);
+        
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
+        init(config, request.statement);
     }
 
     public boolean isNebula() {
@@ -155,11 +153,11 @@ public class JdbcExecutor {
             }
         }
 
-        if (minIdleSize == 0) {
+        if (config.getConnectionPoolMinSize() == 0) {
             // Close and remove the datasource if necessary
             if (druidDataSource != null) {
                 druidDataSource.close();
-                JdbcDataSource.getDataSource().getSourcesMap().clear();
+                
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
                 druidDataSource = null;
             }
         }
@@ -441,19 +439,18 @@ public class JdbcExecutor {
         }
     }
 
-    private void init(String driverUrl, String sql, int batchSize, String 
driverClass, String jdbcUrl, String jdbcUser,
-            String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType) 
throws UdfRuntimeException {
-        String druidDataSourceKey = 
JdbcDataSource.getDataSource().createCacheKey(jdbcUrl, jdbcUser, jdbcPassword,
-                driverUrl, driverClass);
+    private void init(JdbcDataSourceConfig config, String sql) throws 
UdfRuntimeException {
+        String druidDataSourceKey = config.createCacheKey();
         try {
             if (isNebula()) {
-                batchSizeNum = batchSize;
-                Class.forName(driverClass);
-                conn = DriverManager.getConnection(jdbcUrl, jdbcUser, 
jdbcPassword);
+                batchSizeNum = config.getBatchSize();
+                Class.forName(config.getJdbcDriverClass());
+                conn = 
DriverManager.getConnection(config.getJdbcDriverClass(), config.getJdbcUser(),
+                                                   config.getJdbcPassword());
                 stmt = conn.prepareStatement(sql);
             } else {
                 ClassLoader parent = getClass().getClassLoader();
-                ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, 
parent);
+                ClassLoader classLoader = 
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
                 druidDataSource = 
JdbcDataSource.getDataSource().getSource(druidDataSourceKey);
                 if (druidDataSource == null) {
                     synchronized (druidDataSourceLock) {
@@ -462,53 +459,61 @@ public class JdbcExecutor {
                             long start = System.currentTimeMillis();
                             DruidDataSource ds = new DruidDataSource();
                             ds.setDriverClassLoader(classLoader);
-                            ds.setDriverClassName(driverClass);
-                            ds.setUrl(jdbcUrl);
-                            ds.setUsername(jdbcUser);
-                            ds.setPassword(jdbcPassword);
-                            ds.setMinIdle(minIdleSize);
-                            ds.setInitialSize(minPoolSize);
-                            ds.setMaxActive(maxPoolSize);
-                            ds.setMaxWait(maxWaitTime);
+                            ds.setDriverClassName(config.getJdbcDriverClass());
+                            ds.setUrl(config.getJdbcUrl());
+                            ds.setUsername(config.getJdbcUser());
+                            ds.setPassword(config.getJdbcPassword());
+                            ds.setMinIdle(config.getConnectionPoolMinSize()); 
// default 1
+                            
ds.setInitialSize(config.getConnectionPoolMinSize()); // default 1
+                            
ds.setMaxActive(config.getConnectionPoolMaxSize()); // default 10
+                            
ds.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000
                             ds.setTestWhileIdle(true);
                             ds.setTestOnBorrow(false);
-                            setValidationQuery(ds, tableType);
-                            ds.setTimeBetweenEvictionRunsMillis(maxIdleTime / 
5);
-                            ds.setMinEvictableIdleTimeMillis(maxIdleTime);
-                            ds.setKeepAlive(isKeepAlive);
+                            setValidationQuery(ds, config.getTableType());
+                            // default 3 min
+                            
ds.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 
10L);
+                            // default 15 min
+                            
ds.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L);
+                            // default 30 min
+                            
ds.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime());
+                            
ds.setKeepAlive(config.isConnectionPoolKeepAlive());
+                            // default 6 min
+                            
ds.setKeepAliveBetweenTimeMillis(config.getConnectionPoolMaxLifeTime() / 5L);
                             druidDataSource = ds;
-                            // here is a cache of datasource, which using the 
string(jdbcUrl + jdbcUser +
-                            // jdbcPassword) as key.
-                            // and the default datasource init = 1, min = 1, 
max = 100, if one of connection idle
-                            // time greater than 10 minutes. then connection 
will be retrieved.
                             
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds);
-                            LOG.info("init datasource [" + (jdbcUrl + 
jdbcUser) + "] cost: " + (
+                            LOG.info("JdbcClient set"
+                                     + " ConnectionPoolMinSize = " + 
config.getConnectionPoolMinSize()
+                                     + ", ConnectionPoolMaxSize = " + 
config.getConnectionPoolMaxSize()
+                                     + ", ConnectionPoolMaxWaitTime = " + 
config.getConnectionPoolMaxWaitTime()
+                                     + ", ConnectionPoolMaxLifeTime = " + 
config.getConnectionPoolMaxLifeTime()
+                                     + ", ConnectionPoolKeepAlive = " + 
config.isConnectionPoolKeepAlive());
+                            LOG.info("init datasource [" + 
(config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
                                     System.currentTimeMillis() - start) + " 
ms");
                         }
                     }
                 }
-
                 long start = System.currentTimeMillis();
                 conn = druidDataSource.getConnection();
-                LOG.info("get connection [" + (jdbcUrl + jdbcUser) + "] cost: 
" + (System.currentTimeMillis() - start)
-                        + " ms");
-                if (op == TJdbcOperation.READ) {
+                LOG.info("get connection [" + (config.getJdbcUrl() + 
config.getJdbcUser()) + "] cost: " + (
+                        System.currentTimeMillis() - start)
+                         + " ms");
+                if (config.getOp() == TJdbcOperation.READ) {
                     conn.setAutoCommit(false);
                     Preconditions.checkArgument(sql != null);
                     stmt = conn.prepareStatement(sql, 
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                     if (tableType == TOdbcTableType.MYSQL) {
                         stmt.setFetchSize(Integer.MIN_VALUE);
                     } else {
-                        stmt.setFetchSize(batchSize);
+                        stmt.setFetchSize(config.getBatchSize());
                     }
-                    batchSizeNum = batchSize;
+                    batchSizeNum = config.getBatchSize();
                 } else {
                     LOG.info("insert sql: " + sql);
                     preparedStatement = conn.prepareStatement(sql);
                 }
             }
         } catch (MalformedURLException e) {
-            throw new UdfRuntimeException("MalformedURLException to load class 
about " + driverUrl, e);
+            throw new UdfRuntimeException("MalformedURLException to load class 
about " + config.getJdbcDriverUrl(), e);
         } catch (SQLException e) {
             throw new UdfRuntimeException("Initialize datasource failed: ", e);
         } catch (FileNotFoundException e) {
@@ -2213,4 +2218,3 @@ public class JdbcExecutor {
         return i;
     }
 }
-
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index eacefe71549..7dda751e8f6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -140,26 +140,6 @@ public class Config extends ConfigBase {
                     + "if the specified driver file path is not an absolute 
path, Doris will find jars from this path"})
     public static String jdbc_drivers_dir = System.getenv("DORIS_HOME") + 
"/jdbc_drivers";
 
-    @ConfField(description = {"JDBC Catalog FE 连接池的最小连接数",
-            "The minimum number of JDBC Catalog FE connection pool"})
-    public static int jdbc_min_pool_size = 1;
-
-    @ConfField(description = {"JDBC Catalog FE 连接池的最大连接数",
-            "The maximum number of JDBC Catalog FE connection pool"})
-    public static int jdbc_max_pool_size = 100;
-
-    @ConfField(description = {"JDBC Catalog FE 连接池的最大空闲连接时间",
-            "The maximum idle time of JDBC Catalog FE connection pool"})
-    public static int jdbc_max_idle_time = 300000;
-
-    @ConfField(description = {"JDBC Catalog FE 连接池的最大等待时间",
-            "The maximum wait time of JDBC Catalog FE connection pool"})
-    public static int jdbc_max_wait_time = 5000;
-
-    @ConfField(description = {"JDBC Catalog FE 连接池的 KeepAlive 策略",
-            "The keep alive strategy of JDBC Catalog FE connection pool"})
-    public static boolean jdbc_keep_alive = false;
-
     @ConfField(mutable = true, masterOnly = true, description = {"broker load 
时,单个节点上 load 执行计划的默认并行度",
             "The default parallelism of the load execution plan on a single 
node when the broker load is submitted"})
     public static int default_load_parallelism = 1;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index 999b5c4d34f..c5b66e13f48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -40,12 +40,14 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Map;
 
 
 /**
  * External JDBC Catalog resource for external table query.
- *
+ * <p>
  * create external resource jdbc_mysql
  * properties (
  * "type"="jdbc",
@@ -55,7 +57,7 @@ import java.util.Map;
  * "driver_url"="http://127.0.0.1:8888/mysql-connector-java-5.1.47.jar",
  * "driver_class"="com.mysql.jdbc.Driver"
  * );
- *
+ * <p>
  * DROP RESOURCE "jdbc_mysql";
  */
 public class JdbcResource extends Resource {
@@ -94,7 +96,13 @@ public class JdbcResource extends Resource {
     public static final String TYPE = "type";
     public static final String ONLY_SPECIFIED_DATABASE = 
"only_specified_database";
     public static final String LOWER_CASE_TABLE_NAMES = 
"lower_case_table_names";
+    public static final String CONNECTION_POOL_MIN_SIZE = 
"connection_pool_min_size";
+    public static final String CONNECTION_POOL_MAX_SIZE = 
"connection_pool_max_size";
+    public static final String CONNECTION_POOL_MAX_WAIT_TIME = 
"connection_pool_max_wait_time";
+    public static final String CONNECTION_POOL_MAX_LIFE_TIME = 
"connection_pool_max_life_time";
+    public static final String CONNECTION_POOL_KEEP_ALIVE = 
"connection_pool_keep_alive";
     public static final String CHECK_SUM = "checksum";
+    public static final String CREATE_TIME = "create_time";
     private static final ImmutableList<String> ALL_PROPERTIES = new 
ImmutableList.Builder<String>().add(
             JDBC_URL,
             USER,
@@ -102,16 +110,27 @@ public class JdbcResource extends Resource {
             DRIVER_CLASS,
             DRIVER_URL,
             TYPE,
+            CREATE_TIME,
             ONLY_SPECIFIED_DATABASE,
             LOWER_CASE_TABLE_NAMES,
             INCLUDE_DATABASE_LIST,
-            EXCLUDE_DATABASE_LIST
+            EXCLUDE_DATABASE_LIST,
+            CONNECTION_POOL_MIN_SIZE,
+            CONNECTION_POOL_MAX_SIZE,
+            CONNECTION_POOL_MAX_LIFE_TIME,
+            CONNECTION_POOL_MAX_WAIT_TIME,
+            CONNECTION_POOL_KEEP_ALIVE
     ).build();
     private static final ImmutableList<String> OPTIONAL_PROPERTIES = new 
ImmutableList.Builder<String>().add(
             ONLY_SPECIFIED_DATABASE,
             LOWER_CASE_TABLE_NAMES,
             INCLUDE_DATABASE_LIST,
-            EXCLUDE_DATABASE_LIST
+            EXCLUDE_DATABASE_LIST,
+            CONNECTION_POOL_MIN_SIZE,
+            CONNECTION_POOL_MAX_SIZE,
+            CONNECTION_POOL_MAX_LIFE_TIME,
+            CONNECTION_POOL_MAX_WAIT_TIME,
+            CONNECTION_POOL_KEEP_ALIVE
     ).build();
 
     // The default value of optional properties
@@ -123,6 +142,11 @@ public class JdbcResource extends Resource {
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "10");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, 
"1800000");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, 
"5000");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, 
"false");
     }
 
     // timeout for both connection and read. 10 seconds is long enough.
@@ -138,7 +162,7 @@ public class JdbcResource extends Resource {
         this(name, Maps.newHashMap());
     }
 
-    private JdbcResource(String name, Map<String, String> configs) {
+    public JdbcResource(String name, Map<String, String> configs) {
         super(name, ResourceType.JDBC);
         this.configs = configs;
     }
@@ -168,13 +192,11 @@ public class JdbcResource extends Resource {
     @Override
     protected void setProperties(Map<String, String> properties) throws 
DdlException {
         Preconditions.checkState(properties != null);
-        for (String key : properties.keySet()) {
-            if (!ALL_PROPERTIES.contains(key)) {
-                throw new DdlException("JDBC resource Property of " + key + " 
is unknown");
-            }
-        }
+        validateProperties(properties);
         configs = properties;
-        handleOptionalArguments();
+        applyDefaultProperties();
+        String currentDateTime = 
LocalDateTime.now(ZoneId.systemDefault()).toString().replace("T", " ");
+        configs.put(CREATE_TIME, currentDateTime);
         // check properties
         for (String property : ALL_PROPERTIES) {
             String value = configs.get(property);
@@ -190,7 +212,9 @@ public class JdbcResource extends Resource {
      * This function used to handle optional arguments
      * eg: only_specified_database、lower_case_table_names
      */
-    private void handleOptionalArguments() {
+
+    @Override
+    public void applyDefaultProperties() {
         for (String s : OPTIONAL_PROPERTIES) {
             if (!configs.containsKey(s)) {
                 configs.put(s, OPTIONAL_PROPERTIES_DEFAULT_VALUE.get(s));
@@ -384,4 +408,56 @@ public class JdbcResource extends Resource {
         }
     }
 
+    public static String getDefaultPropertyValue(String propertyName) {
+        return OPTIONAL_PROPERTIES_DEFAULT_VALUE.getOrDefault(propertyName, 
"");
+    }
+
+    public static void validateProperties(Map<String, String> properties) 
throws DdlException {
+        for (String key : properties.keySet()) {
+            if (!ALL_PROPERTIES.contains(key)) {
+                throw new DdlException("JDBC resource Property of " + key + " 
is unknown");
+            }
+        }
+    }
+
+    public static void checkBooleanProperty(String propertyName, String 
propertyValue) throws DdlException {
+        if (!propertyValue.equalsIgnoreCase("true") && 
!propertyValue.equalsIgnoreCase("false")) {
+            throw new DdlException(propertyName + " must be true or false");
+        }
+    }
+
+    public static void checkDatabaseListProperties(String 
onlySpecifiedDatabase,
+            Map<String, Boolean> includeDatabaseList, Map<String, Boolean> 
excludeDatabaseList) throws DdlException {
+        if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) {
+            if ((includeDatabaseList != null && 
!includeDatabaseList.isEmpty()) || (excludeDatabaseList != null
+                    && !excludeDatabaseList.isEmpty())) {
+                throw new DdlException(
+                        "include_database_list and exclude_database_list "
+                                + "cannot be set when only_specified_database 
is false");
+            }
+        }
+    }
+
+    public static void checkConnectionPoolProperties(int minSize, int maxSize, 
int maxWaitTime, int maxLifeTime)
+            throws DdlException {
+        if (minSize < 0) {
+            throw new DdlException("connection_pool_min_size must be greater 
than or equal to 0");
+        }
+        if (maxSize < 1) {
+            throw new DdlException("connection_pool_max_size must be greater 
than or equal to 1");
+        }
+        if (maxSize < minSize) {
+            throw new DdlException(
+                    "connection_pool_max_size must be greater than or equal to 
connection_pool_min_size");
+        }
+        if (maxWaitTime < 0) {
+            throw new DdlException("connection_pool_max_wait_time must be 
greater than or equal to 0");
+        }
+        if (maxWaitTime > 30000) {
+            throw new DdlException("connection_pool_max_wait_time must be less 
than or equal to 30000");
+        }
+        if (maxLifeTime < 150000) {
+            throw new DdlException("connection_pool_max_life_time must be 
greater than or equal to 150000");
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
index 795f00e7bfe..511d67c91b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
 public class JdbcTable extends Table {
     private static final Logger LOG = LogManager.getLogger(JdbcTable.class);
 
+    private static final String CATALOG_ID = "catalog_id";
     private static final String TABLE = "table";
     private static final String REAL_DATABASE = "real_database";
     private static final String REAL_TABLE = "real_table";
@@ -75,6 +76,14 @@ public class JdbcTable extends Table {
     private String driverUrl;
     private String checkSum;
 
+    private long catalogId = -1;
+
+    private int connectionPoolMinSize;
+    private int connectionPoolMaxSize;
+    private int connectionPoolMaxWaitTime;
+    private int connectionPoolMaxLifeTime;
+    private boolean connectionPoolKeepAlive;
+
     static {
         Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
         tempMap.put("nebula", TOdbcTableType.NEBULA);
@@ -157,6 +166,35 @@ public class JdbcTable extends Table {
         return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_URL, 
driverUrl);
     }
 
+    public long getCatalogId() {
+        return catalogId;
+    }
+
+    public int getConnectionPoolMinSize() {
+        return 
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE,
+                String.valueOf(connectionPoolMinSize)));
+    }
+
+    public int getConnectionPoolMaxSize() {
+        return 
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE,
+                String.valueOf(connectionPoolMaxSize)));
+    }
+
+    public int getConnectionPoolMaxWaitTime() {
+        return 
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME,
+                String.valueOf(connectionPoolMaxWaitTime)));
+    }
+
+    public int getConnectionPoolMaxLifeTime() {
+        return 
Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME,
+                String.valueOf(connectionPoolMaxLifeTime)));
+    }
+
+    public boolean isConnectionPoolKeepAlive() {
+        return 
Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
+                String.valueOf(connectionPoolKeepAlive)));
+    }
+
     private String getFromJdbcResourceOrDefault(String key, String defaultVal) 
{
         if (Strings.isNullOrEmpty(resourceName)) {
             return defaultVal;
@@ -171,6 +209,7 @@ public class JdbcTable extends Table {
     @Override
     public TTableDescriptor toThrift() {
         TJdbcTable tJdbcTable = new TJdbcTable();
+        tJdbcTable.setCatalogId(catalogId);
         tJdbcTable.setJdbcUrl(getJdbcUrl());
         tJdbcTable.setJdbcUser(getJdbcUser());
         tJdbcTable.setJdbcPassword(getJdbcPasswd());
@@ -179,6 +218,11 @@ public class JdbcTable extends Table {
         tJdbcTable.setJdbcDriverUrl(getDriverUrl());
         tJdbcTable.setJdbcResourceName(resourceName);
         tJdbcTable.setJdbcDriverChecksum(checkSum);
+        tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize());
+        tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize());
+        
tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime());
+        
tJdbcTable.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime());
+        tJdbcTable.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
         TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.JDBC_TABLE, fullSchema.size(), 0,
                 getName(), "");
         tTableDescriptor.setJdbcTable(tJdbcTable);
@@ -189,6 +233,7 @@ public class JdbcTable extends Table {
     public void write(DataOutput out) throws IOException {
         super.write(out);
         Map<String, String> serializeMap = Maps.newHashMap();
+        serializeMap.put(CATALOG_ID, String.valueOf(catalogId));
         serializeMap.put(TABLE, externalTableName);
         serializeMap.put(RESOURCE, resourceName);
         serializeMap.put(TABLE_TYPE, jdbcTypeName);
@@ -225,6 +270,7 @@ public class JdbcTable extends Table {
             String value = Text.readString(in);
             serializeMap.put(key, value);
         }
+        catalogId = serializeMap.get(CATALOG_ID) != null ? 
Long.parseLong(serializeMap.get(CATALOG_ID)) : -1;
         externalTableName = serializeMap.get(TABLE);
         resourceName = serializeMap.get(RESOURCE);
         jdbcTypeName = serializeMap.get(TABLE_TYPE);
@@ -340,6 +386,14 @@ public class JdbcTable extends Table {
         driverClass = jdbcResource.getProperty(DRIVER_CLASS);
         driverUrl = jdbcResource.getProperty(DRIVER_URL);
         checkSum = jdbcResource.getProperty(CHECK_SUM);
+        connectionPoolMinSize = 
Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE));
+        connectionPoolMaxSize = 
Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_SIZE));
+        connectionPoolMaxWaitTime = Integer.parseInt(
+                
jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME));
+        connectionPoolMaxLifeTime = Integer.parseInt(
+                
jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME));
+        connectionPoolKeepAlive = Boolean.parseBoolean(
+                
jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE));
 
         String urlType = jdbcUrl.split(":")[1];
         if (!jdbcTypeName.equalsIgnoreCase(urlType)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index 781c5fb3f61..1a51d42f57a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -296,4 +296,6 @@ public abstract class Resource implements Writable, 
GsonPostProcessable {
             }
         });
     }
+
+    public void applyDefaultProperties() {}
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 5696f1cd258..5718f8af07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -95,6 +95,7 @@ public class ResourceMgr implements Writable {
     }
 
     public void replayCreateResource(Resource resource) {
+        resource.applyDefaultProperties();
         nameToResource.put(resource.getName(), resource);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
index a02c59080fc..05023591bfd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
@@ -87,6 +87,7 @@ public class JdbcExternalTable extends ExternalTable {
         JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
         String fullDbName = this.dbName + "." + this.name;
         JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, 
TableType.JDBC_EXTERNAL_TABLE);
+        jdbcTable.setCatalogId(jdbcCatalog.getId());
         jdbcTable.setExternalTableName(fullDbName);
         jdbcTable.setRealDatabaseName(((JdbcExternalCatalog) 
catalog).getJdbcClient().getRealDatabaseName(this.dbName));
         jdbcTable.setRealTableName(
@@ -99,6 +100,11 @@ public class JdbcExternalTable extends ExternalTable {
         jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
         jdbcTable.setResourceName(jdbcCatalog.getResource());
         jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
+        
jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize());
+        
jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize());
+        
jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime());
+        
jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime());
+        
jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive());
         return jdbcTable;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index daeb5feed73..c8dfb7dfe4d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -65,6 +65,17 @@ public class JdbcExternalCatalog extends ExternalCatalog {
                 throw new DdlException("Required property '" + 
requiredProperty + "' is missing");
             }
         }
+
+        Map<String, String> propertiesWithoutCheckSum = 
Maps.newHashMap(catalogProperty.getProperties());
+        propertiesWithoutCheckSum.remove(JdbcResource.CHECK_SUM);
+        JdbcResource.validateProperties(propertiesWithoutCheckSum);
+
+        
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, 
getOnlySpecifiedDatabase());
+        JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, 
getLowerCaseTableNames());
+        JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), 
getIncludeDatabaseMap(),
+                getExcludeDatabaseMap());
+        JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), 
getConnectionPoolMaxSize(),
+                getConnectionPoolMaxWaitTime(), 
getConnectionPoolMaxLifeTime());
     }
 
     @Override
@@ -130,7 +141,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
     }
 
     public String getOnlySpecifiedDatabase() {
-        return 
catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, "false");
+        return 
catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, 
JdbcResource.getDefaultPropertyValue(
+                JdbcResource.ONLY_SPECIFIED_DATABASE));
     }
 
     public String getLowerCaseTableNames() {
@@ -140,7 +152,33 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         }
 
         // Otherwise, it defaults to false
-        return 
catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
+        return 
catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, 
JdbcResource.getDefaultPropertyValue(
+                JdbcResource.LOWER_CASE_TABLE_NAMES));
+    }
+
+    public int getConnectionPoolMinSize() {
+        return 
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE,
 JdbcResource
+                
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE)));
+    }
+
+    public int getConnectionPoolMaxSize() {
+        return 
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_SIZE,
 JdbcResource
+                
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE)));
+    }
+
+    public int getConnectionPoolMaxWaitTime() {
+        return 
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME,
 JdbcResource
+                
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME)));
+    }
+
+    public int getConnectionPoolMaxLifeTime() {
+        return 
Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME,
 JdbcResource
+                
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME)));
+    }
+
+    public boolean isConnectionPoolKeepAlive() {
+        return 
Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
 JdbcResource
+                
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)));
     }
 
     @Override
@@ -155,7 +193,12 @@ public class JdbcExternalCatalog extends ExternalCatalog {
                 .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
                 .setIsLowerCaseTableNames(getLowerCaseTableNames())
                 .setIncludeDatabaseMap(getIncludeDatabaseMap())
-                .setExcludeDatabaseMap(getExcludeDatabaseMap());
+                .setExcludeDatabaseMap(getExcludeDatabaseMap())
+                .setConnectionPoolMinSize(getConnectionPoolMinSize())
+                .setConnectionPoolMaxSize(getConnectionPoolMaxSize())
+                .setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime())
+                .setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime())
+                .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
 
         jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
     }
@@ -193,22 +236,5 @@ public class JdbcExternalCatalog extends ExternalCatalog {
             properties.put(JdbcResource.CHECK_SUM,
                     
JdbcResource.computeObjectChecksum(properties.get(JdbcResource.DRIVER_URL)));
         }
-        String onlySpecifiedDatabase = getOnlySpecifiedDatabase();
-        if (!onlySpecifiedDatabase.equalsIgnoreCase("true") && 
!onlySpecifiedDatabase.equalsIgnoreCase("false")) {
-            throw new DdlException("only_specified_database must be true or 
false");
-        }
-        String lowerCaseTableNames = getLowerCaseTableNames();
-        if (!lowerCaseTableNames.equalsIgnoreCase("true") && 
!lowerCaseTableNames.equalsIgnoreCase("false")) {
-            throw new DdlException("lower_case_table_names must be true or 
false");
-        }
-        if (!onlySpecifiedDatabase.equalsIgnoreCase("true")) {
-            Map<String, Boolean> includeDatabaseList = getIncludeDatabaseMap();
-            Map<String, Boolean> excludeDatabaseList = getExcludeDatabaseMap();
-            if ((includeDatabaseList != null && !includeDatabaseList.isEmpty())
-                    || (excludeDatabaseList != null && 
!excludeDatabaseList.isEmpty())) {
-                throw new DdlException("include_database_list and 
exclude_database_list can not be set when "
-                        + "only_specified_database is false");
-            }
-        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 796d0db64fb..60daac66b6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.Util;
 
@@ -109,17 +108,16 @@ public abstract class JdbcClient {
                 
Optional.ofNullable(jdbcClientConfig.getExcludeDatabaseMap()).orElse(Collections.emptyMap());
         String jdbcUrl = jdbcClientConfig.getJdbcUrl();
         this.dbType = parseDbType(jdbcUrl);
-        initializeDataSource(jdbcClientConfig.getPassword(), jdbcUrl, 
jdbcClientConfig.getDriverUrl(),
-                jdbcClientConfig.getDriverClass());
+        initializeDataSource(jdbcClientConfig);
     }
 
     // Initialize DruidDataSource
-    private void initializeDataSource(String password, String jdbcUrl, String 
driverUrl, String driverClass) {
+    private void initializeDataSource(JdbcClientConfig config) {
         ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             // TODO(ftw): The problem here is that the jar package is handled 
by FE
             //  and URLClassLoader may load the jar package directly into 
memory
-            URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))};
+            URL[] urls = {new 
URL(JdbcResource.getFullDriverUrl(config.getDriverUrl()))};
             // set parent ClassLoader to null, we can achieve class loading 
isolation.
             ClassLoader parent = getClass().getClassLoader();
             ClassLoader classLoader = URLClassLoader.newInstance(urls, parent);
@@ -128,24 +126,29 @@ public abstract class JdbcClient {
             Thread.currentThread().setContextClassLoader(classLoader);
             dataSource = new DruidDataSource();
             dataSource.setDriverClassLoader(classLoader);
-            dataSource.setDriverClassName(driverClass);
-            dataSource.setUrl(jdbcUrl);
-            dataSource.setUsername(jdbcUser);
-            dataSource.setPassword(password);
-            dataSource.setMinIdle(Config.jdbc_min_pool_size > 0 ? 1 : 0);
-            dataSource.setInitialSize(Config.jdbc_min_pool_size);
-            dataSource.setMaxActive(Config.jdbc_max_pool_size);
-            dataSource.setTimeBetweenEvictionRunsMillis(Config.jdbc_max_idle_time * 2L);
-            dataSource.setMinEvictableIdleTimeMillis(Config.jdbc_max_idle_time);
+            dataSource.setDriverClassName(config.getDriverClass());
+            dataSource.setUrl(config.getJdbcUrl());
+            dataSource.setUsername(config.getUser());
+            dataSource.setPassword(config.getPassword());
+            dataSource.setMinIdle(config.getConnectionPoolMinSize()); // default 1
+            dataSource.setInitialSize(config.getConnectionPoolMinSize()); // default 1
+            dataSource.setMaxActive(config.getConnectionPoolMaxSize()); // default 10
             // set connection timeout to 5s.
             // The default is 30s, which is too long.
             // Because when querying information_schema db, BE will call thrift rpc(default timeout is 30s)
             // to FE to get schema info, and may create connection here, if we set it too long and the url is invalid,
             // it may cause the thrift rpc timeout.
-            dataSource.setMaxWait(Config.jdbc_max_wait_time);
-            dataSource.setKeepAlive(Config.jdbc_keep_alive);
+            dataSource.setMaxWait(config.getConnectionPoolMaxWaitTime()); // default 5000
+            dataSource.setTimeBetweenEvictionRunsMillis(config.getConnectionPoolMaxLifeTime() / 10L); // default 3 min
+            dataSource.setMinEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime() / 2L); // default 15 min
+            dataSource.setMaxEvictableIdleTimeMillis(config.getConnectionPoolMaxLifeTime()); // default 30 min
+            LOG.info("JdbcClient set"
+                    + " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize()
+                    + ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize()
+                    + ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime()
+                    + ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime());
         } catch (MalformedURLException e) {
-            throw new JdbcClientException("MalformedURLException to load class about " + driverUrl, e);
+            throw new JdbcClientException("MalformedURLException to load class about " + config.getDriverUrl(), e);
         } finally {
             Thread.currentThread().setContextClassLoader(oldClassLoader);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
index 69dedd16d7b..4e8fa63ee33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
@@ -18,6 +18,11 @@
 
 package org.apache.doris.datasource.jdbc.client;
 
+
+import org.apache.doris.catalog.JdbcResource;
+
+import com.google.common.collect.Maps;
+
 import java.util.Map;
 
 public class JdbcClientConfig {
@@ -31,6 +36,28 @@ public class JdbcClientConfig {
     private String isLowerCaseTableNames;
     private Map<String, Boolean> includeDatabaseMap;
     private Map<String, Boolean> excludeDatabaseMap;
+    private int connectionPoolMinSize;
+    private int connectionPoolMaxSize;
+    private int connectionPoolMaxWaitTime;
+    private int connectionPoolMaxLifeTime;
+    private boolean connectionPoolKeepAlive;
+
+    public JdbcClientConfig() {
+        this.onlySpecifiedDatabase = JdbcResource.getDefaultPropertyValue(JdbcResource.ONLY_SPECIFIED_DATABASE);
+        this.isLowerCaseTableNames = JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_TABLE_NAMES);
+        this.connectionPoolMinSize = Integer.parseInt(
+                JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE));
+        this.connectionPoolMaxSize = Integer.parseInt(
+                JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_SIZE));
+        this.connectionPoolMaxWaitTime = Integer.parseInt(
+                JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_WAIT_TIME));
+        this.connectionPoolMaxLifeTime = Integer.parseInt(
+                JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MAX_LIFE_TIME));
+        this.connectionPoolKeepAlive = Boolean.parseBoolean(
+                JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE));
+        this.includeDatabaseMap = Maps.newHashMap();
+        this.excludeDatabaseMap = Maps.newHashMap();
+    }
 
     public String getCatalog() {
         return catalog;
@@ -104,6 +131,51 @@ public class JdbcClientConfig {
         return this;
     }
 
+    public int getConnectionPoolMinSize() {
+        return connectionPoolMinSize;
+    }
+
+    public JdbcClientConfig setConnectionPoolMinSize(int connectionPoolMinSize) {
+        this.connectionPoolMinSize = connectionPoolMinSize;
+        return this;
+    }
+
+    public int getConnectionPoolMaxSize() {
+        return connectionPoolMaxSize;
+    }
+
+    public JdbcClientConfig setConnectionPoolMaxSize(int connectionPoolMaxSize) {
+        this.connectionPoolMaxSize = connectionPoolMaxSize;
+        return this;
+    }
+
+    public int getConnectionPoolMaxLifeTime() {
+        return connectionPoolMaxLifeTime;
+    }
+
+    public JdbcClientConfig setConnectionPoolMaxLifeTime(int connectionPoolMaxLifeTime) {
+        this.connectionPoolMaxLifeTime = connectionPoolMaxLifeTime;
+        return this;
+    }
+
+    public int getConnectionPoolMaxWaitTime() {
+        return connectionPoolMaxWaitTime;
+    }
+
+    public JdbcClientConfig setConnectionPoolMaxWaitTime(int connectionPoolMaxWaitTime) {
+        this.connectionPoolMaxWaitTime = connectionPoolMaxWaitTime;
+        return this;
+    }
+
+    public boolean isConnectionPoolKeepAlive() {
+        return connectionPoolKeepAlive;
+    }
+
+    public JdbcClientConfig setConnectionPoolKeepAlive(boolean connectionPoolKeepAlive) {
+        this.connectionPoolKeepAlive = connectionPoolKeepAlive;
+        return this;
+    }
+
     public Map<String, Boolean> getIncludeDatabaseMap() {
         return includeDatabaseMap;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
index 1f85ac3feff..f55821953e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
@@ -37,30 +37,19 @@ import java.util.List;
 public class JdbcTableSink extends DataSink {
     private static final Logger LOG = LogManager.getLogger(JdbcTableSink.class);
 
-    private final String resourceName;
     private final String externalTableName;
     private final String dorisTableName;
-    private final String jdbcUrl;
-    private final String jdbcUser;
-    private final String jdbcPasswd;
-    private final String driverClass;
-    private final String driverUrl;
-    private final String checkSum;
     private final TOdbcTableType jdbcType;
     private final boolean useTransaction;
     private String insertSql;
 
+    private JdbcTable jdbcTable;
+
     public JdbcTableSink(JdbcTable jdbcTable, List<String> insertCols) {
-        resourceName = jdbcTable.getResourceName();
+        this.jdbcTable = jdbcTable;
         jdbcType = jdbcTable.getJdbcTableType();
         externalTableName = jdbcTable.getProperRealFullTableName(jdbcType);
         useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
-        jdbcUrl = jdbcTable.getJdbcUrl();
-        jdbcUser = jdbcTable.getJdbcUser();
-        jdbcPasswd = jdbcTable.getJdbcPasswd();
-        driverClass = jdbcTable.getDriverClass();
-        driverUrl = jdbcTable.getDriverUrl();
-        checkSum = jdbcTable.getCheckSum();
         dorisTableName = jdbcTable.getName();
         insertSql = jdbcTable.getInsertSql(insertCols);
     }
@@ -81,20 +70,11 @@ public class JdbcTableSink extends DataSink {
     protected TDataSink toThrift() {
         TDataSink tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK);
         TJdbcTableSink jdbcTableSink = new TJdbcTableSink();
-        TJdbcTable jdbcTable = new TJdbcTable();
+        TJdbcTable jdbcTable = this.jdbcTable.toThrift().getJdbcTable();
         jdbcTableSink.setJdbcTable(jdbcTable);
-        jdbcTableSink.jdbc_table.setJdbcUrl(jdbcUrl);
-        jdbcTableSink.jdbc_table.setJdbcUser(jdbcUser);
-        jdbcTableSink.jdbc_table.setJdbcPassword(jdbcPasswd);
-        jdbcTableSink.jdbc_table.setJdbcTableName(externalTableName);
-        jdbcTableSink.jdbc_table.setJdbcDriverUrl(driverUrl);
-        jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass);
-        jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum);
-        jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName);
         jdbcTableSink.setInsertSql(insertSql);
         jdbcTableSink.setUseTransaction(useTransaction);
         jdbcTableSink.setTableType(jdbcType);
-
         tDataSink.setJdbcTableSink(jdbcTableSink);
         return tDataSink;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
index e91898fb82d..ee0d1949be2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
@@ -17,13 +17,154 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.analysis.AccessTestUtil;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateResourceStmt;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.AccessControllerManager;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class JdbcResourceTest {
 
+    private final ResourceMgr resourceMgr = new ResourceMgr();
+
+    private Map<String, String> jdbcProperties;
+
+    private Analyzer analyzer;
+
+    @Before
+    public void setUp() {
+        FeConstants.runningUnitTest = true;
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+        jdbcProperties = Maps.newHashMap();
+        jdbcProperties.put("type", "jdbc");
+        jdbcProperties.put("user", "postgres");
+        jdbcProperties.put("password", "");
+        jdbcProperties.put("jdbc_url", "jdbc:postgresql://127.0.0.1:5432/postgres?currentSchema=doris_test");
+        jdbcProperties.put("driver_url", "postgresql-42.5.0.jar");
+        jdbcProperties.put("driver_class", "org.postgresql.Driver");
+        jdbcProperties.put("checksum", "20c8228267b6c9ce620fddb39467d3eb");
+    }
+
+    @Test
+    public void testJdbcResourceCreateWithDefaultProperties(@Mocked Env env,
+            @Injectable AccessControllerManager accessManager)
+            throws UserException {
+        new Expectations() {
+            {
+                env.getAccessManager();
+                result = accessManager;
+                accessManager.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
+                result = true;
+            }
+        };
+
+        jdbcProperties.remove("checksum");
+
+        CreateResourceStmt stmt = new CreateResourceStmt(true, false, "jdbc_resource_pg_14", jdbcProperties);
+
+        stmt.analyze(analyzer);
+
+        resourceMgr.createResource(stmt);
+
+        JdbcResource jdbcResource = (JdbcResource) resourceMgr.getResource("jdbc_resource_pg_14");
+
+
+        // Verify the default properties were applied during the replay
+        Map<String, String> properties = jdbcResource.getCopiedProperties();
+        Assert.assertEquals("1", properties.get("connection_pool_min_size"));
+        Assert.assertEquals("10", properties.get("connection_pool_max_size"));
+        Assert.assertEquals("1800000", properties.get("connection_pool_max_life_time"));
+        Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time"));
+        Assert.assertEquals("false", properties.get("connection_pool_keep_alive"));
+    }
+
+    @Test
+    public void testJdbcResourceReplayWithDefaultProperties() {
+
+        JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14", jdbcProperties);
+
+        // Replay the resource creation to simulate the edit log replay
+        resourceMgr.replayCreateResource(jdbcResource);
+
+        // Retrieve the replayed resource
+        Resource replayedResource = resourceMgr.getResource("jdbc_resource_pg_14");
+
+        Assert.assertNotNull(replayedResource);
+        Assert.assertTrue(replayedResource instanceof JdbcResource);
+
+        // Verify the default properties were applied during the replay
+        Map<String, String> properties = replayedResource.getCopiedProperties();
+        Assert.assertEquals("1", properties.get("connection_pool_min_size"));
+        Assert.assertEquals("10", properties.get("connection_pool_max_size"));
+        Assert.assertEquals("1800000", properties.get("connection_pool_max_life_time"));
+        Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time"));
+        Assert.assertEquals("false", properties.get("connection_pool_keep_alive"));
+    }
+
+    @Test
+    public void testJdbcResourceReplayWithSetProperties() {
+
+        // Add some properties to the JDBC properties
+        jdbcProperties.put("connection_pool_min_size", "2");
+        jdbcProperties.put("connection_pool_max_size", "20");
+        jdbcProperties.put("connection_pool_max_life_time", "3600000");
+        jdbcProperties.put("connection_pool_max_wait_time", "10000");
+        jdbcProperties.put("connection_pool_keep_alive", "true");
+
+        JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14", jdbcProperties);
+
+        // Replay the resource creation to simulate the edit log replay
+        resourceMgr.replayCreateResource(jdbcResource);
+
+        // Retrieve the replayed resource
+        Resource replayedResource = resourceMgr.getResource("jdbc_resource_pg_14");
+
+        Assert.assertNotNull(replayedResource);
+        Assert.assertTrue(replayedResource instanceof JdbcResource);
+
+        // Verify the modified properties were applied during the replay
+        Map<String, String> properties = replayedResource.getCopiedProperties();
+        Assert.assertEquals("2", properties.get("connection_pool_min_size"));
+        Assert.assertEquals("20", properties.get("connection_pool_max_size"));
+        Assert.assertEquals("3600000", properties.get("connection_pool_max_life_time"));
+        Assert.assertEquals("10000", properties.get("connection_pool_max_wait_time"));
+        Assert.assertEquals("true", properties.get("connection_pool_keep_alive"));
+    }
+
+    @Test
+    public void testJdbcResourceReplayWithModifiedAfterSetDefaultProperties() throws DdlException {
+        JdbcResource jdbcResource = new JdbcResource("jdbc_resource_pg_14", jdbcProperties);
+
+        // Replay the resource creation to simulate the edit log replay
+        resourceMgr.replayCreateResource(jdbcResource);
+
+        // Retrieve the replayed resource
+        Resource replayedResource = resourceMgr.getResource("jdbc_resource_pg_14");
+        Map<String, String> newProperties = Maps.newHashMap();
+        newProperties.put(JdbcResource.CONNECTION_POOL_MIN_SIZE, "2");
+        replayedResource.modifyProperties(newProperties);
+        Map<String, String> properties = replayedResource.getCopiedProperties();
+        Assert.assertEquals("2", properties.get("connection_pool_min_size"));
+        resourceMgr.replayCreateResource(replayedResource);
+        Resource replayedResource2 = resourceMgr.getResource("jdbc_resource_pg_14");
+        Map<String, String> properties2 = replayedResource2.getCopiedProperties();
+        Assert.assertEquals("2", properties2.get("connection_pool_min_size"));
+    }
+
     @Test
     public void testHandleJdbcUrlForMySql() throws DdlException {
         String inputUrl = "jdbc:mysql://127.0.0.1:3306/test";
@@ -36,7 +177,7 @@ public class JdbcResourceTest {
 
     @Test
     public void testHandleJdbcUrlForSqlServerWithoutParams() throws DdlException {
-        String inputUrl = "jdbc:sqlserver://43.129.237.12:1433;databaseName=doris_test";
+        String inputUrl = "jdbc:sqlserver://127.0.0.1:1433;databaseName=doris_test";
         String resultUrl = JdbcResource.handleJdbcUrl(inputUrl);
 
         // Ensure that the result URL for SQL Server doesn't have '?' or '&'
@@ -49,7 +190,8 @@ public class JdbcResourceTest {
 
     @Test
     public void testHandleJdbcUrlForSqlServerWithParams() throws DdlException {
-        String inputUrl = "jdbc:sqlserver://43.129.237.12:1433;encrypt=false;databaseName=doris_test;trustServerCertificate=false";
+        String inputUrl
+                = "jdbc:sqlserver://127.0.0.1:1433;encrypt=false;databaseName=doris_test;trustServerCertificate=false";
         String resultUrl = JdbcResource.handleJdbcUrl(inputUrl);
 
         // Ensure that the result URL for SQL Server doesn't have '?' or '&'
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
index 0f2977a9886..8394daf0682 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
@@ -20,10 +20,12 @@ package org.apache.doris.datasource.jdbc;
 import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CatalogFactory;
 
+import com.google.common.collect.Maps;
 import org.junit.Assert;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -31,32 +33,54 @@ import java.util.Map;
 public class JdbcExternalCatalogTest {
     private JdbcExternalCatalog jdbcExternalCatalog;
 
-    @BeforeEach
+    @Before
     public void setUp() throws DdlException {
         FeConstants.runningUnitTest = true;
         Map<String, String> properties = new HashMap<>();
+        properties.put("type", "jdbc");
         properties.put(JdbcResource.DRIVER_URL, "ojdbc8.jar");
         properties.put(JdbcResource.JDBC_URL, "jdbc:oracle:thin:@127.0.0.1:1521:XE");
         properties.put(JdbcResource.DRIVER_CLASS, "oracle.jdbc.driver.OracleDriver");
-        jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", "testResource", properties, "testComment");
+        jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null, properties, "testComment");
     }
 
     @Test
-    public void setDefaultPropsWhenCreatingTest() {
+    public void replayJdbcCatalogTest() throws DdlException {
+        jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE, "1");
+        JdbcExternalCatalog replayJdbcCatalog = (JdbcExternalCatalog) CatalogFactory.createFromLog(
+                jdbcExternalCatalog.constructEditLog());
+        Map<String, String> properties = replayJdbcCatalog.getProperties();
+        Assert.assertEquals("1", properties.get("connection_pool_min_size"));
+        Map<String, String> newProperties = Maps.newHashMap();
+        newProperties.put(JdbcResource.CONNECTION_POOL_MIN_SIZE, "2");
+        jdbcExternalCatalog.getCatalogProperty().modifyCatalogProps(newProperties);
+        jdbcExternalCatalog.notifyPropertiesUpdated(newProperties);
+        JdbcExternalCatalog replayJdbcCatalog2 = (JdbcExternalCatalog) CatalogFactory.createFromLog(
+                jdbcExternalCatalog.constructEditLog());
+        Map<String, String> properties2 = replayJdbcCatalog2.getProperties();
+        Assert.assertEquals("2", properties2.get("connection_pool_min_size"));
+    }
+
+    @Test
+    public void checkPropertiesTest() {
         jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, "1");
-        Exception exception1 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.setDefaultPropsWhenCreating(false));
-        Assert.assertEquals("errCode = 2, detailMessage = only_specified_database must be true or false", exception1.getMessage());
+        Exception exception1 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.checkProperties());
+        Assert.assertEquals("errCode = 2, detailMessage = only_specified_database must be true or false",
+                exception1.getMessage());
 
         jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, "true");
         jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, "1");
-        Exception exception2 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.setDefaultPropsWhenCreating(false));
-        Assert.assertEquals("errCode = 2, detailMessage = lower_case_table_names must be true or false", exception2.getMessage());
+        Exception exception2 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.checkProperties());
+        Assert.assertEquals("errCode = 2, detailMessage = lower_case_table_names must be true or false",
+                exception2.getMessage());
 
         jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, "false");
         jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
         jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.INCLUDE_DATABASE_LIST, "db1,db2");
-        DdlException exceptione3 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.setDefaultPropsWhenCreating(false));
-        Assert.assertEquals("errCode = 2, detailMessage = include_database_list and exclude_database_list can not be set when only_specified_database is false", exceptione3.getMessage());
+        DdlException exceptione3 = Assert.assertThrows(DdlException.class, () -> jdbcExternalCatalog.checkProperties());
+        Assert.assertEquals(
+                "errCode = 2, detailMessage = include_database_list and exclude_database_list cannot be set when only_specified_database is false",
+                exceptione3.getMessage());
 
     }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index fa994bcadcb..695230bde6b 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -251,7 +251,7 @@ under the License.
         <commons-io.version>2.7</commons-io.version>
         <json-simple.version>1.1.1</json-simple.version>
         <junit.version>5.8.2</junit.version>
-        <druid.version>1.2.5</druid.version>
+        <druid.version>1.2.20</druid.version>
         <clickhouse.version>0.4.6</clickhouse.version>
         <thrift.version>0.16.0</thrift.version>
         <tomcat-embed-core.version>8.5.86</tomcat-embed-core.version>
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 37adc3d4274..6507d04c2dd 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -318,7 +318,12 @@ struct TJdbcTable {
   6: optional string jdbc_resource_name
   7: optional string jdbc_driver_class
   8: optional string jdbc_driver_checksum
-  
+  9: optional i32 connection_pool_min_size
+  10: optional i32 connection_pool_max_size
+  11: optional i32 connection_pool_max_wait_time
+  12: optional i32 connection_pool_max_life_time
+  13: optional bool connection_pool_keep_alive
+  14: optional i64 catalog_id
 }
 
 struct TMCTable {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 25a26b575b6..fccf90e173d 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -421,6 +421,14 @@ struct TJdbcExecutorCtorParams {
   8: optional string driver_path
 
   9: optional TOdbcTableType table_type
+
+  10: optional i32 connection_pool_min_size
+  11: optional i32 connection_pool_max_size
+  12: optional i32 connection_pool_max_wait_time
+  13: optional i32 connection_pool_max_life_time
+  14: optional i32 connection_pool_cache_clear_time
+  15: optional bool connection_pool_keep_alive
+  16: optional i64 catalog_id
 }
 
 struct TJavaUdfExecutorCtorParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to