This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new d370d053a [feature-2141,2194] Change dbcp to druid in JDBC engine and
reservation for multiple data source implementation. (#2196)
d370d053a is described below
commit d370d053a9047a42318680bdc2fdcd652959b96f
Author: weixiao <[email protected]>
AuthorDate: Wed Jun 1 11:17:25 2022 +0800
[feature-2141,2194] Change dbcp to druid in JDBC engine and reservation for
multiple data source implementation. (#2196)
* [feature-2141,2194] Change dbcp to dbcp2 in JDBC engine and reservation
for multiple data source implementation.
* [feature-2141,2194] Change dbcp to druid in JDBC engine and reservation
for multiple data source implementation.
---
db/linkis_dml.sql | 3 +-
.../engineconn-plugins/jdbc/pom.xml | 11 +
.../engineplugin/jdbc/ConnectionManager.java | 424 +++++++++++++--------
.../jdbc/JDBCDataSourceConfigurations.java | 52 +++
...ConnConstant.java => JDBCPropertiesParser.java} | 26 +-
.../engineplugin/jdbc/PropertiesParser.java | 48 +++
.../jdbc/constant/JDBCEngineConnConstant.java | 31 ++
.../engineplugin/jdbc/conf/JDBCConfiguration.scala | 4 -
.../jdbc/executer/JDBCEngineConnExecutor.scala | 200 ++++++----
.../engineplugin/jdbc/ConnectionManagerTest.java | 45 +--
pom.xml | 2 +-
11 files changed, 559 insertions(+), 287 deletions(-)
diff --git a/db/linkis_dml.sql b/db/linkis_dml.sql
index 2c16736bc..1dda5ca8f 100644
--- a/db/linkis_dml.sql
+++ b/db/linkis_dml.sql
@@ -104,7 +104,8 @@ INSERT INTO `linkis_ps_configuration_config_key` (`key`,
`description`, `name`,
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('pipeline.output.shuffle.null.type', '取值范围:NULL或者BLANK', '空值替换','NULL', 'OFT',
'[\"NULL\",\"BLANK\"]', '0', '0', '1', 'pipeline引擎设置', 'pipeline');
-- jdbc
insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.connect.url', '例如:jdbc:hive2://127.0.0.1:10000', 'jdbc连接地址',
'jdbc:hive2://127.0.0.1:10000', 'Regex',
'^\\s*jdbc:\\w+://([^:]+)(:\\d+)(/[^\\?]+)?(\\?\\S*)?$', '0', '0', '1',
'数据源配置', 'jdbc');
-insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT',
'[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '数据源配置', 'jdbc');
+insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.driver', '例如:org.apache.hive.jdbc.HiveDriver', 'jdbc连接驱动',
'', 'None', '', '0', '0', '1', '用户配置', 'jdbc');
+insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.version', '取值范围:jdbc3,jdbc4', 'jdbc版本','jdbc4', 'OFT',
'[\"jdbc3\",\"jdbc4\"]', '0', '0', '1', '用户配置', 'jdbc');
insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.username', 'username', '数据库连接用户名', '', 'None', '', '0', '0',
'1', '用户配置', 'jdbc');
insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.password', 'password', '数据库连接密码', '', 'None', '', '0', '0',
'1', '用户配置', 'jdbc');
insert into `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`,
`is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES
('wds.linkis.jdbc.connect.max', '范围:1-20,单位:个', 'jdbc引擎最大连接数', '10',
'NumInterval', '[1,20]', '0', '0', '1', '数据源配置', 'jdbc');
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
index f6e0cdc5f..f5fd4f0e1 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
@@ -66,6 +66,12 @@
<artifactId>linkis-common</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -144,6 +150,11 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index 95e2cc47b..885faa188 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -18,18 +18,20 @@
package org.apache.linkis.manager.engineplugin.jdbc;
import org.apache.linkis.hadoop.common.utils.KerberosUtils;
-import org.apache.linkis.manager.engineplugin.jdbc.conf.JDBCConfiguration;
import
org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant;
+import
org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException;
import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import javax.sql.DataSource;
+import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.security.PrivilegedExceptionAction;
import java.sql.*;
import java.util.*;
import java.util.concurrent.Callable;
@@ -41,19 +43,19 @@ import static
org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.*;
public class ConnectionManager {
- Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ConnectionManager.class);
- private final Map<String, DataSource> databaseToDataSources = new
HashMap<String, DataSource>();
-
- private final Map<String, String> supportedDBs = new HashMap<String,
String>();
- private final List<String> supportedDBNames = new ArrayList<String>();
- private final Map<String, String> supportedDBsValidQuery = new
HashMap<String, String>();
+ private final Map<String, DataSource> dataSourceFactories;
+ private final JDBCDataSourceConfigurations jdbcDataSourceConfigurations;
private static volatile ConnectionManager connectionManager;
private ScheduledExecutorService scheduledExecutorService;
private Integer kinitFailCount = 0;
- private ConnectionManager() {}
+ private ConnectionManager() {
+ jdbcDataSourceConfigurations = new JDBCDataSourceConfigurations();
+ dataSourceFactories = new HashMap<>();
+ }
public static ConnectionManager getInstance() {
if (connectionManager == null) {
@@ -66,192 +68,297 @@ public class ConnectionManager {
return connectionManager;
}
- {
- String supportedDBString =
JDBCConfiguration.JDBC_SUPPORT_DBS().getValue();
- String[] supportedDBs = supportedDBString.split(",");
- for (String supportedDB : supportedDBs) {
- String[] supportedDBInfo = supportedDB.split("=>");
- if (supportedDBInfo.length != 2) {
- throw new IllegalArgumentException("Illegal driver info " +
supportedDB);
- }
- try {
- Class.forName(supportedDBInfo[1]);
- } catch (ClassNotFoundException e) {
- logger.info("Load " + supportedDBInfo[0] + " driver failed",
e);
- }
- supportedDBNames.add(supportedDBInfo[0]);
- this.supportedDBs.put(supportedDBInfo[0], supportedDBInfo[1]);
+ public void initTaskStatementMap() {
+ try {
+ jdbcDataSourceConfigurations.initTaskIdStatementMap();
+ } catch (Exception e) {
+ LOG.error("Error while closing taskIdStatementMap statement...",
e);
}
}
- {
- String supportedDBValidQueryString =
-
JDBCConfiguration.JDBC_SUPPORT_DBS_VALIDATION_QUERY().getValue();
- String[] supportedDBsValidQuery =
supportedDBValidQueryString.split(",");
- for (String supportedDBValidQuery : supportedDBsValidQuery) {
- String[] supportedDBValidQueryInfo =
supportedDBValidQuery.split("=>");
- if (supportedDBValidQueryInfo.length != 2) {
- throw new IllegalArgumentException(
- "Illegal validation query info " +
supportedDBValidQuery);
- }
- this.supportedDBsValidQuery.put(
- supportedDBValidQueryInfo[0],
supportedDBValidQueryInfo[1]);
- }
+ public void saveStatement(String taskId, Statement statement) {
+ jdbcDataSourceConfigurations.saveStatement(taskId, statement);
}
- private void validateURL(String url) {
- if (StringUtils.isEmpty(url)) {
- throw new NullPointerException(JDBCEngineConnConstant.JDBC_URL + "
cannot be null.");
+ public void removeStatement(String taskId) {
+ jdbcDataSourceConfigurations.removeStatement(taskId);
+ }
+
+ public void cancelStatement(String taskId) {
+ try {
+ jdbcDataSourceConfigurations.cancelStatement(taskId);
+ } catch (SQLException e) {
+ LOG.error("Error while cancelling task is {} ...", taskId, e);
}
- if (!url.matches("jdbc:\\w+://\\S+:[0-9]{2,6}(/\\S*)?") &&
!url.startsWith("jdbc:h2")) {
- throw new IllegalArgumentException("Unknown the jdbc url: " + url);
+ }
+
+ public void close() {
+ try {
+ initTaskStatementMap();
+ } catch (Exception e) {
+ LOG.error("Error while closing...", e);
}
- for (String supportedDBName : supportedDBNames) {
- if (url.indexOf(supportedDBName) > 0) {
- return;
+ for (DataSource dataSource : this.dataSourceFactories.values()) {
+ try {
+ ((BasicDataSource) dataSource).close();
+ } catch (SQLException e) {
+ LOG.error("Error while closing datasource...", e);
}
}
- throw new IllegalArgumentException(
- "Illegal url or not supported url type (url: " + url + ").");
}
- private String getRealURL(String url) {
- int index = url.indexOf("?");
- if (index < 0) {
- index = url.length();
+ protected DataSource buildDataSource(String dbUrl, Map<String, String>
properties)
+ throws JDBCParamsIllegalException {
+ String driverClassName =
+ JDBCPropertiesParser.getString(properties,
JDBCEngineConnConstant.JDBC_DRIVER, "");
+
+ if (StringUtils.isBlank(driverClassName)) {
+ LOG.error("The driver class name is not empty.");
+ throw new JDBCParamsIllegalException("The driver class name is not
empty.");
}
- return url.substring(0, index);
- }
- protected DataSource createDataSources(Map<String, String> properties)
throws SQLException {
- String url = getJdbcUrl(properties);
- String username =
properties.getOrDefault(JDBCEngineConnConstant.JDBC_USERNAME, "");
+ String username =
+ JDBCPropertiesParser.getString(
+ properties, JDBCEngineConnConstant.JDBC_USERNAME, "");
String password =
-
StringUtils.trim(properties.getOrDefault(JDBCEngineConnConstant.JDBC_PASSWORD,
""));
- int index = url.indexOf(":") + 1;
- String dbType = url.substring(index, url.indexOf(":", index));
- Properties props = new Properties();
- props.put("driverClassName", supportedDBs.get(dbType));
- props.put("url", url);
- props.put("maxIdle", 5);
- props.put("minIdle", 0);
- props.put("maxActive", 20);
- props.put("initialSize", 1);
- props.put("testOnBorrow", false);
- props.put("testWhileIdle", true);
- props.put("validationQuery", this.supportedDBsValidQuery.get(dbType));
-
- if (isKerberosAuthType(properties)) {
- String jdbcProxyUser =
properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER);
- // need proxy user
- String proxyUserProperty =
-
properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY);
- if (StringUtils.isNotBlank(proxyUserProperty)) {
- url = url.concat(";").concat(proxyUserProperty + "=" +
jdbcProxyUser);
- props.put("url", url);
- logger.info(
- String.format(
- "Try to Create a new %s JDBC DBCP with
url(%s), kerberos, proxyUser(%s).",
- dbType, url, jdbcProxyUser));
- } else {
- logger.info(
- String.format(
- "Try to Create a new %s JDBC DBCP with
url(%s), kerberos.",
- dbType, url));
- }
+ JDBCPropertiesParser.getString(
+ properties, JDBCEngineConnConstant.JDBC_PASSWORD, "");
+ JdbcAuthType jdbcAuthType = getJdbcAuthType(properties);
+ switch (jdbcAuthType) {
+ case USERNAME:
+ if (StringUtils.isBlank(username)) {
+ throw new JDBCParamsIllegalException("The jdbc username is
not empty.");
+ }
+ if (StringUtils.isBlank(password)) {
+ throw new JDBCParamsIllegalException("The jdbc password is
not empty.");
+ }
+ break;
+ case SIMPLE:
+ LOG.info("The jdbc auth type is simple.");
+ break;
+ case KERBEROS:
+ LOG.info("The jdbc auth type is kerberos.");
+ break;
+ default:
+ throw new JDBCParamsIllegalException(
+ "Unsupported jdbc authentication types " +
jdbcAuthType.getAuthType());
}
- if (isUsernameAuthType(properties)) {
- logger.info(
- String.format(
- "Try to Create a new %s JDBC DBCP with url(%s),
username(%s), password(%s).",
- dbType, url, username, password));
- props.put("username", username);
- props.put("password", password);
- }
- BasicDataSource dataSource;
- try {
- dataSource = (BasicDataSource)
BasicDataSourceFactory.createDataSource(props);
- } catch (Exception e) {
- throw new SQLException(e);
+ boolean testOnBorrow =
+ JDBCPropertiesParser.getBool(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_TEST_ON_BORROW, false);
+ boolean testOnReturn =
+ JDBCPropertiesParser.getBool(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_TEST_ON_RETURN, false);
+ boolean testWhileIdle =
+ JDBCPropertiesParser.getBool(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_TEST_WHILE_IDLE, true);
+ int minEvictableIdleTimeMillis =
+ JDBCPropertiesParser.getInt(
+ properties,
+
JDBCEngineConnConstant.JDBC_POOL_TIME_BETWEEN_MIN_EVIC_IDLE_MS,
+ 300000);
+ long timeBetweenEvictionRunsMillis =
+ JDBCPropertiesParser.getLong(
+ properties,
+
JDBCEngineConnConstant.JDBC_POOL_TIME_BETWEEN_EVIC_RUNS_MS,
+ 60000);
+
+ long maxWait =
+ JDBCPropertiesParser.getLong(
+ properties, JDBCEngineConnConstant.JDBC_POOL_MAX_WAIT,
6000);
+ int maxActive =
+ JDBCPropertiesParser.getInt(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_MAX_ACTIVE, 20);
+ int minIdle =
+ JDBCPropertiesParser.getInt(
+ properties, JDBCEngineConnConstant.JDBC_POOL_MIN_IDLE,
1);
+ int initialSize =
+ JDBCPropertiesParser.getInt(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_INIT_SIZE, 1);
+ String validationQuery =
+ JDBCPropertiesParser.getString(
+ properties,
+ JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY,
+
JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY);
+
+ boolean poolPreparedStatements =
+ JDBCPropertiesParser.getBool(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_PREPARED_STATEMENTS, true);
+ boolean removeAbandoned =
+ JDBCPropertiesParser.getBool(
+ properties,
+
JDBCEngineConnConstant.JDBC_POOL_REMOVE_ABANDONED_ENABLED,
+ true);
+ int removeAbandonedTimeout =
+ JDBCPropertiesParser.getInt(
+ properties,
JDBCEngineConnConstant.JDBC_POOL_REMOVE_ABANDONED_TIMEOUT, 300);
+
+ DruidDataSource datasource = new DruidDataSource();
+ LOG.info("Database connection address information(数据库连接地址信息)=" +
dbUrl);
+ datasource.setUrl(dbUrl);
+ datasource.setUsername(username);
+ datasource.setPassword(password);
+ datasource.setDriverClassName(driverClassName);
+ datasource.setInitialSize(initialSize);
+ datasource.setMinIdle(minIdle);
+ datasource.setMaxActive(maxActive);
+ datasource.setMaxWait(maxWait);
+
datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+ datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+ datasource.setValidationQuery(validationQuery);
+ datasource.setTestWhileIdle(testWhileIdle);
+ datasource.setTestOnBorrow(testOnBorrow);
+ datasource.setTestOnReturn(testOnReturn);
+ datasource.setPoolPreparedStatements(poolPreparedStatements);
+ datasource.setRemoveAbandoned(removeAbandoned);
+ datasource.setRemoveAbandonedTimeout(removeAbandonedTimeout);
+ return datasource;
+ }
+
+ private Connection getConnectionFromDataSource(
+ String dataSourceName, String url, Map<String, String> prop)
+ throws SQLException, JDBCParamsIllegalException {
+ DataSource dataSource = dataSourceFactories.get(dataSourceName);
+ if (dataSource == null) {
+ synchronized (dataSourceFactories) {
+ if (dataSource == null) {
+ dataSource = buildDataSource(url, prop);
+ dataSourceFactories.put(dataSourceName, dataSource);
+ }
+ }
}
- return dataSource;
+ return dataSource.getConnection();
}
- public Connection getConnection(Map<String, String> properties) throws
SQLException {
- String url = getJdbcUrl(properties);
- logger.info("The jdbc url is: {}", url);
- JdbcAuthType jdbcAuthType = getJdbcAuthType(properties);
+ public Connection getConnection(String dataSourceName, Map<String, String>
propperties)
+ throws SQLException, JDBCParamsIllegalException {
+ String execUser =
+ JDBCPropertiesParser.getString(
+ propperties,
JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "");
+ if (StringUtils.isBlank(execUser)) {
+ LOG.warn("No such execUser: {}", execUser);
+ throw new JDBCParamsIllegalException("No execUser");
+ }
Connection connection = null;
+ final String jdbcUrl = getJdbcUrl(propperties);
+ JdbcAuthType jdbcAuthType = getJdbcAuthType(propperties);
switch (jdbcAuthType) {
case SIMPLE:
- connection = getConnection(url, properties);
+ case USERNAME:
+ connection = getConnectionFromDataSource(dataSourceName,
jdbcUrl, propperties);
break;
case KERBEROS:
+ LOG.debug(
+ "Calling createKerberosSecureConfiguration(); this
will do loginUserFromKeytab() if required");
final String keytab =
- properties.get(
-
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION);
+ JDBCPropertiesParser.getString(
+ propperties,
+
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION,
+ "");
final String principal =
-
properties.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL);
+ JDBCPropertiesParser.getString(
+ propperties,
+
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL,
+ "");
KerberosUtils.createKerberosSecureConfiguration(keytab,
principal);
- connection = getConnection(url, properties);
- break;
- case USERNAME:
- if
(StringUtils.isEmpty(properties.get(JDBCEngineConnConstant.JDBC_USERNAME))) {
- throw new
SQLException(JDBCEngineConnConstant.JDBC_USERNAME + " is not empty.");
- }
- if
(StringUtils.isEmpty(properties.get(JDBCEngineConnConstant.JDBC_PASSWORD))) {
- throw new
SQLException(JDBCEngineConnConstant.JDBC_PASSWORD + " is not empty.");
+ LOG.debug("createKerberosSecureConfiguration() returned");
+ boolean isProxyEnabled =
+ JDBCPropertiesParser.getBool(
+ propperties,
+
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE,
+ true);
+
+ if (isProxyEnabled) {
+ final String jdbcUrlWithProxyUser =
+ appendProxyUserToJDBCUrl(jdbcUrl, execUser,
propperties);
+ LOG.info(
+ String.format(
+ "Try to Create a new %s JDBC with url(%s),
kerberos, proxyUser(%s).",
+ dataSourceName, jdbcUrlWithProxyUser,
execUser));
+ connection =
+ getConnectionFromDataSource(
+ dataSourceName, jdbcUrlWithProxyUser,
propperties);
+ } else {
+ UserGroupInformation ugi;
+ try {
+ ugi =
+ UserGroupInformation.createProxyUser(
+ execUser,
UserGroupInformation.getCurrentUser());
+ } catch (Exception e) {
+ LOG.error("Error in getCurrentUser", e);
+ throw new JDBCParamsIllegalException("Error in
getCurrentUser");
+ }
+
+ try {
+ connection =
+ ugi.doAs(
+ (PrivilegedExceptionAction<Connection>)
+ () ->
+
getConnectionFromDataSource(
+ dataSourceName,
+ jdbcUrl,
+ propperties));
+ } catch (Exception e) {
+ throw new JDBCParamsIllegalException(
+ "Error in doAs to get one connection.");
+ }
}
- connection = getConnection(url, properties);
break;
default:
- break;
+ throw new JDBCParamsIllegalException(
+ "Unsupported jdbc authentication types " +
jdbcAuthType.getAuthType());
}
return connection;
}
- public void close() {
- for (DataSource dataSource : this.databaseToDataSources.values()) {
- try {
- // DataSources.destroy(dataSource);
- ((BasicDataSource) dataSource).close();
- } catch (SQLException e) {
- }
+ private String getJdbcUrl(Map<String, String> properties) throws
SQLException {
+ String url = properties.get(JDBCEngineConnConstant.JDBC_URL);
+ if (StringUtils.isBlank(url)) {
+ throw new SQLException(JDBCEngineConnConstant.JDBC_URL + " is not
empty.");
}
+ url = clearJDBCUrl(url);
+ validateJDBCUrl(url);
+ return url.trim();
}
- private Connection getConnection(String url, Map<String, String>
properties)
- throws SQLException {
- String key = getRealURL(url);
- DataSource dataSource = databaseToDataSources.get(key);
- if (dataSource == null) {
- synchronized (databaseToDataSources) {
- if (dataSource == null) {
- dataSource = createDataSources(properties);
- databaseToDataSources.put(key, dataSource);
- }
- }
+ private String clearJDBCUrl(String url) {
+ if (url.startsWith("\"") && url.endsWith("\"")) {
+ url = url.trim();
+ return url.substring(1, url.length() - 1);
}
- return dataSource.getConnection();
+ return url;
}
- private String getJdbcUrl(Map<String, String> properties) throws
SQLException {
- String url = properties.get(JDBCEngineConnConstant.JDBC_URL);
+ private void validateJDBCUrl(String url) {
if (StringUtils.isEmpty(url)) {
- throw new SQLException(JDBCEngineConnConstant.JDBC_URL + " is not
empty.");
+ throw new NullPointerException(JDBCEngineConnConstant.JDBC_URL + "
cannot be null.");
+ }
+ if (!url.matches("jdbc:\\w+://\\S+:[0-9]{2,6}(/\\S*)?") &&
!url.startsWith("jdbc:h2")) {
+ throw new IllegalArgumentException("JDBC url format error!" + url);
}
- url = clearUrl(url);
- validateURL(url);
- return url.trim();
}
- private boolean isUsernameAuthType(Map<String, String> properties) {
- return USERNAME == getJdbcAuthType(properties);
- }
+ private String appendProxyUserToJDBCUrl(
+ String jdbcUrl, String execUser, Map<String, String> properties) {
+ StringBuilder jdbcUrlSb = new StringBuilder(jdbcUrl);
+ String proxyUserProperty =
+ JDBCPropertiesParser.getString(
+ properties,
JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, "");
+ if (execUser != null
+ &&
!JDBCEngineConnConstant.JDBC_PROXY_ANONYMOUS_USER.equals(execUser)
+ && StringUtils.isNotBlank(proxyUserProperty)) {
+
+ int lastIndexOfUrl = jdbcUrl.indexOf("?");
+ if (lastIndexOfUrl == -1) {
+ lastIndexOfUrl = jdbcUrl.length();
+ }
+ LOG.info("Using proxy user as: {}", execUser);
+ LOG.info("Using proxy property for user as: {}",
proxyUserProperty);
+ jdbcUrlSb.insert(lastIndexOfUrl, ";" + proxyUserProperty + "=" +
execUser + ";");
+ }
- private boolean isKerberosAuthType(Map<String, String> properties) {
- return KERBEROS == getJdbcAuthType(properties);
+ return jdbcUrlSb.toString();
}
private JdbcAuthType getJdbcAuthType(Map<String, String> properties) {
@@ -269,9 +376,9 @@ public class ConnectionManager {
@Override
public Object call() throws Exception {
if (KerberosUtils.runRefreshKerberosLogin()) {
- logger.info("Ran runRefreshKerberosLogin command
successfully.");
+ LOG.info("Ran runRefreshKerberosLogin command
successfully.");
kinitFailCount = 0;
- logger.info(
+ LOG.info(
"Scheduling Kerberos ticket refresh thread
with interval {} ms",
KerberosUtils.getKerberosRefreshInterval());
scheduledExecutorService.schedule(
@@ -280,13 +387,12 @@ public class ConnectionManager {
TimeUnit.MILLISECONDS);
} else {
kinitFailCount++;
- logger.info(
+ LOG.info(
"runRefreshKerberosLogin failed for {}
time(s).",
kinitFailCount);
if (kinitFailCount >=
KerberosUtils.kinitFailTimesThreshold()) {
- logger.error(
+ LOG.error(
"runRefreshKerberosLogin failed for
max attempts, calling close executor.");
- // close();
} else {
// wait for 1 second before calling
runRefreshKerberosLogin() again
scheduledExecutorService.schedule(this, 1,
TimeUnit.SECONDS);
@@ -303,12 +409,4 @@ public class ConnectionManager {
scheduledExecutorService.shutdown();
}
}
-
- private String clearUrl(String url) {
- if (url.startsWith("\"") && url.endsWith("\"")) {
- url = url.trim();
- return url.substring(1, url.length() - 1);
- }
- return url;
- }
}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
new file mode 100644
index 000000000..8b922f2dc
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class JDBCDataSourceConfigurations {
+ private final Map<String, Statement> taskIdStatementMap;
+
+ public JDBCDataSourceConfigurations() {
+ taskIdStatementMap = new ConcurrentHashMap<>();
+ }
+
+ public void initTaskIdStatementMap() throws SQLException {
+ for (Statement statement : taskIdStatementMap.values()) {
+ if (statement != null && !statement.isClosed()) {
+ statement.close();
+ }
+ }
+ taskIdStatementMap.clear();
+ }
+
+ public void saveStatement(String taskId, Statement statement) {
+ taskIdStatementMap.put(taskId, statement);
+ }
+
+ public void cancelStatement(String taskId) throws SQLException {
+ taskIdStatementMap.get(taskId).cancel();
+ }
+
+ public void removeStatement(String taskId) {
+ taskIdStatementMap.remove(taskId);
+ }
+}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCPropertiesParser.java
similarity index 50%
copy from
linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
copy to
linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCPropertiesParser.java
index dae6edaac..4885baed3 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCPropertiesParser.java
@@ -15,16 +15,20 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.jdbc.constant;
+package org.apache.linkis.manager.engineplugin.jdbc;
-public class JDBCEngineConnConstant {
- public static final String JDBC_URL = "wds.linkis.jdbc.connect.url";
- public static final String JDBC_USERNAME = "wds.linkis.jdbc.username";
- public static final String JDBC_PASSWORD = "wds.linkis.jdbc.password";
- public static final String JDBC_AUTH_TYPE = "wds.linkis.jdbc.auth.type";
- public static final String JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL =
"wds.linkis.jdbc.principal";
- public static final String JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION =
- "wds.linkis.jdbc.keytab.location";
- public static final String JDBC_PROXY_USER_PROPERTY =
"wds.linkis.jdbc.proxy.user.property";
- public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
+import java.util.Map;
+
+public class JDBCPropertiesParser extends PropertiesParser {
+ public static long getLong(Map<String, String> prop, String key, long
defaultValue) {
+ return getValue(prop, key, defaultValue, Long::parseLong);
+ }
+
+ public static int getInt(Map<String, String> prop, String key, int
defaultValue) {
+ return getValue(prop, key, defaultValue, Integer::parseInt);
+ }
+
+ public static boolean getBool(Map<String, String> prop, String key,
boolean defaultValue) {
+ return getValue(prop, key, defaultValue, "true"::equalsIgnoreCase);
+ }
}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
new file mode 100644
index 000000000..5bc15dcab
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Map;
+
+public abstract class PropertiesParser {
+ interface TypeConversion<T> {
+ T convertTo(String oriV);
+ }
+
+ public static String getString(Map<String, String> prop, String key,
String defaultValue) {
+ return prop.getOrDefault(key, defaultValue);
+ }
+
+ public static <T> T getValue(
+ Map<String, String> prop,
+ String key,
+ T defaultValue,
+ TypeConversion<T> typeConversion) {
+ String valueStr = getString(prop, key, "");
+ if (StringUtils.isBlank(valueStr)) {
+ return defaultValue;
+ }
+ try {
+ return typeConversion.convertTo(valueStr);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
index dae6edaac..0ccc4b49e 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
@@ -18,13 +18,44 @@
package org.apache.linkis.manager.engineplugin.jdbc.constant;
public class JDBCEngineConnConstant {
+ private JDBCEngineConnConstant() {} // constant holder only; not meant to be instantiated
+
+ public static final String JDBC_DEFAULT_DATASOURCE_TAG = "jdbc"; // data source name the executor falls back to when none is supplied
+ public static final String JDBC_PROXY_ANONYMOUS_USER = "anonymous";
public static final String JDBC_URL = "wds.linkis.jdbc.connect.url";
+ public static final String JDBC_DRIVER = "wds.linkis.jdbc.driver";
public static final String JDBC_USERNAME = "wds.linkis.jdbc.username";
public static final String JDBC_PASSWORD = "wds.linkis.jdbc.password";
public static final String JDBC_AUTH_TYPE = "wds.linkis.jdbc.auth.type";
public static final String JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL =
"wds.linkis.jdbc.principal";
public static final String JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION =
"wds.linkis.jdbc.keytab.location";
+ public static final String JDBC_KERBEROS_AUTH_PROXY_ENABLE =
+ "wds.linkis.jdbc.auth.kerberos.proxy.enable";
public static final String JDBC_PROXY_USER_PROPERTY =
"wds.linkis.jdbc.proxy.user.property";
public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
+ public static final String JDBC_SCRIPTS_EXEC_USER = "execUser"; // key under which the executor stores the user running the script
+ public static final String JDBC_ENGINE_RUN_TIME_DS =
"wds.linkis.engine.runtime.datasource"; // key the executor reads the data source name from
+
+ public static final String JDBC_POOL_TEST_ON_BORROW =
"wds.linkis.jdbc.pool.testOnBorrow";
+ public static final String JDBC_POOL_TEST_ON_RETURN =
"wds.linkis.jdbc.pool.testOnReturn";
+ public static final String JDBC_POOL_TEST_WHILE_IDLE =
"wds.linkis.jdbc.pool.testWhileIdle";
+ public static final String JDBC_POOL_VALIDATION_QUERY =
"wds.linkis.jdbc.pool.validationQuery";
+ public static final String JDBC_POOL_DEFAULT_VALIDATION_QUERY = "SELECT 1"; // a default VALUE for the validation query, not a property key
+ public static final String JDBC_POOL_TIME_BETWEEN_MIN_EVIC_IDLE_MS =
+ "wds.linkis.jdbc.pool.minEvictableIdleTimeMillis"; // NOTE(review): despite the constant's name, the key is minEvictableIdleTimeMillis
+ public static final String JDBC_POOL_TIME_BETWEEN_EVIC_RUNS_MS =
+ "wds.linkis.jdbc.pool.timeBetweenEvictionRunsMillis";
+ public static final String JDBC_POOL_MAX_WAIT =
"wds.linkis.jdbc.pool.maxWaitMillis";
+ public static final String JDBC_POOL_MAX_ACTIVE =
"wds.linkis.jdbc.pool.maxActive";
+ public static final String JDBC_POOL_INIT_SIZE =
"wds.linkis.jdbc.pool.initialSize";
+ public static final String JDBC_POOL_MIN_IDLE =
"wds.linkis.jdbc.pool.minIdle";
+ public static final String JDBC_POOL_PREPARED_STATEMENTS =
+ "wds.linkis.jdbc.pool.poolPreparedStatements";
+ public static final String JDBC_POOL_REMOVE_ABANDONED_ENABLED =
+ "wds.linkis.jdbc.pool.remove.abandoned.enabled";
+ public static final String JDBC_POOL_REMOVE_ABANDONED_TIMEOUT =
+ "wds.linkis.jdbc.pool.remove.abandoned.timeout";
+
+ public static final String JDBC_ENGINE_MEMORY_UNIT = "g"; // suffix the executor appends to the requested-memory setting when it is missing
}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
index 8a7411def..3d896a019 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/conf/JDBCConfiguration.scala
@@ -27,10 +27,6 @@ object JDBCConfiguration {
val JDBC_QUERY_TIMEOUT = CommonVars("wds.linkis.jdbc.query.timeout", 1800)
- val JDBC_SUPPORT_DBS = CommonVars("wds.linkis.jdbc.support.dbs",
"h2=>org.h2.Driver,mysql=>com.mysql.jdbc.Driver,postgresql=>org.postgresql.Driver,oracle=>oracle.jdbc.driver.OracleDriver,hive2=>org.apache.hive.jdbc.HiveDriver,presto=>com.facebook.presto.jdbc.PrestoDriver,clickhouse=>ru.yandex.clickhouse.ClickHouseDriver")
-
- val JDBC_SUPPORT_DBS_VALIDATION_QUERY =
CommonVars("wds.linkis.jdbc.support.dbs.validation.query", "h2=>SELECT
1,mysql=>SELECT 1,postgresql=>SELECT 1,oracle=>SELECT 1 FROM dual,hive2=>SELECT
1,presto=>SELECT 1")
-
val JDBC_CONCURRENT_LIMIT =
CommonVars[Int]("wds.linkis.engineconn.jdbc.concurrent.limit", 100)
val JDBC_KERBEROS_ENABLE = CommonVars[Boolean]("wds.linkis.keytab.enable",
false)
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 51ad30a61..3bb05236a 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.linkis.manager.engineplugin.jdbc.executer
-import java.sql.{Connection, Statement}
+import java.sql.{Connection, ResultSet, SQLException, Statement}
import java.util
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
EngineExecutionContext}
@@ -34,6 +34,7 @@ import org.apache.linkis.storage.domain.{Column, DataType}
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.io.IOUtils
+import org.apache.commons.lang.StringUtils
import org.apache.linkis.common.conf.Configuration
import
org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfig,
ResponseQueryConfig}
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel,
UserCreatorLabel}
@@ -49,13 +50,11 @@ class JDBCEngineConnExecutor(override val outputPrintLimit:
Int, val id: Int) ex
private val connectionManager = ConnectionManager.getInstance()
-
- private var statement: Statement = null
-
private val executorLabels: util.List[Label[_]] = new
util.ArrayList[Label[_]](2)
- private var connection: Connection = null
+
override def init(): Unit = {
+ info("jdbc executor start init.")
setCodeParser(new SQLCodeParser)
super.init()
if (JDBCConfiguration.JDBC_KERBEROS_ENABLE.getValue) {
@@ -64,111 +63,151 @@ class JDBCEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int) ex
}
+ /**
+  * Runs one piece of SQL on a connection obtained from the ConnectionManager.
+  *
+  * When the task properties carry no JDBC url, the connection settings are copied in from the
+  * cached engine configuration first. The statement is registered under the job id while it
+  * runs (see killTask) and is unregistered in the outer finally block.
+  *
+  * @param engineExecutorContext supplies the job id, labels and task properties
+  * @param code the SQL text to execute
+  * @return the query output or a success marker; an ErrorExecuteResponse when the connection
+  *         cannot be created or the statement fails
+  */
override def executeLine(engineExecutorContext: EngineExecutionContext,
code: String): ExecuteResponse = {
+ val execSqlUser = getExecSqlUser(engineExecutorContext)
val realCode = code.trim()
+ val taskId = engineExecutorContext.getJobId.get
val properties =
engineExecutorContext.getProperties.asInstanceOf[util.Map[String, String]]
+ var dataSourceName =
properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS, "")
if (properties.get(JDBCEngineConnConstant.JDBC_URL) == null) {
- info(s"jdbc url is empty, adding now...")
+ info(s"The jdbc url is empty, adding now...")
val globalConfig =
Utils.tryAndWarn(JDBCEngineConfig.getCacheMap(engineExecutorContext.getLabels))
+ if (StringUtils.isNotBlank(dataSourceName)) {
+ info("Start getting data source connection parameters from the data
source hub.")
+ // todo get data source info by data source client
+ }
properties.put(JDBCEngineConnConstant.JDBC_URL,
globalConfig.get(JDBCEngineConnConstant.JDBC_URL))
+ properties.put(JDBCEngineConnConstant.JDBC_DRIVER,
globalConfig.get(JDBCEngineConnConstant.JDBC_DRIVER))
properties.put(JDBCEngineConnConstant.JDBC_USERNAME,
globalConfig.get(JDBCEngineConnConstant.JDBC_USERNAME))
properties.put(JDBCEngineConnConstant.JDBC_PASSWORD,
globalConfig.get(JDBCEngineConnConstant.JDBC_PASSWORD))
+ properties.put(JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY,
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY,
JDBCEngineConnConstant.JDBC_POOL_DEFAULT_VALIDATION_QUERY))
properties.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE,
globalConfig.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE))
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL,
globalConfig.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL))
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION,
globalConfig.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION))
properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY,
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, ""))
- properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER,
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_PROXY_USER,
EngineConnObject.getEngineCreationContext.getUser))
+ properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER,
globalConfig.getOrDefault(JDBCEngineConnConstant.JDBC_PROXY_USER, execSqlUser))
+ properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER,
execSqlUser)
+ }
+ if (StringUtils.isBlank(dataSourceName)) {
+ dataSourceName = JDBCEngineConnConstant.JDBC_DEFAULT_DATASOURCE_TAG;
}
+ info(s"The data source name is [$dataSourceName], and the jdbc client
begins to run jdbc code:\n ${realCode.trim}")
+ var connection: Connection = null
+ var statement: Statement = null
+ var resultSet: ResultSet = null
- info(s"jdbc client begins to run jdbc code:\n ${realCode.trim}")
- connection = connectionManager.getConnection(properties)
- statement = connection.createStatement()
- info(s"create statement is: $statement")
- val isResultSetAvailable = statement.execute(code)
- info(s"Is ResultSet available ? : $isResultSetAvailable")
- if (isResultSetAvailable) {
- info("ResultSet is available")
- val JDBCResultSet = statement.getResultSet
- if (isDDLCommand(statement.getUpdateCount(),
JDBCResultSet.getMetaData().getColumnCount)) {
- info(s"current result is a ResultSet Object , but there are no more
results :${code} ")
- Utils.tryQuietly {
- JDBCResultSet.close()
- statement.close()
- connection.close()
+ Utils.tryCatch({
+ connection = connectionManager.getConnection(dataSourceName, properties)
+ }) {
+ case e: Exception => return ErrorExecuteResponse("created data source
connection error.", e)
+ }
+
+ try {
+ // The statement must exist before it is configured; setting the timeout first
+ // dereferenced a null statement on every execution.
+ statement = connection.createStatement()
+ statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
+ statement.setFetchSize(outputPrintLimit)
+ statement.setMaxRows(outputPrintLimit)
+ info(s"create statement is: $statement")
+ connectionManager.saveStatement(taskId, statement)
+ val isResultSetAvailable = statement.execute(code)
+ info(s"Is ResultSet available ? : $isResultSetAvailable")
+ try {
+ if (isResultSetAvailable) {
+ info("ResultSet is available")
+ resultSet = statement.getResultSet
+ return getExecResultSetOutput(engineExecutorContext, statement,
resultSet)
+ } else {
+ val updateCount = statement.getUpdateCount
+ info(s"only return affect rows : $updateCount")
+ engineExecutorContext.appendStdout(s"only return affect rows :
$updateCount")
+ return SuccessExecuteResponse()
}
- SuccessExecuteResponse()
- } else {
- val md = JDBCResultSet.getMetaData
- val metaArrayBuffer = new ArrayBuffer[Tuple2[String, String]]()
- for (i <- 1 to md.getColumnCount) {
- metaArrayBuffer.add(Tuple2(md.getColumnName(i),
JDBCHelper.getTypeStr(md.getColumnType(i))))
+ } finally {
+ if (resultSet != null) {
+ Utils.tryCatch({ resultSet.close() }) { case e: SQLException =>
warn(e.getMessage) }
}
- val columns = metaArrayBuffer.map { c => Column(c._1,
DataType.toDataType(c._2), "") }.toArray[Column]
- val metaData = new TableMetaData(columns)
- val resultSetWriter =
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
- resultSetWriter.addMetaData(metaData)
- var count = 0
- Utils.tryCatch({
- while (count < outputPrintLimit && JDBCResultSet.next()) {
- val r: Array[Any] = columns.indices.map { i =>
- val data = JDBCResultSet.getObject(i + 1) match {
- case value: Any => JDBCResultSet.getString(i + 1)
- case _ => null
- }
- data
- }.toArray
- resultSetWriter.addRecord(new TableRecord(r))
- count += 1
- }
- }) {
- case e: Exception => return ErrorExecuteResponse("query jdbc
failed", e)
+ if (statement != null) {
+ Utils.tryCatch({ statement.close() }) { case e: SQLException =>
warn(e.getMessage) }
}
- val output = if (resultSetWriter != null) resultSetWriter.toString
else null
- Utils.tryQuietly {
- JDBCResultSet.close()
- statement.close()
+ }
+ } catch {
+ case e: Throwable =>
+ error(s"Cannot run $code", e)
+ // Report the failure to the caller; falling through here would reach the trailing
+ // SuccessExecuteResponse() and mark a failed query as successful.
+ return ErrorExecuteResponse(e.getMessage, e)
+ } finally {
+ if (connection != null) {
+ try {
+ if (!connection.getAutoCommit) connection.commit()
connection.close()
- IOUtils.closeQuietly(resultSetWriter)
+ } catch {
+ case e: SQLException => warn("close connection error.", e)
}
- info("sql execute completed")
- AliasOutputExecuteResponse(null, output)
}
+ connectionManager.removeStatement(taskId)
+ }
+ SuccessExecuteResponse()
+ }
+
+ private def getExecResultSetOutput(engineExecutorContext:
EngineExecutionContext, statement: Statement, resultSet: ResultSet):
ExecuteResponse = { // writes the rows of resultSet through a TABLE_TYPE result-set writer and returns the writer's output
+ if (isDDLCommand(statement.getUpdateCount,
resultSet.getMetaData.getColumnCount)) { // a ResultSet object exists but carries neither an update count nor columns
+ info(s"current result is a ResultSet Object , but there are no more
results!")
+ engineExecutorContext.appendStdout("Query executed successfully.")
+ SuccessExecuteResponse()
} else {
- info(s"only return affect rows : ${statement.getUpdateCount}")
+ val md = resultSet.getMetaData
+ val metaArrayBuffer = new ArrayBuffer[(String, String)]() // (column name, type string) pairs, in column order
+ for (i <- 1 to md.getColumnCount) { // JDBC column indexes are 1-based
+ metaArrayBuffer.add(Tuple2(md.getColumnName(i),
JDBCHelper.getTypeStr(md.getColumnType(i))))
+ }
+ val columns = metaArrayBuffer.map { c => Column(c._1,
DataType.toDataType(c._2), "") }.toArray[Column]
+ val metaData = new TableMetaData(columns)
+ val resultSetWriter =
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+ resultSetWriter.addMetaData(metaData)
+ var count = 0
+ Utils.tryCatch({
+ while (count < outputPrintLimit && resultSet.next()) { // stop after outputPrintLimit rows
+ val r: Array[Any] = columns.indices.map { i =>
+ val data = resultSet.getObject(i + 1) match {
+ case value: Any => resultSet.getString(i + 1) // the value is re-read as a string
+ case _ => null // presumably the fallback for SQL NULL — confirm
+ }
+ data
+ }.toArray
+ resultSetWriter.addRecord(new TableRecord(r))
+ count += 1
+ }
+ }) {
+ case e: Exception => return ErrorExecuteResponse("query jdbc failed",
e) // NOTE(review): the writer is not closed on this path — confirm the execution context closes it
+ }
+ val output = if (resultSetWriter != null) resultSetWriter.toString else
null // read before the writer is closed below
Utils.tryQuietly {
- statement.close()
- connection.close()
+ IOUtils.closeQuietly(resultSetWriter)
}
- SuccessExecuteResponse()
+ info("sql executed completed.")
+ AliasOutputExecuteResponse(null, output)
}
}
+ /**
+  * Returns the user carried by the first UserCreatorLabel among the context's labels.
+  * Throws NoSuchElementException when the context has no such label.
+  */
+ private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = {
+   engineExecutionContext.getLabels
+     .collectFirst { case label: UserCreatorLabel => label }
+     .get
+     .getUser
+ }
+
+ /** True when the statement reported neither an update count nor any result columns. */
protected def isDDLCommand(updatedCount: Int, columnCount: Int): Boolean = {
- if (updatedCount < 0 && columnCount <= 0) {
- true
- } else {
- false
- }
+ val hasNoUpdateCount = updatedCount < 0
+ val hasNoColumns = columnCount <= 0
+ hasNoUpdateCount && hasNoColumns
}
override def getProgressInfo(taskID: String): Array[JobProgressInfo] =
Array.empty[JobProgressInfo]
override protected def callback(): Unit = {}
- override def progress(taskID: String): Float = {
- 0
- }
+ override def progress(taskID: String): Float = 0
override def close(): Unit = {
- if (statement != null) {
- statement.close()
- }
- if (connection != null) {
- connection.close()
- }
+ info("Start closing the jdbc engine.")
+ connectionManager.close() // the executor no longer holds a statement/connection itself; cleanup is delegated to the manager (presumably closes its data sources — confirm in ConnectionManager)
if (JDBCConfiguration.JDBC_KERBEROS_ENABLE.getValue) {
connectionManager.shutdownRefreshKerberosLoginService()
}
+ info("The jdbc engine has closed successfully.")
}
override def executeCompletely(engineExecutorContext:
EngineExecutionContext, code: String, completedLine: String): ExecuteResponse =
null
@@ -188,8 +227,8 @@ class JDBCEngineConnExecutor(override val outputPrintLimit:
Int, val id: Int) ex
val properties = EngineConnObject.getEngineCreationContext.getOptions
if
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
val settingClientMemory =
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
- if (!settingClientMemory.toLowerCase().endsWith("g")) {
- properties.put(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
settingClientMemory + "g")
+ if
(!settingClientMemory.toLowerCase().endsWith(JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT))
{
+ properties.put(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key,
settingClientMemory + JDBCEngineConnConstant.JDBC_ENGINE_MEMORY_UNIT)
}
}
val resource = new CommonNodeResource
@@ -200,12 +239,21 @@ class JDBCEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int) ex
override def supportCallBackLogs(): Boolean = false
- override def getId(): String = Sender.getThisServiceInstance.getInstance +
s"_$id"
+ override def getId: String = Sender.getThisServiceInstance.getInstance +
s"_$id"
override def getConcurrentLimit: Int =
JDBCConfiguration.JDBC_CONCURRENT_LIMIT.getValue
override def killAll(): Unit = {
+ info("Killing all query task.")
+ connectionManager.initTaskStatementMap() // NOTE(review): presumably cancels and clears every tracked statement despite the "init" name — confirm in ConnectionManager
+ info("All query task has killed successfully.")
+ }
+ override def killTask(taskId: String): Unit = {
+ info(s"Killing jdbc query task $taskId")
+ connectionManager.cancelStatement(taskId) // executeLine registers its statement under the job id via saveStatement; presumably this cancels that statement — confirm
+ super.killTask(taskId) // then run the parent executor's kill handling
+ info(s"The query task $taskId has killed successfully.")
}
}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
index 811b633ce..bb448d853 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
+++
b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
@@ -18,10 +18,9 @@
package org.apache.linkis.manager.engineplugin.jdbc;
import
org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant;
+import
org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.commons.dbcp.BasicDataSourceFactory;
-
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@ -31,25 +30,28 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
public class ConnectionManagerTest {
@Test
@DisplayName("testCreateJdbcConnAndExecSql")
- public void testCreateJdbcConnAndExecSql() throws SQLException {
+ public void testCreateJdbcConnAndExecSql()
+ throws SQLException, JDBCParamsIllegalException,
ClassNotFoundException {
Map<String, String> properties = new HashMap<>(8);
properties.put(
JDBCEngineConnConstant.JDBC_URL,
"jdbc:h2:mem:linkis_db;MODE=MySQL;DATABASE_TO_LOWER=TRUE");
+ properties.put(JDBCEngineConnConstant.JDBC_DRIVER, "org.h2.Driver");
properties.put(JDBCEngineConnConstant.JDBC_USERNAME, "user");
properties.put(JDBCEngineConnConstant.JDBC_PASSWORD, "password");
+ // Key is the validation-query property name; JDBC_POOL_DEFAULT_VALIDATION_QUERY is the
+ // default value ("SELECT 1"), and using it as the key left the property unset.
+ properties.put(JDBCEngineConnConstant.JDBC_POOL_VALIDATION_QUERY,
"SELECT 1");
properties.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, "USERNAME");
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL, "");
properties.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION,
"");
properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, "");
properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, "");
+ properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER,
"leo_jie");
ConnectionManager connectionManager = ConnectionManager.getInstance();
- Connection conn = connectionManager.getConnection(properties);
+ Connection conn = connectionManager.getConnection("jdbc", properties);
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show databases;");
while (rs.next()) {
@@ -61,30 +63,11 @@ public class ConnectionManagerTest {
}
@Test
- @DisplayName("testExecSql")
- public void testExecSql() throws Exception {
- Properties properties = new Properties();
- properties.put("driverClassName", "org.h2.Driver");
- properties.put("url",
"jdbc:h2:mem:linkis_db;MODE=MySQL;DATABASE_TO_LOWER=TRUE");
- properties.put("username", "user");
- properties.put("password", "password");
- properties.put("maxIdle", 20);
- properties.put("minIdle", 0);
- properties.put("initialSize", 1);
- properties.put("testOnBorrow", false);
- properties.put("testWhileIdle", true);
- properties.put("validationQuery", "select 1");
- BasicDataSource dataSource =
- (BasicDataSource)
BasicDataSourceFactory.createDataSource(properties);
- Connection conn = dataSource.getConnection();
- Statement statement = conn.createStatement();
- ResultSet rs = statement.executeQuery("show databases;");
- while (rs.next()) {
- System.out.println(rs.getObject(1));
- }
- rs.close();
- statement.close();
- conn.close();
- dataSource.close();
+ // Display name matches this test; it previously duplicated testCreateJdbcConnAndExecSql's.
+ @DisplayName("testJDBCPropertiesParserGetLong")
+ public void testJDBCPropertiesParserGetLong() {
+ Map<String, String> properties = new HashMap<>(1);
+ properties.put("key", "10");
+ long v = JDBCPropertiesParser.getLong(properties, "key", 0);
+ Assertions.assertEquals(10, v);
}
}
diff --git a/pom.xml b/pom.xml
index 0bb1f9595..63b56b336 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
<mockito.version>3.9.0</mockito.version>
<assertj.version>3.17.2</assertj.version>
<h2.version>1.4.200</h2.version>
-
+ <dbcp2.version>2.0.1</dbcp2.version>
<apache-rat-plugin.version>0.13</apache-rat-plugin.version>
<assembly.package.rootpath>${basedir}</assembly.package.rootpath>
<maven.compiler.source>1.8</maven.compiler.source>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]