This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6617e3fb72 Revert "datasource test and sql task Remove connection pool
issues is #14179 (#14193)" (#14626)
6617e3fb72 is described below
commit 6617e3fb72ad48c37c17900debc9fbf773429de7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 24 17:33:43 2023 +0800
Revert "datasource test and sql task Remove connection pool issues is
#14179 (#14193)" (#14626)
This reverts commit e4fb5b30a45b5e2380841e8b37dff2f0408fc5a7.
---
.../api/client/CommonDataSourceClient.java | 42 ++++++++-----------
.../azuresql/AzureSQLDataSourceClient.java | 34 ++++++++++++++++
.../datasource/hive/HiveDataSourceClient.java | 30 ++++++++++++++
.../datasource/kyuubi/KyuubiDataSourceClient.java | 27 +++++++++++++
.../kyuubi/KyuubiDataSourceClientTest.java | 8 ++++
.../redshift/RedshiftDataSourceClient.java | 47 ++++++++++++++++++++++
6 files changed, 162 insertions(+), 26 deletions(-)
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
index 37d783b640..c87b3453a1 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.datasource.api.client;
+import
org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -24,13 +25,15 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+
import com.google.common.base.Stopwatch;
+import com.zaxxer.hikari.HikariDataSource;
@Slf4j
public class CommonDataSourceClient implements DataSourceClient {
@@ -39,7 +42,8 @@ public class CommonDataSourceClient implements
DataSourceClient {
public static final String COMMON_VALIDATION_QUERY = "select 1";
protected final BaseConnectionParam baseConnectionParam;
- protected Connection connection;
+ protected HikariDataSource dataSource;
+ protected JdbcTemplate jdbcTemplate;
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam,
DbType dbType) {
this.baseConnectionParam = baseConnectionParam;
@@ -59,7 +63,8 @@ public class CommonDataSourceClient implements
DataSourceClient {
}
protected void initClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
- this.connection = buildConn(baseConnectionParam);
+ this.dataSource =
JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType);
+ this.jdbcTemplate = new JdbcTemplate(dataSource);
}
protected void checkUser(BaseConnectionParam baseConnectionParam) {
@@ -68,20 +73,6 @@ public class CommonDataSourceClient implements
DataSourceClient {
}
}
- private Connection buildConn(BaseConnectionParam baseConnectionParam) {
- Connection conn = null;
- try {
- Class.forName(baseConnectionParam.getDriverClassName());
- conn =
DriverManager.getConnection(baseConnectionParam.getJdbcUrl(),
baseConnectionParam.getUser(),
- baseConnectionParam.getPassword());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Driver load fail", e);
- } catch (SQLException e) {
- throw new RuntimeException("JDBC connect failed", e);
- }
- return conn;
- }
-
protected void setDefaultUsername(BaseConnectionParam baseConnectionParam)
{
baseConnectionParam.setUser(COMMON_USER);
}
@@ -101,7 +92,7 @@ public class CommonDataSourceClient implements
DataSourceClient {
// Checking data source client
Stopwatch stopwatch = Stopwatch.createStarted();
try {
-
this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery();
+
this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());
} catch (Exception e) {
throw new RuntimeException("JDBC connect failed", e);
} finally {
@@ -113,21 +104,20 @@ public class CommonDataSourceClient implements
DataSourceClient {
@Override
public Connection getConnection() {
try {
- return connection.isClosed() ? buildConn(baseConnectionParam) :
connection;
+ return this.dataSource.getConnection();
} catch (SQLException e) {
- throw new RuntimeException("get conn is fail", e);
+ log.error("get druidDataSource Connection fail SQLException: {}",
e.getMessage(), e);
+ return null;
}
}
@Override
public void close() {
- log.info("do close connection {}.", baseConnectionParam.getDatabase());
- try {
- connection.close();
- } catch (SQLException e) {
- log.info("colse connection fail");
- throw new RuntimeException(e);
+ log.info("do close dataSource {}.", baseConnectionParam.getDatabase());
+ try (HikariDataSource closedDatasource = dataSource) {
+ // only close the resource
}
+ this.jdbcTemplate = null;
}
}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
index 53af3946b8..cf7db2e3b2 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
@@ -25,9 +25,14 @@ import
org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import com.google.common.base.Stopwatch;
+
@Slf4j
public class AzureSQLDataSourceClient extends CommonDataSourceClient {
@@ -44,4 +49,33 @@ public class AzureSQLDataSourceClient extends
CommonDataSourceClient {
return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam);
}
+ @Override
+ public void checkClient() {
+
+ AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam)
this.baseConnectionParam;
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ String validationQuery = this.baseConnectionParam.getValidationQuery();
+ if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
+ // Checking data source client
+ try {
+ this.jdbcTemplate.execute(validationQuery);
+ } catch (Exception e) {
+ throw new RuntimeException("JDBC connect failed", e);
+ } finally {
+ log.info("Time to execute check jdbc client with sql {} for {}
ms ",
+ this.baseConnectionParam.getValidationQuery(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ } else {
+ try (Statement statement = getConnection().createStatement()) {
+ if (!statement.execute(validationQuery)) {
+ throw new SQLException("execute check azure sql token
client failed : " + validationQuery);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ log.info("Time to execute check azure sql token client with
sql {} for {} ms ",
+ this.baseConnectionParam.getValidationQuery(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ }
}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index 3c28551e44..15270f60a3 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -23,6 +23,7 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SEC
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import
org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import
org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -32,9 +33,13 @@ import sun.security.krb5.Config;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+
@Slf4j
public class HiveDataSourceClient extends CommonDataSourceClient {
@@ -47,6 +52,17 @@ public class HiveDataSourceClient extends
CommonDataSourceClient {
log.info("PreInit in {}", getClass().getName());
}
+ @Override
+ protected void initClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
+ log.info("Create UserGroupInformation.");
+ UserGroupInformationFactory.login(baseConnectionParam.getUser());
+ log.info("Create ugi success.");
+
+ this.dataSource =
JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam,
dbType);
+ this.jdbcTemplate = new JdbcTemplate(dataSource);
+ log.info("Init {} success.", getClass().getName());
+ }
+
@Override
protected void checkEnv(BaseConnectionParam baseConnectionParam) {
super.checkEnv(baseConnectionParam);
@@ -70,6 +86,20 @@ public class HiveDataSourceClient extends
CommonDataSourceClient {
}
}
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ while (connection == null) {
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
+
UserGroupInformationFactory.login(baseConnectionParam.getUser());
+ }
+ }
+ return connection;
+ }
+
@Override
public void close() {
try {
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
index 2d3954fff3..3e0af69577 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
@@ -18,11 +18,17 @@
package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
import
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import
org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
+import java.sql.Connection;
+import java.sql.SQLException;
+
import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+
@Slf4j
public class KyuubiDataSourceClient extends CommonDataSourceClient {
@@ -35,11 +41,32 @@ public class KyuubiDataSourceClient extends
CommonDataSourceClient {
log.info("PreInit in {}", getClass().getName());
}
+ @Override
+ protected void initClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
+
+ this.dataSource =
JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam,
dbType);
+ this.jdbcTemplate = new JdbcTemplate(dataSource);
+ log.info("Init {} success.", getClass().getName());
+ }
+
@Override
protected void checkEnv(BaseConnectionParam baseConnectionParam) {
super.checkEnv(baseConnectionParam);
}
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ while (connection == null) {
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ log.error("Failed to get Kyuubi Connection.", e);
+ }
+ }
+ return connection;
+ }
+
@Override
public void close() {
super.close();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
index 03b6f44c24..041420cc48 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
import
org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
@@ -48,6 +49,13 @@ public class KyuubiDataSourceClientTest {
Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam);
}
+ @Test
+ public void testInitClient() {
+ KyuubiConnectionParam kyuubiConnectionParam = new
KyuubiConnectionParam();
+ kyuubiDataSourceClient.initClient(kyuubiConnectionParam,
DbType.KYUUBI);
+
Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam,
DbType.KYUUBI);
+ }
+
@Test
public void testCheckClient() {
kyuubiDataSourceClient.checkClient();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java
index 74aeb30ee2..186e5afd19 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java
@@ -18,11 +18,21 @@
package org.apache.dolphinscheduler.plugin.datasource.redshift;
import
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import
org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode;
+import
org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam;
+import
org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
import lombok.extern.slf4j.Slf4j;
+import com.google.common.base.Stopwatch;
+
@Slf4j
public class RedshiftDataSourceClient extends CommonDataSourceClient {
@@ -30,4 +40,41 @@ public class RedshiftDataSourceClient extends
CommonDataSourceClient {
super(baseConnectionParam, dbType);
}
+ @Override
+ public Connection getConnection() {
+ RedshiftConnectionParam connectionParam = (RedshiftConnectionParam)
this.baseConnectionParam;
+ if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
+ return super.getConnection();
+ }
+ return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam);
+ }
+
+ @Override
+ public void checkClient() {
+ RedshiftConnectionParam connectionParam = (RedshiftConnectionParam)
this.baseConnectionParam;
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ String validationQuery = this.baseConnectionParam.getValidationQuery();
+ if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
+ // Checking data source client
+ try {
+ this.jdbcTemplate.execute(validationQuery);
+ } catch (Exception e) {
+ throw new RuntimeException("JDBC connect failed", e);
+ } finally {
+ log.info("Time to execute check jdbc client with sql {} for {}
ms ",
+ this.baseConnectionParam.getValidationQuery(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ } else {
+ try (Statement statement = getConnection().createStatement()) {
+ if (!statement.execute(validationQuery)) {
+ throw new SQLException("execute check redshift access key
failed : " + validationQuery);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ log.info("Time to execute check redshift access key with sql
{} for {} ms ",
+ this.baseConnectionParam.getValidationQuery(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ }
}