This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new d370d053a [feature-2141,2194] Change dbcp to druid in JDBC engine and 
reservation for multiple data source implementation. (#2196)
d370d053a is described below

commit d370d053a9047a42318680bdc2fdcd652959b96f
Author: weixiao <[email protected]>
AuthorDate: Wed Jun 1 11:17:25 2022 +0800

    [feature-2141,2194] Change dbcp to druid in JDBC engine and reservation for 
multiple data source implementation. (#2196)
    
    * [feature-2141,2194] Change dbcp to dbcp2 in JDBC engine and reservation 
for multiple data source implementation.
    
    * [feature-2141,2194] Change dbcp to druid in JDBC engine and reservation 
for multiple data source implementation.
---
 db/linkis_dml.sql                                  |   3 +-
 .../engineconn-plugins/jdbc/pom.xml                |  11 +
 .../engineplugin/jdbc/ConnectionManager.java       | 424 +++++++++++++--------
 .../jdbc/JDBCDataSourceConfigurations.java         |  52 +++
 ...ConnConstant.java => JDBCPropertiesParser.java} |  26 +-
 .../engineplugin/jdbc/PropertiesParser.java        |  48 +++
 .../jdbc/constant/JDBCEngineConnConstant.java      |  31 ++
 .../engineplugin/jdbc/conf/JDBCConfiguration.scala |   4 -
 .../jdbc/executer/JDBCEngineConnExecutor.scala     | 200 ++++++----
 .../engineplugin/jdbc/ConnectionManagerTest.java   |  45 +--
 pom.xml                                            |   2 +-
 11 files changed, 559 insertions(+), 287 deletions(-)

diff --git a/db/linkis_dml.sql b/db/linkis_dml.sql
index 2c16736bc..1dda5ca8f 100644
--- a/db/linkis_dml.sql
+++ b/db/linkis_dml.sql
@@ -104,7 +104,8 @@ INSERT INTO `linkis_ps_configuration_config_key` (`key`, 
`description`, `name`,
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('pipeline.output.shuffle.null.type', '取值范围:NULL或者BLANK', '空值替换','NULL', 'OFT', 
'[\"NULL\",\"BLANK\"]', '0', '0', '1', 'pipeline引擎设置', 'pipeline');
 -- jdbc
 insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.connect.url', '例如:jdbc:hive2://127.0.0.1:10000', 'jdbc连接地址', 
'jdbc:hive2://127.0.0.1:10000', 'Regex', 
'^\\s*jdbc:\\w+://([^:]+)(:\\d+)(/[^\\?]+)?(\\?\\S*)?$', '0', '0', '1', 
'数据源配置', 'jdbc');
-insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT', 
'[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '数据源配置', 'jdbc');
+insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.driver', '例如:org.apache.hive.jdbc.HiveDriver', 'jdbc连接驱动', 
'', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
+insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT', 
'[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '用户配置', 'jdbc');
 insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.username', 'username', '数据库连接用户名', '', 'None', '', '0', '0', 
'1', '用户配置', 'jdbc');
 insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.password', 'password', '数据库连接密码', '', 'None', '', '0', '0', 
'1', '用户配置', 'jdbc');
 insert into `linkis_ps_configuration_config_key` (`key`, `description`, 
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, 
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES 
('wds.linkis.jdbc.connect.max', '范围:1-20,单位:个', 'jdbc引擎最大连接数', '10', 
'NumInterval', '[1,20]', '0', '0', '1', '数据源配置', 'jdbc');
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
index f6e0cdc5f..f5fd4f0e1 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
@@ -66,6 +66,12 @@
             <artifactId>linkis-common</artifactId>
             <version>${linkis.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-dbcp</groupId>
+                    <artifactId>commons-dbcp</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -144,6 +150,11 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>${druid.version}</version>
+        </dependency>
 
     </dependencies>
     <build>
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index 95e2cc47b..885faa188 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -18,18 +18,20 @@
 package org.apache.linkis.manager.engineplugin.jdbc;
 
 import org.apache.linkis.hadoop.common.utils.KerberosUtils;
-import org.apache.linkis.manager.engineplugin.jdbc.conf.JDBCConfiguration;
 import 
org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant;
+import 
org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException;
 
 import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.commons.dbcp.BasicDataSourceFactory;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import javax.sql.DataSource;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.security.PrivilegedExceptionAction;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.Callable;
@@ -41,19 +43,19 @@ import static 
org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.*;
 
 public class ConnectionManager {
 
-    Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionManager.class);
 
-    private final Map<String, DataSource> databaseToDataSources = new 
HashMap<String, DataSource>();
-
-    private final Map<String, String> supportedDBs = new HashMap<String, 
String>();
-    private final List<String> supportedDBNames = new ArrayList<String>();
-    private final Map<String, String> supportedDBsValidQuery = new 
HashMap<String, String>();
+    private final Map<String, DataSource> dataSourceFactories;
+    private final JDBCDataSourceConfigurations jdbcDataSourceConfigurations;
 
     private static volatile ConnectionManager connectionManager;
     private ScheduledExecutorService scheduledExecutorService;
     private Integer kinitFailCount = 0;
 
-    private ConnectionManager() {}
+    private ConnectionManager() {
+        jdbcDataSourceConfigurations = new JDBCDataSourceConfigurations();
+        dataSourceFactories = new HashMap<>();
+    }
 
     public static ConnectionManager getInstance() {
         if (connectionManager == null) {
@@ -66,192 +68,297 @@ public class ConnectionManager {
         return connectionManager;
     }
 
-    {
-        String supportedDBString = 
JDBCConfiguration.JDBC_SUPPORT_DBS().getValue();
-        String[] supportedDBs = supportedDBString.split(",");
-        for (String supportedDB : supportedDBs) {
-            String[] supportedDBInfo = supportedDB.split("=>");
-            if (supportedDBInfo.length != 2) {
-                throw new IllegalArgumentException("Illegal driver info " + 
supportedDB);
-            }
-            try {
-                Class.forName(supportedDBInfo[1]);
-            } catch (ClassNotFoundException e) {
-                logger.info("Load " + supportedDBInfo[0] + " driver failed", 
e);
-            }
-            supportedDBNames.add(supportedDBInfo[0]);
-            this.supportedDBs.put(supportedDBInfo[0], supportedDBInfo[1]);
+    public void initTaskStatementMap() {
+        try {
+            jdbcDataSourceConfigurations.initTaskIdStatementMap();
+        } catch (Exception e) {
+            LOG.error("Error while closing taskIdStatementMap statement...", 
e);
         }
     }
 
-    {
-        String supportedDBValidQueryString =
-                
JDBCConfiguration.JDBC_SUPPORT_DBS_VALIDATION_QUERY().getValue();
-        String[] supportedDBsValidQuery = 
supportedDBValidQueryString.split(",");
-        for (String supportedDBValidQuery : supportedDBsValidQuery) {
-            String[] supportedDBValidQueryInfo = 
supportedDBValidQuery.split("=>");
-            if (supportedDBValidQueryInfo.length != 2) {
-                throw new IllegalArgumentException(
-                        "Illegal validation query info " + 
supportedDBValidQuery);
-            }
-            this.supportedDBsValidQuery.put(
-                    supportedDBValidQueryInfo[0], 
supportedDBValidQueryInfo[1]);
-        }
+    public void saveStatement(String taskId, Statement statement) {
+        jdbcDataSourceConfigurations.saveStatement(taskId, statement);
     }
 
-    private void validateURL(String url) {
-        if (StringUtils.isEmpty(url)) {
-            throw new NullPointerException(JDBCEngineConnConstant.JDBC_URL + " 
cannot be null.");
+    public void removeStatement(String taskId) {
+        jdbcDataSourceConfigurations.removeStatement(taskId);
+    }
+
+    public void cancelStatement(String taskId) {
+        try {
+            jdbcDataSourceConfigurations.cancelStatement(taskId);
+        } catch (SQLException e) {
+            LOG.error("Error while cancelling task is {} ...", taskId, e);
         }
-        if (!url.matches("jdbc:\\w+://\\S+:[0-9]{2,6}(/\\S*)?") && 
!url.startsWith("jdbc:h2")) {
-            throw new IllegalArgumentException("Unknown the jdbc url: " + url);
+    }
+
+    public void close() {
+        try {
+            initTaskStatementMap();
+        } catch (Exception e) {
+            LOG.error("Error while closing...", e);
         }
-        for (String supportedDBName : supportedDBNames) {
-            if (url.indexOf(supportedDBName) > 0) {
-                return;
+        for (DataSource dataSource : this.dataSourceFactories.values()) {
+            try {
+                ((BasicDataSource) dataSource).close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing datasource...", e);
             }
         }
-        throw new IllegalArgumentException(
-                "Illegal url or not supported url type (url: " + url + ").");
     }
 
-    private String getRealURL(String url) {
-        int index = url.indexOf("?");
-        if (index < 0) {
-            index = url.length();
+    protected DataSource buildDataSource(String dbUrl, Map<String, String> 
properties)
+            throws JDBCParamsIllegalException {
+        String driverClassName =
+                JDBCPropertiesParser.getString(properties, 
JDBCEngineConnConstant.JDBC_DRIVER, "");
+
+        if (StringUtils.isBlank(driverClassName)) {
+            LOG.error("The driver class name is not empty.");
+            throw new JDBCParamsIllegalException("The driver class name is not 
empty.");
         }
-        return url.substring(0, index);
-    }
 
-    protected DataSource createDataSources(Map<String, String> properties) 
throws SQLException {
-        String url = getJdbcUrl(properties);
-        String username = 
properties.getOrDefault(JDBCEngineConnConstant.JDBC_USERNAME, "");
+        String username =
+                JDBCPropertiesParser.getString(
+                        properties, JDBCEngineConnConstant.JDBC_USERNAME, "");
         String password =
-                
StringUtils.trim(properties.getOrDefault(JDBCEngineConnConstant.JDBC_PASSWORD, 
""));
-        int index = url.indexOf(":") + 1;
-        String dbType = url.substring(index, url.indexOf(":", index));
-        Properties props = new Properties();
-        props.put("driverClassName", supportedDBs.get(dbType));
-        props.put("url", url);
-        props.put("maxIdle", 5);
-        props.put("minIdle", 0);
-        props.put("maxActive", 20);
-        props.put("initialSize", 1);
-        props.put("testOnBorrow", false);
-        props.put("testWhileIdle", true);
-        props.put("validationQuery", this.supportedDBsValidQuery.get(dbType));
-
-        if (isKerberosAuthType(properties)) {
-            String jdbcProxyUser = 
properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER);
-            // need proxy user
-            String proxyUserProperty =
-                    
properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY);
-            if (StringUtils.isNotBlank(proxyUserProperty)) {
-                url = url.concat(";").concat(proxyUserProperty + "=" + 
jdbcProxyUser);
-                props.put("url", url);
-                logger.info(
-                        String.format(
-                                "Try to Create a new %s JDBC DBCP with 
url(%s), kerberos, proxyUser(%s).",
-                                dbType, url, jdbcProxyUser));
-            } else {
-                logger.info(
-                        String.format(
-                                "Try to Create a new %s JDBC DBCP with 
url(%s), kerberos.",
-                                dbType, url));
-            }
+                JDBCPropertiesParser.getString(
+                        properties, JDBCEngineConnConstant.JDBC_PASSWORD, "");
+        JdbcAuthType jdbcAuthType = getJdbcAuthType(properties);
+        switch (jdbcAuthType) {
+            case USERNAME:
+                if (StringUtils.isBlank(username)) {
+                    throw new JDBCParamsIllegalException("The jdbc username is 
not empty.");
+                }
+                if (StringUtils.isBlank(password)) {
+                    throw new JDBCParamsIllegalException("The jdbc password is 
not empty.");
+                }
+                break;
+            case SIMPLE:
+                LOG.info("The jdbc auth type is simple.");
+                break;
+            case KERBEROS:
+                LOG.info("The jdbc auth type is kerberos.");
+                break;
+            default:
+                throw new JDBCParamsIllegalException(
+                        "Unsupported jdbc authentication types " + 
jdbcAuthType.getAuthType());
         }
 
-        if (isUsernameAuthType(properties)) {
-            logger.info(
-                    String.format(
-                            "Try to Create a new %s JDBC DBCP with url(%s), 
username(%s), password(%s).",
-                            dbType, url, username, password));
-            props.put("username", username);
-            props.put("password", password);
-        }
-        BasicDataSource dataSource;
-        try {
-            dataSource = (BasicDataSource) 
BasicDataSourceFactory.createDataSource(props);
-        } catch (Exception e) {
-            throw new SQLException(e);
+        boolean testOnBorrow =
+                JDBCPropertiesParser.getBool(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_TEST_ON_BORROW, false);
+        boolean testOnReturn =
+                JDBCPropertiesParser.getBool(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_TEST_ON_RETURN, false);
+        boolean testWhileIdle =
+                JDBCPropertiesParser.getBool(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_TEST_WHILE_IDLE, true);
+        int minEvictableIdleTimeMillis =
+                JDBCPropertiesParser.getInt(
+                        properties,
+                        
JDBCEngineConnConstant.JDBC_POOL_TIME_BETWEEN_MIN_EVIC_IDLE_MS,
+                        300000);
+        long timeBetweenEvictionRunsMillis =
+                JDBCPropertiesParser.getLong(
+                        properties,
+                        
JDBCEngineConnConstant.JDBC_POOL_TIME_BETWEEN_EVIC_RUNS_MS,
+                        60000);
+
+        long maxWait =
+                JDBCPropertiesParser.getLong(
+                        properties, JDBCEngineConnConstant.JDBC_POOL_MAX_WAIT, 
6000);
+        int maxActive =
+                JDBCPropertiesParser.getInt(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_MAX_ACTIVE, 20);
+        int minIdle =
+                JDBCPropertiesParser.getInt(
+                        properties, JDBCEngineConnConstant.JDBC_POOL_MIN_IDLE, 
1);
+        int initialSize =
+                JDBCPropertiesParser.getInt(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_INIT_SIZE, 1);
+        String validationQuery =
+                JDBCPropertiesParser.getString(
+                        properties,
+                        JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY,
+                        
JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY);
+
+        boolean poolPreparedStatements =
+                JDBCPropertiesParser.getBool(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_PREPARED_STATEMENTS, true);
+        boolean removeAbandoned =
+                JDBCPropertiesParser.getBool(
+                        properties,
+                        
JDBCEngineConnConstant.JDBC_POOL_REMOVE_ABANDONED_ENABLED,
+                        true);
+        int removeAbandonedTimeout =
+                JDBCPropertiesParser.getInt(
+                        properties, 
JDBCEngineConnConstant.JDBC_POOL_REMOVE_ABANDONED_TIMEOUT, 300);
+
+        DruidDataSource datasource = new DruidDataSource();
+        LOG.info("Database connection address information(数据库连接地址信息)=" + 
dbUrl);
+        datasource.setUrl(dbUrl);
+        datasource.setUsername(username);
+        datasource.setPassword(password);
+        datasource.setDriverClassName(driverClassName);
+        datasource.setInitialSize(initialSize);
+        datasource.setMinIdle(minIdle);
+        datasource.setMaxActive(maxActive);
+        datasource.setMaxWait(maxWait);
+        
datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+        datasource.setValidationQuery(validationQuery);
+        datasource.setTestWhileIdle(testWhileIdle);
+        datasource.setTestOnBorrow(testOnBorrow);
+        datasource.setTestOnReturn(testOnReturn);
+        datasource.setPoolPreparedStatements(poolPreparedStatements);
+        datasource.setRemoveAbandoned(removeAbandoned);
+        datasource.setRemoveAbandonedTimeout(removeAbandonedTimeout);
+        return datasource;
+    }
+
+    private Connection getConnectionFromDataSource(
+            String dataSourceName, String url, Map<String, String> prop)
+            throws SQLException, JDBCParamsIllegalException {
+        DataSource dataSource = dataSourceFactories.get(dataSourceName);
+        if (dataSource == null) {
+            synchronized (dataSourceFactories) {
+                if (dataSource == null) {
+                    dataSource = buildDataSource(url, prop);
+                    dataSourceFactories.put(dataSourceName, dataSource);
+                }
+            }
         }
-        return dataSource;
+        return dataSource.getConnection();
     }
 
-    public Connection getConnection(Map<String, String> properties) throws 
SQLException {
-        String url = getJdbcUrl(properties);
-        logger.info("The jdbc url is: {}", url);
-        JdbcAuthType jdbcAuthType = getJdbcAuthType(properties);
+    public Connection getConnection(String dataSourceName, Map<String, String> 
propperties)
+            throws SQLException, JDBCParamsIllegalException {
+        String execUser =
+                JDBCPropertiesParser.getString(
+                        propperties, 
JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "");
+        if (StringUtils.isBlank(execUser)) {
+            LOG.warn("No such execUser: {}", execUser);
+            throw new JDBCParamsIllegalException("No execUser");
+        }
         Connection connection = null;
+        final String jdbcUrl = getJdbcUrl(propperties);
+        JdbcAuthType jdbcAuthType = getJdbcAuthType(propperties);
         switch (jdbcAuthType) {
             case SIMPLE:
-                connection = getConnection(url, properties);
+            case USERNAME:
+                connection = getConnectionFromDataSource(dataSourceName, 
jdbcUrl, propperties);
                 break;
             case KERBEROS:
+                LOG.debug(
+                        "Calling createKerberosSecureConfiguration(); this 
will do loginUserFromKeytab() if required");
                 final String keytab =
-                        properties.get(
-                                
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION);
+                        JDBCPropertiesParser.getString(
+                                propperties,
+                                
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION,
+                                "");
                 final String principal =
-                        
properties.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL);
+                        JDBCPropertiesParser.getString(
+                                propperties,
+                                
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL,
+                                "");
                 KerberosUtils.createKerberosSecureConfiguration(keytab, 
principal);
-                connection = getConnection(url, properties);
-                break;
-            case USERNAME:
-                if 
(StringUtils.isEmpty(properties.get(JDBCEngineConnConstant.JDBC_USERNAME))) {
-                    throw new 
SQLException(JDBCEngineConnConstant.JDBC_USERNAME + " is not empty.");
-                }
-                if 
(StringUtils.isEmpty(properties.get(JDBCEngineConnConstant.JDBC_PASSWORD))) {
-                    throw new 
SQLException(JDBCEngineConnConstant.JDBC_PASSWORD + " is not empty.");
+                LOG.debug("createKerberosSecureConfiguration() returned");
+                boolean isProxyEnabled =
+                        JDBCPropertiesParser.getBool(
+                                propperties,
+                                
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE,
+                                true);
+
+                if (isProxyEnabled) {
+                    final String jdbcUrlWithProxyUser =
+                            appendProxyUserToJDBCUrl(jdbcUrl, execUser, 
propperties);
+                    LOG.info(
+                            String.format(
+                                    "Try to Create a new %s JDBC with url(%s), 
kerberos, proxyUser(%s).",
+                                    dataSourceName, jdbcUrlWithProxyUser, 
execUser));
+                    connection =
+                            getConnectionFromDataSource(
+                                    dataSourceName, jdbcUrlWithProxyUser, 
propperties);
+                } else {
+                    UserGroupInformation ugi;
+                    try {
+                        ugi =
+                                UserGroupInformation.createProxyUser(
+                                        execUser, 
UserGroupInformation.getCurrentUser());
+                    } catch (Exception e) {
+                        LOG.error("Error in getCurrentUser", e);
+                        throw new JDBCParamsIllegalException("Error in 
getCurrentUser");
+                    }
+
+                    try {
+                        connection =
+                                ugi.doAs(
+                                        (PrivilegedExceptionAction<Connection>)
+                                                () ->
+                                                        
getConnectionFromDataSource(
+                                                                dataSourceName,
+                                                                jdbcUrl,
+                                                                propperties));
+                    } catch (Exception e) {
+                        throw new JDBCParamsIllegalException(
+                                "Error in doAs to get one connection.");
+                    }
                 }
-                connection = getConnection(url, properties);
                 break;
             default:
-                break;
+                throw new JDBCParamsIllegalException(
+                        "Unsupported jdbc authentication types " + 
jdbcAuthType.getAuthType());
         }
         return connection;
     }
 
-    public void close() {
-        for (DataSource dataSource : this.databaseToDataSources.values()) {
-            try {
-                // DataSources.destroy(dataSource);
-                ((BasicDataSource) dataSource).close();
-            } catch (SQLException e) {
-            }
+    private String getJdbcUrl(Map<String, String> properties) throws 
SQLException {
+        String url = properties.get(JDBCEngineConnConstant.JDBC_URL);
+        if (StringUtils.isBlank(url)) {
+            throw new SQLException(JDBCEngineConnConstant.JDBC_URL + " is not 
empty.");
         }
+        url = clearJDBCUrl(url);
+        validateJDBCUrl(url);
+        return url.trim();
     }
 
-    private Connection getConnection(String url, Map<String, String> 
properties)
-            throws SQLException {
-        String key = getRealURL(url);
-        DataSource dataSource = databaseToDataSources.get(key);
-        if (dataSource == null) {
-            synchronized (databaseToDataSources) {
-                if (dataSource == null) {
-                    dataSource = createDataSources(properties);
-                    databaseToDataSources.put(key, dataSource);
-                }
-            }
+    private String clearJDBCUrl(String url) {
+        if (url.startsWith("\"") && url.endsWith("\"")) {
+            url = url.trim();
+            return url.substring(1, url.length() - 1);
         }
-        return dataSource.getConnection();
+        return url;
     }
 
-    private String getJdbcUrl(Map<String, String> properties) throws 
SQLException {
-        String url = properties.get(JDBCEngineConnConstant.JDBC_URL);
+    private void validateJDBCUrl(String url) {
         if (StringUtils.isEmpty(url)) {
-            throw new SQLException(JDBCEngineConnConstant.JDBC_URL + " is not 
empty.");
+            throw new NullPointerException(JDBCEngineConnConstant.JDBC_URL + " 
cannot be null.");
+        }
+        if (!url.matches("jdbc:\\w+://\\S+:[0-9]{2,6}(/\\S*)?") && 
!url.startsWith("jdbc:h2")) {
+            throw new IllegalArgumentException("JDBC url format error!" + url);
         }
-        url = clearUrl(url);
-        validateURL(url);
-        return url.trim();
     }
 
-    private boolean isUsernameAuthType(Map<String, String> properties) {
-        return USERNAME == getJdbcAuthType(properties);
-    }
+    private String appendProxyUserToJDBCUrl(
+            String jdbcUrl, String execUser, Map<String, String> properties) {
+        StringBuilder jdbcUrlSb = new StringBuilder(jdbcUrl);
+        String proxyUserProperty =
+                JDBCPropertiesParser.getString(
+                        properties, 
JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, "");
+        if (execUser != null
+                && 
!JDBCEngineConnConstant.JDBC_PROXY_ANONYMOUS_USER.equals(execUser)
+                && StringUtils.isNotBlank(proxyUserProperty)) {
+
+            int lastIndexOfUrl = jdbcUrl.indexOf("?");
+            if (lastIndexOfUrl == -1) {
+                lastIndexOfUrl = jdbcUrl.length();
+            }
+            LOG.info("Using proxy user as: {}", execUser);
+            LOG.info("Using proxy property for user as: {}", 
proxyUserProperty);
+            jdbcUrlSb.insert(lastIndexOfUrl, ";" + proxyUserProperty + "=" + 
execUser + ";");
+        }
 
-    private boolean isKerberosAuthType(Map<String, String> properties) {
-        return KERBEROS == getJdbcAuthType(properties);
+        return jdbcUrlSb.toString();
     }
 
     private JdbcAuthType getJdbcAuthType(Map<String, String> properties) {
@@ -269,9 +376,9 @@ public class ConnectionManager {
                     @Override
                     public Object call() throws Exception {
                         if (KerberosUtils.runRefreshKerberosLogin()) {
-                            logger.info("Ran runRefreshKerberosLogin command 
successfully.");
+                            LOG.info("Ran runRefreshKerberosLogin command 
successfully.");
                             kinitFailCount = 0;
-                            logger.info(
+                            LOG.info(
                                     "Scheduling Kerberos ticket refresh thread 
with interval {} ms",
                                     
KerberosUtils.getKerberosRefreshInterval());
                             scheduledExecutorService.schedule(
@@ -280,13 +387,12 @@ public class ConnectionManager {
                                     TimeUnit.MILLISECONDS);
                         } else {
                             kinitFailCount++;
-                            logger.info(
+                            LOG.info(
                                     "runRefreshKerberosLogin failed for {} 
time(s).",
                                     kinitFailCount);
                             if (kinitFailCount >= 
KerberosUtils.kinitFailTimesThreshold()) {
-                                logger.error(
+                                LOG.error(
                                         "runRefreshKerberosLogin failed for 
max attempts, calling close executor.");
-                                // close();
                             } else {
                                 // wait for 1 second before calling 
runRefreshKerberosLogin() again
                                 scheduledExecutorService.schedule(this, 1, 
TimeUnit.SECONDS);
@@ -303,12 +409,4 @@ public class ConnectionManager {
             scheduledExecutorService.shutdown();
         }
     }
-
-    private String clearUrl(String url) {
-        if (url.startsWith("\"") && url.endsWith("\"")) {
-            url = url.trim();
-            return url.substring(1, url.length() - 1);
-        }
-        return url;
-    }
 }
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
new file mode 100644
index 000000000..8b922f2dc
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
public class JDBCDataSourceConfigurations {
    /** Maps a task id to its currently running JDBC statement so tasks can be cancelled. */
    private final Map<String, Statement> taskIdStatementMap;

    public JDBCDataSourceConfigurations() {
        taskIdStatementMap = new ConcurrentHashMap<>();
    }

    /**
     * Closes every tracked statement and clears the registry.
     *
     * <p>Fix: a close failure on one statement no longer aborts the loop — every statement gets a
     * close attempt and the map is always cleared; the first error (if any) is rethrown at the end.
     *
     * @throws SQLException the first error raised while closing a statement
     */
    public void initTaskIdStatementMap() throws SQLException {
        SQLException firstError = null;
        for (Statement statement : taskIdStatementMap.values()) {
            try {
                if (statement != null && !statement.isClosed()) {
                    statement.close();
                }
            } catch (SQLException e) {
                if (firstError == null) {
                    firstError = e;
                }
            }
        }
        taskIdStatementMap.clear();
        if (firstError != null) {
            throw firstError;
        }
    }

    /** Registers the statement executing the given task. */
    public void saveStatement(String taskId, Statement statement) {
        taskIdStatementMap.put(taskId, statement);
    }

    /**
     * Cancels the statement registered for the given task, if any.
     *
     * <p>Fix: the original dereferenced the map value unconditionally and threw a
     * NullPointerException for unknown task ids (e.g. a task that already finished and was
     * removed); unknown ids are now a no-op.
     *
     * @throws SQLException if the driver fails to cancel the statement
     */
    public void cancelStatement(String taskId) throws SQLException {
        Statement statement = taskIdStatementMap.get(taskId);
        if (statement != null) {
            statement.cancel();
        }
    }

    /** Forgets the statement of a finished task without closing it. */
    public void removeStatement(String taskId) {
        taskIdStatementMap.remove(taskId);
    }
}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCPropertiesParser.java
similarity index 50%
copy from 
linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
copy to 
linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCPropertiesParser.java
index dae6edaac..4885baed3 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCPropertiesParser.java
@@ -15,16 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.jdbc.constant;
+package org.apache.linkis.manager.engineplugin.jdbc;
 
-public class JDBCEngineConnConstant {
-    public static final String JDBC_URL = "wds.linkis.jdbc.connect.url";
-    public static final String JDBC_USERNAME = "wds.linkis.jdbc.username";
-    public static final String JDBC_PASSWORD = "wds.linkis.jdbc.password";
-    public static final String JDBC_AUTH_TYPE = "wds.linkis.jdbc.auth.type";
-    public static final String JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL = 
"wds.linkis.jdbc.principal";
-    public static final String JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION =
-            "wds.linkis.jdbc.keytab.location";
-    public static final String JDBC_PROXY_USER_PROPERTY = 
"wds.linkis.jdbc.proxy.user.property";
-    public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
+import java.util.Map;
+
+public class JDBCPropertiesParser extends PropertiesParser {
+    public static long getLong(Map<String, String> prop, String key, long 
defaultValue) {
+        return getValue(prop, key, defaultValue, Long::parseLong);
+    }
+
+    public static int getInt(Map<String, String> prop, String key, int 
defaultValue) {
+        return getValue(prop, key, defaultValue, Integer::parseInt);
+    }
+
+    public static boolean getBool(Map<String, String> prop, String key, 
boolean defaultValue) {
+        return getValue(prop, key, defaultValue, "true"::equalsIgnoreCase);
+    }
 }
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
new file mode 100644
index 000000000..5bc15dcab
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Map;
+
/**
 * Base helper for reading typed values out of a String-to-String property map.
 *
 * <p>Lookups are best-effort by design: a missing, blank, or unparsable value silently falls back
 * to the supplied default, so callers never see a parse exception.
 */
public abstract class PropertiesParser {

    /** Converts the raw string form of a property into the target type. */
    @FunctionalInterface
    interface TypeConversion<T> {
        T convertTo(String oriV);
    }

    /** Returns the raw value for {@code key}, or {@code defaultValue} when the key is absent. */
    public static String getString(Map<String, String> prop, String key, String defaultValue) {
        return prop.getOrDefault(key, defaultValue);
    }

    /**
     * Returns the value for {@code key} converted by {@code typeConversion}.
     *
     * @return the converted value, or {@code defaultValue} when the key is missing, the value is
     *     blank, or the conversion throws
     */
    public static <T> T getValue(
            Map<String, String> prop,
            String key,
            T defaultValue,
            TypeConversion<T> typeConversion) {
        String valueStr = getString(prop, key, "");
        // Plain-JDK blank check; removes the commons-lang dependency for a trivial test.
        if (valueStr == null || valueStr.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return typeConversion.convertTo(valueStr);
        } catch (Exception e) {
            // Best-effort parsing by design: an unparsable value degrades to the default.
            return defaultValue;
        }
    }
}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
index dae6edaac..0ccc4b49e 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
@@ -18,13 +18,44 @@
 package org.apache.linkis.manager.engineplugin.jdbc.constant;
 
/**
 * Property keys and shared constants for the JDBC EngineConn.
 *
 * <p>Most entries are the String KEYS under which connection/pool settings are looked up in the
 * task or engine property maps; a few (JDBC_POOL_DEFAULT_VALIDATION_QUERY,
 * JDBC_ENGINE_MEMORY_UNIT) are literal VALUES.
 */
public class JDBCEngineConnConstant {
    // Constants-only holder: not instantiable.
    private JDBCEngineConnConstant() {}

    // Data source selection / proxy identity.
    public static final String JDBC_DEFAULT_DATASOURCE_TAG = "jdbc";
    public static final String JDBC_PROXY_ANONYMOUS_USER = "anonymous";

    // Basic connection settings.
    public static final String JDBC_URL = "wds.linkis.jdbc.connect.url";
    public static final String JDBC_DRIVER = "wds.linkis.jdbc.driver";
    public static final String JDBC_USERNAME = "wds.linkis.jdbc.username";
    public static final String JDBC_PASSWORD = "wds.linkis.jdbc.password";

    // Authentication settings (username / kerberos).
    public static final String JDBC_AUTH_TYPE = "wds.linkis.jdbc.auth.type";
    public static final String JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL = "wds.linkis.jdbc.principal";
    public static final String JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION =
            "wds.linkis.jdbc.keytab.location";
    public static final String JDBC_KERBEROS_AUTH_PROXY_ENABLE =
            "wds.linkis.jdbc.auth.kerberos.proxy.enable";
    public static final String JDBC_PROXY_USER_PROPERTY = "wds.linkis.jdbc.proxy.user.property";
    public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";

    // Task runtime properties.
    public static final String JDBC_SCRIPTS_EXEC_USER = "execUser";
    public static final String JDBC_ENGINE_RUN_TIME_DS = "wds.linkis.engine.runtime.datasource";

    // Connection-pool tuning keys (dbcp/druid-style names).
    public static final String JDBC_POOL_TEST_ON_BORROW = "wds.linkis.jdbc.pool.testOnBorrow";
    public static final String JDBC_POOL_TEST_ON_RETURN = "wds.linkis.jdbc.pool.testOnReturn";
    public static final String JDBC_POOL_TEST_WHILE_IDLE = "wds.linkis.jdbc.pool.testWhileIdle";
    public static final String JDBC_POOL_VALIDATION_QUERY = "wds.linkis.jdbc.pool.validationQuery";
    public static final String JDBC_POOL_DEFAULT_VALIDATION_QUERY = "SELECT 1"; // a value, not a key
    public static final String JDBC_POOL_TIME_BETWEEN_MIN_EVIC_IDLE_MS =
            "wds.linkis.jdbc.pool.minEvictableIdleTimeMillis";
    public static final String JDBC_POOL_TIME_BETWEEN_EVIC_RUNS_MS =
            "wds.linkis.jdbc.pool.timeBetweenEvictionRunsMillis";
    public static final String JDBC_POOL_MAX_WAIT = "wds.linkis.jdbc.pool.maxWaitMillis";
    public static final String JDBC_POOL_MAX_ACTIVE = "wds.linkis.jdbc.pool.maxActive";
    public static final String JDBC_POOL_INIT_SIZE = "wds.linkis.jdbc.pool.initialSize";
    public static final String JDBC_POOL_MIN_IDLE = "wds.linkis.jdbc.pool.minIdle";
    public static final String JDBC_POOL_PREPARED_STATEMENTS =
            "wds.linkis.jdbc.pool.poolPreparedStatements";
    public static final String JDBC_POOL_REMOVE_ABANDONED_ENABLED =
            "wds.linkis.jdbc.pool.remove.abandoned.enabled";
    public static final String JDBC_POOL_REMOVE_ABANDONED_TIMEOUT =
            "wds.linkis.jdbc.pool.remove.abandoned.timeout";

    // Suffix appended to the engine request-memory setting when no unit is given.
    public static final String JDBC_ENGINE_MEMORY_UNIT = "g";
}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
index 8a7411def..3d896a019 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
@@ -27,10 +27,6 @@ object JDBCConfiguration {
 
   val JDBC_QUERY_TIMEOUT = CommonVars("wds.linkis.jdbc.query.timeout", 1800)
 
-  val JDBC_SUPPORT_DBS = CommonVars("wds.linkis.jdbc.support.dbs", 
"h2=>org.h2.Driver,mysql=>com.mysql.jdbc.Driver,postgresql=>org.postgresql.Driver,oracle=>oracle.jdbc.driver.OracleDriver,hive2=>org.apache.hive.jdbc.HiveDriver,presto=>com.facebook.presto.jdbc.PrestoDriver,clickhouse=>ru.yandex.clickhouse.ClickHouseDriver")
-
-  val JDBC_SUPPORT_DBS_VALIDATION_QUERY = 
CommonVars("wds.linkis.jdbc.support.dbs.validation.query", "h2=>SELECT 
1,mysql=>SELECT 1,postgresql=>SELECT 1,oracle=>SELECT 1 FROM dual,hive2=>SELECT 
1,presto=>SELECT 1")
-
   val JDBC_CONCURRENT_LIMIT = 
CommonVars[Int]("wds.linkis.engineconn.jdbc.concurrent.limit", 100)
 
   val JDBC_KERBEROS_ENABLE = CommonVars[Boolean]("wds.linkis.keytab.enable", 
false)
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 51ad30a61..3bb05236a 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.linkis.manager.engineplugin.jdbc.executer
 
-import java.sql.{Connection, Statement}
+import java.sql.{Connection, ResultSet, SQLException, Statement}
 import java.util
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
 import 
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
 EngineExecutionContext}
@@ -34,6 +34,7 @@ import org.apache.linkis.storage.domain.{Column, DataType}
 import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
 import org.apache.commons.io.IOUtils
+import org.apache.commons.lang.StringUtils
 import org.apache.linkis.common.conf.Configuration
 import 
org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig, 
ResponseQueryConfig}
 import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, 
UserCreatorLabel}
@@ -49,13 +50,11 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: 
Int, val id: Int) ex
 
 
   private val connectionManager = ConnectionManager.getInstance()
-
-  private var statement: Statement = null
-
   private val executorLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]](2)
-  private var connection: Connection = null
+
 
   override def init(): Unit = {
+    info("jdbc executor start init.")
     setCodeParser(new SQLCodeParser)
     super.init()
     if (JDBCConfiguration.JDBC_KERBEROS_ENABLE.getValue) {
@@ -64,111 +63,151 @@ class JDBCEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int) ex
   }
 
   override def executeLine(engineExecutorContext: EngineExecutionContext, 
code: String): ExecuteResponse = {
+    val execSqlUser = getExecSqlUser(engineExecutorContext)
     val realCode = code.trim()
+    val taskId = engineExecutorContext.getJobId.get
     val properties = 
engineExecutorContext.getProperties.asInstanceOf[util.Map[String, String]]
+    var dataSourceName = 
properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS, "")
 
     if (properties.get(JDBCEngineConnConstant.JDBC_URL) == null) {
-      info(s"jdbc url is empty, adding now...")
+      info(s"The jdbc url is empty, adding now...")
       val globalConfig = 
Utils.tryAndWarn(JDBCEngineConfig.getCacheMap(engineExecutorContext.getLabels))
+      if (StringUtils.isNotBlank(dataSourceName)) {
+        info("Start getting data source connection parameters from the data 
source hub.")
+        // todo get data source info by data source client
+      }
       properties.put(JDBCEngineConnConstant.JDBC_URL, 
globalConfig.get(JDBCEngineConnConstant.JDBC_URL))
+      properties.put(JDBCEngineConnConstant.JDBC_DRIVER, 
globalConfig.get(JDBCEngineConnConstant.JDBC_DRIVER))
       properties.put(JDBCEngineConnConstant.JDBC_USERNAME, 
globalConfig.get(JDBCEngineConnConstant.JDBC_USERNAME))
       properties.put(JDBCEngineConnConstant.JDBC_PASSWORD, 
globalConfig.get(JDBCEngineConnConstant.JDBC_PASSWORD))
+      properties.put(JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY, 
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY, 
JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY))
       properties.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, 
globalConfig.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE))
       properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL, 
globalConfig.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL))
       
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION, 
globalConfig.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION))
       properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, 
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, ""))
-      properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, 
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_PROXY_USER, 
EngineConnObject.getEngineCreationContext.getUser))
+      properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, 
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_PROXY_USER, execSqlUser))
+      properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, 
execSqlUser)
+    }
+    if (StringUtils.isBlank(dataSourceName)) {
+      dataSourceName = JDBCEngineConnConstant.JDBC_DEFAULT_DATASOURCE_TAG;
     }
+    info(s"The data source name is [$dataSourceName], and the jdbc client 
begins to run jdbc code:\n ${realCode.trim}")
+    var connection: Connection = null
+    var statement: Statement = null
+    var resultSet: ResultSet = null
 
-    info(s"jdbc client begins to run jdbc code:\n ${realCode.trim}")
-    connection = connectionManager.getConnection(properties)
-    statement = connection.createStatement()
-    info(s"create statement is:  $statement")
-    val isResultSetAvailable = statement.execute(code)
-    info(s"Is ResultSet available ? : $isResultSetAvailable")
-    if (isResultSetAvailable) {
-      info("ResultSet is available")
-      val JDBCResultSet = statement.getResultSet
-      if (isDDLCommand(statement.getUpdateCount(), 
JDBCResultSet.getMetaData().getColumnCount)) {
-        info(s"current result is a ResultSet Object , but there are no more 
results :${code} ")
-        Utils.tryQuietly {
-          JDBCResultSet.close()
-          statement.close()
-          connection.close()
+    Utils.tryCatch({
+      connection = connectionManager.getConnection(dataSourceName, properties)
+    }) {
+      case e: Exception => return ErrorExecuteResponse("created data source 
connection error.", e)
+    }
+
+    try {
+      statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
+      statement = connection.createStatement()
+      statement.setFetchSize(outputPrintLimit)
+      statement.setMaxRows(outputPrintLimit)
+      info(s"create statement is:  $statement")
+      connectionManager.saveStatement(taskId, statement)
+      val isResultSetAvailable = statement.execute(code)
+      info(s"Is ResultSet available ? : $isResultSetAvailable")
+      try {
+        if (isResultSetAvailable) {
+          info("ResultSet is available")
+          resultSet = statement.getResultSet
+          return getExecResultSetOutput(engineExecutorContext, statement, 
resultSet)
+        } else {
+          val updateCount = statement.getUpdateCount
+          info(s"only return affect rows : $updateCount")
+          engineExecutorContext.appendStdout(s"only return affect rows : 
$updateCount")
+          return SuccessExecuteResponse()
         }
-        SuccessExecuteResponse()
-      } else {
-        val md = JDBCResultSet.getMetaData
-        val metaArrayBuffer = new ArrayBuffer[Tuple2[String, String]]()
-        for (i <- 1 to md.getColumnCount) {
-          metaArrayBuffer.add(Tuple2(md.getColumnName(i), 
JDBCHelper.getTypeStr(md.getColumnType(i))))
+      } finally {
+        if (resultSet != null) {
+          Utils.tryCatch({ resultSet.close() }) { case e: SQLException => 
warn(e.getMessage) }
         }
-        val columns = metaArrayBuffer.map { c => Column(c._1, 
DataType.toDataType(c._2), "") }.toArray[Column]
-        val metaData = new TableMetaData(columns)
-        val resultSetWriter = 
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
-        resultSetWriter.addMetaData(metaData)
-        var count = 0
-        Utils.tryCatch({
-          while (count < outputPrintLimit && JDBCResultSet.next()) {
-            val r: Array[Any] = columns.indices.map { i =>
-              val data = JDBCResultSet.getObject(i + 1) match {
-                case value: Any => JDBCResultSet.getString(i + 1)
-                case _ => null
-              }
-              data
-            }.toArray
-            resultSetWriter.addRecord(new TableRecord(r))
-            count += 1
-          }
-        }) {
-          case e: Exception => return ErrorExecuteResponse("query jdbc 
failed", e)
+        if (statement != null) {
+          Utils.tryCatch({ statement.close() }) { case e: SQLException => 
warn(e.getMessage) }
         }
-        val output = if (resultSetWriter != null) resultSetWriter.toString 
else null
-        Utils.tryQuietly {
-          JDBCResultSet.close()
-          statement.close()
+      }
+    } catch {
+      case e: Throwable => error(s"Cannot run $code", e)
+    } finally {
+      if (connection != null) {
+        try {
+          if (!connection.getAutoCommit) connection.commit()
           connection.close()
-          IOUtils.closeQuietly(resultSetWriter)
+        } catch {
+          case e: SQLException => warn("close connection error.", e)
         }
-        info("sql execute completed")
-        AliasOutputExecuteResponse(null, output)
       }
+      connectionManager.removeStatement(taskId)
+    }
+    SuccessExecuteResponse()
+  }
+
  /**
   * Converts a statement's ResultSet into an ExecuteResponse: DDL-like results (no update count,
   * no columns) return plain success; otherwise rows are copied into a Linkis table result set,
   * truncated at outputPrintLimit.
   */
  private def getExecResultSetOutput(engineExecutorContext: EngineExecutionContext, statement: Statement, resultSet: ResultSet): ExecuteResponse = {
    if (isDDLCommand(statement.getUpdateCount, resultSet.getMetaData.getColumnCount)) {
      info(s"current result is a ResultSet Object , but there are no more results!")
      engineExecutorContext.appendStdout("Query executed successfully.")
      SuccessExecuteResponse()
    } else {
      val md = resultSet.getMetaData
      // Collect (columnName, linkisTypeName) pairs for the table metadata.
      val metaArrayBuffer = new ArrayBuffer[(String, String)]()
      for (i <- 1 to md.getColumnCount) {
        metaArrayBuffer.add(Tuple2(md.getColumnName(i), JDBCHelper.getTypeStr(md.getColumnType(i))))
      }
      val columns = metaArrayBuffer.map { c => Column(c._1, DataType.toDataType(c._2), "") }.toArray[Column]
      val metaData = new TableMetaData(columns)
      val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
      resultSetWriter.addMetaData(metaData)
      var count = 0
      Utils.tryCatch({
        // Copy at most outputPrintLimit rows; every non-null value is rendered via getString.
        while (count < outputPrintLimit && resultSet.next()) {
          val r: Array[Any] = columns.indices.map { i =>
            val data = resultSet.getObject(i + 1) match {
              case value: Any => resultSet.getString(i + 1)
              case _ => null // getObject returned null -> emit a null cell
            }
            data
          }.toArray
          resultSetWriter.addRecord(new TableRecord(r))
          count += 1
        }
      }) {
        case e: Exception => return ErrorExecuteResponse("query jdbc failed", e)
      }
      val output = if (resultSetWriter != null) resultSetWriter.toString else null
      Utils.tryQuietly {
        IOUtils.closeQuietly(resultSetWriter)
      }
      info("sql executed completed.")
      AliasOutputExecuteResponse(null, output)
    }
  }
+  private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): 
String = {
+    val userCreatorLabel = 
engineExecutionContext.getLabels.find(_.isInstanceOf[UserCreatorLabel]).get.asInstanceOf[UserCreatorLabel]
+    userCreatorLabel.getUser
+  }
+
  /** A result is treated as DDL-like when it reports neither an update count nor result columns. */
  protected def isDDLCommand(updatedCount: Int, columnCount: Int): Boolean = {
    updatedCount < 0 && columnCount <= 0
  }
 
   override def getProgressInfo(taskID: String): Array[JobProgressInfo] = 
Array.empty[JobProgressInfo]
 
   override protected def callback(): Unit = {}
 
-  override def progress(taskID: String): Float = {
-    0
-  }
+  override def progress(taskID: String): Float = 0
 
  /**
   * Shuts the executor down: releases connection-manager resources and, when kerberos is enabled,
   * stops the ticket-refresh service.
   */
  override def close(): Unit = {
    info("Start closing the jdbc engine.")
    connectionManager.close()
    if (JDBCConfiguration.JDBC_KERBEROS_ENABLE.getValue) {
      connectionManager.shutdownRefreshKerberosLoginService()
    }
    info("The jdbc engine has closed successfully.")
  }
 
   override def executeCompletely(engineExecutorContext: 
EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = 
null
@@ -188,8 +227,8 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: 
Int, val id: Int) ex
     val properties = EngineConnObject.getEngineCreationContext.getOptions
     if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
       val settingClientMemory = 
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
-      if (!settingClientMemory.toLowerCase().endsWith("g")) {
-        properties.put(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key, 
settingClientMemory + "g")
+      if 
(!settingClientMemory.toLowerCase().endsWith(JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT))
 {
+        properties.put(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key, 
settingClientMemory + JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT)
       }
     }
     val resource = new CommonNodeResource
@@ -200,12 +239,21 @@ class JDBCEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int) ex
 
   override def supportCallBackLogs(): Boolean = false
 
-  override def getId(): String = Sender.getThisServiceInstance.getInstance + 
s"_$id"
+  override def getId: String = Sender.getThisServiceInstance.getInstance + 
s"_$id"
 
   override def getConcurrentLimit: Int = 
JDBCConfiguration.JDBC_CONCURRENT_LIMIT.getValue
 
  /**
   * Cancels every running query by closing and clearing all tracked statements.
   * NOTE(review): called initTaskStatementMap here, while the statement registry class names it
   * initTaskIdStatementMap — confirm the ConnectionManager wrapper's actual method name.
   */
  override def killAll(): Unit = {
    info("Killing all query task.")
    connectionManager.initTaskStatementMap()
    info("All query task has killed successfully.")
  }

  /** Cancels the in-flight JDBC statement registered for this task, then delegates to super. */
  override def killTask(taskId: String): Unit = {
    info(s"Killing jdbc query task $taskId")
    connectionManager.cancelStatement(taskId)
    super.killTask(taskId)
    info(s"The query task $taskId has killed successfully.")
  }
 }
 
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
index 811b633ce..bb448d853 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
@@ -18,10 +18,9 @@
 package org.apache.linkis.manager.engineplugin.jdbc;
 
 import 
org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant;
+import 
org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException;
 
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.commons.dbcp.BasicDataSourceFactory;
-
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
@@ -31,25 +30,28 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 public class ConnectionManagerTest {
     @Test
     @DisplayName("testCreateJdbcConnAndExecSql")
-    public void testCreateJdbcConnAndExecSql() throws SQLException {
+    public void testCreateJdbcConnAndExecSql()
+            throws SQLException, JDBCParamsIllegalException, 
ClassNotFoundException {
         Map<String, String> properties = new HashMap<>(8);
         properties.put(
                 JDBCEngineConnConstant.JDBC_URL,
                 "jdbc:h2:mem:linkis_db;MODE=MySQL;DATABASE_TO_LOWER=TRUE");
+        properties.put(JDBCEngineConnConstant.JDBC_DRIVER, "org.h2.Driver");
         properties.put(JDBCEngineConnConstant.JDBC_USERNAME, "user");
         properties.put(JDBCEngineConnConstant.JDBC_PASSWORD, "password");
+        
properties.put(JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY, 
"SELECT 1");
         properties.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, "USERNAME");
         
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL, "");
         
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION, 
"");
         properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, "");
         properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, "");
+        properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, 
"leo_jie");
         ConnectionManager connectionManager = ConnectionManager.getInstance();
-        Connection conn = connectionManager.getConnection(properties);
+        Connection conn = connectionManager.getConnection("jdbc", properties);
         Statement statement = conn.createStatement();
         ResultSet rs = statement.executeQuery("show databases;");
         while (rs.next()) {
@@ -61,30 +63,11 @@ public class ConnectionManagerTest {
     }
 
     @Test
-    @DisplayName("testExecSql")
-    public void testExecSql() throws Exception {
-        Properties properties = new Properties();
-        properties.put("driverClassName", "org.h2.Driver");
-        properties.put("url", 
"jdbc:h2:mem:linkis_db;MODE=MySQL;DATABASE_TO_LOWER=TRUE");
-        properties.put("username", "user");
-        properties.put("password", "password");
-        properties.put("maxIdle", 20);
-        properties.put("minIdle", 0);
-        properties.put("initialSize", 1);
-        properties.put("testOnBorrow", false);
-        properties.put("testWhileIdle", true);
-        properties.put("validationQuery", "select 1");
-        BasicDataSource dataSource =
-                (BasicDataSource) 
BasicDataSourceFactory.createDataSource(properties);
-        Connection conn = dataSource.getConnection();
-        Statement statement = conn.createStatement();
-        ResultSet rs = statement.executeQuery("show databases;");
-        while (rs.next()) {
-            System.out.println(rs.getObject(1));
-        }
-        rs.close();
-        statement.close();
-        conn.close();
-        dataSource.close();
+    @DisplayName("testCreateJdbcConnAndExecSql")
+    public void testJDBCPropertiesParserGetLong() {
+        Map<String, String> properties = new HashMap<>(1);
+        properties.put("key", "10");
+        long v = JDBCPropertiesParser.getLong(properties, "key", 0);
+        Assertions.assertEquals(10, v);
     }
 }
diff --git a/pom.xml b/pom.xml
index 0bb1f9595..63b56b336 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
         <mockito.version>3.9.0</mockito.version>
         <assertj.version>3.17.2</assertj.version>
         <h2.version>1.4.200</h2.version>
-
+        <dbcp2.version>2.0.1</dbcp2.version>
         <apache-rat-plugin.version>0.13</apache-rat-plugin.version>
         <assembly.package.rootpath>${basedir}</assembly.package.rootpath>
         <maven.compiler.source>1.8</maven.compiler.source>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to