This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.1.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.8-prepare by this push:
     new 8e058fc227 Use adhoc connection in sql/procedure task (#14670)
8e058fc227 is described below

commit 8e058fc227c2ac958e4cd345eb855b1e4d69a2d9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 31 22:44:43 2023 +0800

    Use adhoc connection in sql/procedure task (#14670)
---
 .../api/service/impl/DataSourceServiceImpl.java    |  57 ++++----
 .../api/service/DataSourceServiceTest.java         | 159 ++++++++++++---------
 .../plugin/datasource/api/utils/CommonUtils.java   |   4 +-
 .../plugin/task/datax/DataxTask.java               |  49 ++++---
 .../plugin/task/procedure/ProcedureTask.java       |   3 +-
 .../dolphinscheduler/plugin/task/sql/SqlTask.java  |  61 ++++----
 6 files changed, 183 insertions(+), 150 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
index 7080b83646..309d2ce01f 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
 import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
-import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -70,8 +69,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-
-
 /**
  * data source service impl
  */
@@ -86,7 +83,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
     @Autowired
     private DataSourceUserMapper datasourceUserMapper;
 
-
     private static final String TABLE = "TABLE";
     private static final String VIEW = "VIEW";
     private static final String[] TABLE_TYPES = new String[]{TABLE, VIEW};
@@ -105,7 +101,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
     public Result<Object> createDataSource(User loginUser, 
BaseDataSourceParamDTO datasourceParam) {
         DataSourceUtils.checkDatasourceParam(datasourceParam);
         Result<Object> result = new Result<>();
-        if (!canOperatorPermissions(loginUser,null, 
AuthorizationType.DATASOURCE, 
ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
+        if (!canOperatorPermissions(loginUser, null, 
AuthorizationType.DATASOURCE,
+                ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
@@ -114,7 +111,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
             putMsg(result, Status.DATASOURCE_EXIST);
             return result;
         }
-        if(checkDescriptionLength(datasourceParam.getNote())){
+        if (checkDescriptionLength(datasourceParam.getNote())) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
@@ -141,7 +138,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
         try {
             dataSourceMapper.insert(dataSource);
             putMsg(result, Status.SUCCESS);
-            permissionPostHandle(AuthorizationType.DATASOURCE, 
loginUser.getId(), Collections.singletonList(dataSource.getId()), logger);
+            permissionPostHandle(AuthorizationType.DATASOURCE, 
loginUser.getId(),
+                    Collections.singletonList(dataSource.getId()), logger);
         } catch (DuplicateKeyException ex) {
             logger.error("Create datasource error.", ex);
             putMsg(result, Status.DATASOURCE_EXIST);
@@ -168,22 +166,24 @@ public class DataSourceServiceImpl extends 
BaseServiceImpl implements DataSource
             return result;
         }
 
-        if (!canOperatorPermissions(loginUser,new 
Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, DATASOURCE_UPDATE)) 
{
+        if (!canOperatorPermissions(loginUser, new 
Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
+                DATASOURCE_UPDATE)) {
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
 
-        //check name can use or not
+        // check name can use or not
         if (!dataSource.getName().trim().equals(dataSource.getName()) && 
checkName(dataSource.getName())) {
             putMsg(result, Status.DATASOURCE_EXIST);
             return result;
         }
-        if(checkDescriptionLength(dataSourceParam.getNote())){
+        if (checkDescriptionLength(dataSourceParam.getNote())) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
-        //check password,if the password is not updated, set to the old 
password.
-        BaseConnectionParam connectionParam = (BaseConnectionParam) 
DataSourceUtils.buildConnectionParams(dataSourceParam);
+        // check password,if the password is not updated, set to the old 
password.
+        BaseConnectionParam connectionParam =
+                (BaseConnectionParam) 
DataSourceUtils.buildConnectionParams(dataSourceParam);
         String password = connectionParam.getPassword();
         if (StringUtils.isBlank(password)) {
             String oldConnectionParams = dataSource.getConnectionParams();
@@ -262,9 +262,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
         Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize);
         PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize);
         if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
-            dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 
UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : loginUser.getId(), 
searchVal);
+            dataSourceList = dataSourceMapper.selectPaging(dataSourcePage,
+                    UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : 
loginUser.getId(), searchVal);
         } else {
-            Set<Integer> ids = 
resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
 loginUser.getId(), logger);
+            Set<Integer> ids = resourcePermissionCheckService
+                    
.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, 
loginUser.getId(), logger);
             if (ids.isEmpty()) {
                 result.setData(pageInfo);
                 putMsg(result, Status.SUCCESS);
@@ -318,13 +320,15 @@ public class DataSourceServiceImpl extends 
BaseServiceImpl implements DataSource
         if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
             datasourceList = dataSourceMapper.queryDataSourceByType(0, type);
         } else {
-            Set<Integer> ids = 
resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
 loginUser.getId(), logger);
+            Set<Integer> ids = resourcePermissionCheckService
+                    
.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, 
loginUser.getId(), logger);
             if (ids.isEmpty()) {
                 result.put(Constants.DATA_LIST, Collections.emptyList());
                 putMsg(result, Status.SUCCESS);
                 return result;
             }
-            datasourceList = 
dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> 
dataSource.getType().getCode() == type).collect(Collectors.toList());
+            datasourceList = dataSourceMapper.selectBatchIds(ids).stream()
+                    .filter(dataSource -> dataSource.getType().getCode() == 
type).collect(Collectors.toList());
         }
         result.put(Constants.DATA_LIST, datasourceList);
         putMsg(result, Status.SUCCESS);
@@ -361,7 +365,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
     @Override
     public Result<Object> checkConnection(DbType type, ConnectionParam 
connectionParam) {
         Result<Object> result = new Result<>();
-        try (Connection connection = 
DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {
+        try (Connection connection = DataSourceUtils.getConnection(type, 
connectionParam)) {
             if (connection == null) {
                 putMsg(result, Status.CONNECTION_TEST_FAILURE);
                 return result;
@@ -372,7 +376,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
             String message = Optional.of(e).map(Throwable::getCause)
                     .map(Throwable::getMessage)
                     .orElse(e.getMessage());
-            logger.error("datasource test connection error, dbType:{}, 
connectionParam:{}, message:{}.", type, connectionParam, message);
+            logger.error("datasource test connection error, dbType:{}, 
connectionParam:{}, message:{}.", type,
+                    connectionParam, message);
             return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), 
message);
         }
     }
@@ -391,7 +396,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
             putMsg(result, Status.RESOURCE_NOT_EXIST);
             return result;
         }
-        return checkConnection(dataSource.getType(), 
DataSourceUtils.buildConnectionParams(dataSource.getType(), 
dataSource.getConnectionParams()));
+        return checkConnection(dataSource.getType(),
+                DataSourceUtils.buildConnectionParams(dataSource.getType(), 
dataSource.getConnectionParams()));
     }
 
     /**
@@ -406,14 +412,15 @@ public class DataSourceServiceImpl extends 
BaseServiceImpl implements DataSource
     public Result<Object> delete(User loginUser, int datasourceId) {
         Result<Object> result = new Result<>();
         try {
-            //query datasource by id
+            // query datasource by id
             DataSource dataSource = dataSourceMapper.selectById(datasourceId);
             if (dataSource == null) {
                 logger.error("resource id {} not exist", datasourceId);
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
             }
-            if (!canOperatorPermissions(loginUser, new 
Object[]{dataSource.getId()},AuthorizationType.DATASOURCE,DATASOURCE_DELETE)) {
+            if (!canOperatorPermissions(loginUser, new 
Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
+                    DATASOURCE_DELETE)) {
                 putMsg(result, Status.USER_NO_OPERATION_PERM);
                 return result;
             }
@@ -438,7 +445,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
     public Map<String, Object> unauthDatasource(User loginUser, Integer 
userId) {
         Map<String, Object> result = new HashMap<>();
         List<DataSource> datasourceList;
-        if 
(canOperatorPermissions(loginUser,null,AuthorizationType.DATASOURCE,null)) {
+        if (canOperatorPermissions(loginUser, null, 
AuthorizationType.DATASOURCE, null)) {
             // admin gets all data sources except userId
             datasourceList = 
dataSourceMapper.queryDatasourceExceptUserId(userId);
         } else {
@@ -519,7 +526,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
 
             tables = metaData.getTables(
                     connectionParam.getDatabase(),
-                    
getDbSchemaPattern(dataSource.getType(),schema,connectionParam),
+                    getDbSchemaPattern(dataSource.getType(), schema, 
connectionParam),
                     "%", TABLE_TYPES);
             if (null == tables) {
                 putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
@@ -549,7 +556,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
     }
 
     @Override
-    public Map<String, Object> getTableColumns(Integer datasourceId,String 
tableName) {
+    public Map<String, Object> getTableColumns(Integer datasourceId, String 
tableName) {
         Map<String, Object> result = new HashMap<>();
 
         DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -615,7 +622,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl 
implements DataSource
         return options;
     }
 
-    private String getDbSchemaPattern(DbType dbType,String 
schema,BaseConnectionParam connectionParam) {
+    private String getDbSchemaPattern(DbType dbType, String schema, 
BaseConnectionParam connectionParam) {
         if (dbType == null) {
             return null;
         }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
index 2c46170618..30b9cbd150 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
@@ -21,15 +21,14 @@ import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DATASOURCE_LIST;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import 
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -39,10 +38,8 @@ import 
org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import 
org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
-import 
org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
 import 
org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
 import 
org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
-import 
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbConnectType;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -51,9 +48,7 @@ import org.apache.commons.collections.CollectionUtils;
 
 import java.sql.Connection;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -61,11 +56,9 @@ import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -81,6 +74,7 @@ import org.slf4j.LoggerFactory;
 @PowerMockIgnore({"sun.security.*", "javax.net.*"})
 @PrepareForTest({DataSourceUtils.class, CommonUtils.class, 
DataSourceClientProvider.class, PasswordUtils.class})
 public class DataSourceServiceTest {
+
     private static final Logger baseServiceLogger = 
LoggerFactory.getLogger(BaseServiceImpl.class);
     private static final Logger logger = 
LoggerFactory.getLogger(DataSourceServiceTest.class);
     private static final Logger dataSourceServiceLogger = 
LoggerFactory.getLogger(DataSourceServiceImpl.class);
@@ -124,22 +118,26 @@ public class DataSourceServiceTest {
         DbType dataSourceType = postgreSqlDatasourceParam.getType();
         // data source exits
         
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
-        Result connectionResult = new 
Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), 
Status.DATASOURCE_CONNECT_FAILED.getMsg());
-        //PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
parameter)).thenReturn(connectionResult);
-        
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType,
 connectionParam);
+        Result connectionResult =
+                new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), 
Status.DATASOURCE_CONNECT_FAILED.getMsg());
+        // PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
parameter)).thenReturn(connectionResult);
+        
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType,
+                connectionParam);
         Result connectFailedResult = 
dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), 
connectFailedResult.getCode().intValue());
 
         // data source exits
         
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
         connectionResult = new Result(Status.SUCCESS.getCode(), 
Status.SUCCESS.getMsg());
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam)).thenReturn(connectionResult);
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam))
+                .thenReturn(connectionResult);
         Result notValidError = dataSourceService.createDataSource(loginUser, 
postgreSqlDatasourceParam);
         Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), 
notValidError.getCode().intValue());
 
         // success
         
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam)).thenReturn(connectionResult);
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam))
+                .thenReturn(connectionResult);
         Result success = dataSourceService.createDataSource(loginUser, 
postgreSqlDatasourceParam);
         Assert.assertEquals(Status.SUCCESS.getCode(), 
success.getCode().intValue());
     }
@@ -162,13 +160,15 @@ public class DataSourceServiceTest {
 
         // data source not exits
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
-        Result resourceNotExits = 
dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
+        Result resourceNotExits =
+                dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
         Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(), 
resourceNotExits.getCode().intValue());
         // user no operation perm
         DataSource dataSource = new DataSource();
         dataSource.setUserId(0);
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
-        Result userNoOperationPerm = 
dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
+        Result userNoOperationPerm =
+                dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
         Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), 
userNoOperationPerm.getCode().intValue());
 
         // data source name exits
@@ -177,7 +177,8 @@ public class DataSourceServiceTest {
         dataSourceList.add(dataSource);
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(dataSourceList);
-        Result dataSourceNameExist = 
dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
+        Result dataSourceNameExist =
+                dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
         Assert.assertEquals(Status.DATASOURCE_EXIST.getCode(), 
dataSourceNameExist.getCode().intValue());
 
         // data source connect failed
@@ -186,15 +187,18 @@ public class DataSourceServiceTest {
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
         Result connectionResult = new Result(Status.SUCCESS.getCode(), 
Status.SUCCESS.getMsg());
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam)).thenReturn(connectionResult);
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam))
+                .thenReturn(connectionResult);
         Result connectFailed = 
dataSourceService.updateDataSource(dataSourceId, loginUser, 
postgreSqlDatasourceParam);
         Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), 
connectFailed.getCode().intValue());
 
-        //success
+        // success
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
-        connectionResult = new 
Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), 
Status.DATASOURCE_CONNECT_FAILED.getMsg());
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam)).thenReturn(connectionResult);
+        connectionResult =
+                new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), 
Status.DATASOURCE_CONNECT_FAILED.getMsg());
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, 
connectionParam))
+                .thenReturn(connectionResult);
         Result success = dataSourceService.updateDataSource(dataSourceId, 
loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.SUCCESS.getCode(), 
success.getCode().intValue());
 
@@ -209,12 +213,15 @@ public class DataSourceServiceTest {
         String searchVal = "";
         int pageNo = 1;
         int pageSize = 10;
-        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null, loginUser.getId(), DATASOURCE_LIST, baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 null, loginUser.getId(), baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
 loginUser.getId(), baseServiceLogger)).thenReturn(ids);
+        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null,
+                loginUser.getId(), DATASOURCE_LIST, 
baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 null,
+                loginUser.getId(), baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
+                loginUser.getId(), baseServiceLogger)).thenReturn(ids);
 
         Result result = dataSourceService.queryDataSourceListPaging(loginUser, 
searchVal, pageNo, pageSize);
-        Assert.assertEquals(Status.SUCCESS.getCode(),(int)result.getCode());
+        Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
     }
 
     @Test
@@ -230,9 +237,11 @@ public class DataSourceServiceTest {
         User loginUser = getAdminUser();
         int dataSourceId = 1;
         Result result = new Result();
-        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 new Object[]{dataSourceId}, -1, baseServiceLogger)).thenReturn(true);
-        //resource not exist
+        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null,
+                loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
+                new Object[]{dataSourceId}, -1, 
baseServiceLogger)).thenReturn(true);
+        // resource not exist
         dataSourceService.putMsg(result, Status.RESOURCE_NOT_EXIST);
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
         Assert.assertEquals(result.getCode(), 
dataSourceService.delete(loginUser, dataSourceId).getCode());
@@ -250,8 +259,10 @@ public class DataSourceServiceTest {
         loginUser.setUserType(UserType.ADMIN_USER);
         loginUser.setId(1);
         dataSource.setId(22);
-        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null, loginUser.getId(), DATASOURCE_DELETE, 
baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,new
 Object[]{dataSource.getId()} , 0, baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null,
+                loginUser.getId(), DATASOURCE_DELETE, 
baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
+                new Object[]{dataSource.getId()}, 0, 
baseServiceLogger)).thenReturn(true);
         
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         Assert.assertEquals(result.getCode(), 
dataSourceService.delete(loginUser, dataSourceId).getCode());
 
@@ -263,8 +274,10 @@ public class DataSourceServiceTest {
         loginUser.setId(1);
         loginUser.setUserType(UserType.ADMIN_USER);
         int userId = 3;
-        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 null, 0, baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null,
+                loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 null, 0,
+                baseServiceLogger)).thenReturn(true);
         // test admin user
         
Mockito.when(dataSourceMapper.queryAuthedDatasource(userId)).thenReturn(getSingleDataSourceList());
         
Mockito.when(dataSourceMapper.queryDatasourceExceptUserId(userId)).thenReturn(getDataSourceList());
@@ -276,7 +289,8 @@ public class DataSourceServiceTest {
         // test non-admin user
         loginUser.setId(2);
         loginUser.setUserType(UserType.GENERAL_USER);
-        
Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id", 
loginUser.getId()))).thenReturn(getDataSourceList());
+        
Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id", 
loginUser.getId())))
+                .thenReturn(getDataSourceList());
         result = dataSourceService.unauthDatasource(loginUser, userId);
         logger.info(result.toString());
         dataSources = (List<DataSource>) result.get(Constants.DATA_LIST);
@@ -311,9 +325,12 @@ public class DataSourceServiceTest {
         loginUser.setUserType(UserType.GENERAL_USER);
         Set<Integer> dataSourceIds = new HashSet<>();
         dataSourceIds.add(1);
-        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 null, 0, baseServiceLogger)).thenReturn(true);
-        
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
 loginUser.getId(), dataSourceServiceLogger)).thenReturn(dataSourceIds);
+        
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
 null,
+                loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
 null, 0,
+                baseServiceLogger)).thenReturn(true);
+        
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
+                loginUser.getId(), 
dataSourceServiceLogger)).thenReturn(dataSourceIds);
 
         DataSource dataSource = new DataSource();
         dataSource.setType(DbType.MYSQL);
@@ -361,8 +378,9 @@ public class DataSourceServiceTest {
         dataSource.setName("test");
         dataSource.setNote("Note");
         dataSource.setType(DbType.ORACLE);
-        
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
-                + 
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
+        dataSource.setConnectionParams(
+                
"{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+                        + 
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
 
         return dataSource;
     }
@@ -373,8 +391,9 @@ public class DataSourceServiceTest {
         dataSource.setName("test");
         dataSource.setNote("Note");
         dataSource.setType(DbType.ORACLE);
-        
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
-                + 
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
+        dataSource.setConnectionParams(
+                
"{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+                        + 
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
 
         return dataSource;
     }
@@ -390,8 +409,9 @@ public class DataSourceServiceTest {
         
oracleDatasourceParamDTO.setConnectType(DbConnectType.ORACLE_SERVICE_NAME);
 
         ConnectionParam connectionParam = 
DataSourceUtils.buildConnectionParams(oracleDatasourceParamDTO);
-        String expected = 
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
-                + 
"\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select 
1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
+        String expected =
+                
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
+                        + 
"\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select 
1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
         Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
 
         PowerMockito.mockStatic(CommonUtils.class);
@@ -410,36 +430,38 @@ public class DataSourceServiceTest {
         
hiveDataSourceParamDTO.setLoginUserKeytabUsername("test2/[email protected]");
         connectionParam = 
DataSourceUtils.buildConnectionParams(hiveDataSourceParamDTO);
 
-        expected = 
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
-                + 
"\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select
 1\","
-                + 
"\"principal\":\"hive/[email protected]\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/[email protected]\","
-                + "\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
+        expected =
+                
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
+                        + 
"\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select
 1\","
+                        + 
"\"principal\":\"hive/[email protected]\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/[email protected]\","
+                        + 
"\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
         Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
 
     }
 
     @Test
     public void buildParameterWithDecodePassword() {
-//        try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils = 
Mockito.mockStatic(PropertyUtils.class)) {
-//            mockedStaticPropertyUtils
-//                    .when(() -> 
PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, 
false))
-//                    .thenReturn(true);
-//            Map<String, String> other = new HashMap<>();
-//            other.put("autoDeserialize", "yes");
-//            other.put("allowUrlInLocalInfile", "true");
-//            MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new 
MySQLDataSourceParamDTO();
-//            mysqlDatasourceParamDTO.setHost("192.168.9.1");
-//            mysqlDatasourceParamDTO.setPort(1521);
-//            mysqlDatasourceParamDTO.setDatabase("im");
-//            mysqlDatasourceParamDTO.setUserName("test");
-//            mysqlDatasourceParamDTO.setPassword("123456");
-//            mysqlDatasourceParamDTO.setOther(other);
-//            ConnectionParam connectionParam = 
DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
-//            String expected =
-//                    
"{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
-//                            + 
"im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select
 
1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
-//            Assertions.assertEquals(expected, 
JSONUtils.toJsonString(connectionParam));
-//        }
+        // try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils = 
Mockito.mockStatic(PropertyUtils.class)) {
+        // mockedStaticPropertyUtils
+        // .when(() -> 
PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, 
false))
+        // .thenReturn(true);
+        // Map<String, String> other = new HashMap<>();
+        // other.put("autoDeserialize", "yes");
+        // other.put("allowUrlInLocalInfile", "true");
+        // MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new 
MySQLDataSourceParamDTO();
+        // mysqlDatasourceParamDTO.setHost("192.168.9.1");
+        // mysqlDatasourceParamDTO.setPort(1521);
+        // mysqlDatasourceParamDTO.setDatabase("im");
+        // mysqlDatasourceParamDTO.setUserName("test");
+        // mysqlDatasourceParamDTO.setPassword("123456");
+        // mysqlDatasourceParamDTO.setOther(other);
+        // ConnectionParam connectionParam = 
DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
+        // String expected =
+        // 
"{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
+        // + 
"im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select
+        // 
1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
+        // Assertions.assertEquals(expected, 
JSONUtils.toJsonString(connectionParam));
+        // }
     }
 
     /**
@@ -475,15 +497,12 @@ public class DataSourceServiceTest {
         ConnectionParam connectionParam = 
DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam);
 
         PowerMockito.mockStatic(DataSourceUtils.class);
-        PowerMockito.mockStatic(DataSourceClientProvider.class);
-        DataSourceClientProvider clientProvider = 
PowerMockito.mock(DataSourceClientProvider.class);
-        
PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
-
+        PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), 
Mockito.any())).thenReturn(null);
         Result result = dataSourceService.checkConnection(dataSourceType, 
connectionParam);
         Assert.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), 
result.getCode().intValue());
 
         Connection connection = PowerMockito.mock(Connection.class);
-        PowerMockito.when(clientProvider.getConnection(Mockito.any(), 
Mockito.any())).thenReturn(connection);
+        PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), 
Mockito.any())).thenReturn(connection);
         result = dataSourceService.checkConnection(dataSourceType, 
connectionParam);
         Assert.assertEquals(Status.SUCCESS.getCode(), 
result.getCode().intValue());
 
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
index 0841e33eeb..773ad501db 100644
--- 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
@@ -79,8 +79,8 @@ public class CommonUtils {
      * @param loginUserKeytabPath loginUserKeytabPath
      * @throws IOException errors
      */
-    public static void loadKerberosConf(String javaSecurityKrb5Conf, String 
loginUserKeytabUsername,
-                                        String loginUserKeytabPath) throws 
IOException {
+    public static synchronized void loadKerberosConf(String 
javaSecurityKrb5Conf, String loginUserKeytabUsername,
+                                                     String 
loginUserKeytabPath) throws IOException {
         Configuration configuration = new Configuration();
         
configuration.setClassLoader(configuration.getClass().getClassLoader());
         loadKerberosConf(javaSecurityKrb5Conf, loginUserKeytabUsername, 
loginUserKeytabPath, configuration);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index a6b3d7da7b..61b67dd2dd 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -17,29 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.datax;
 
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
-import com.alibaba.druid.sql.ast.statement.SQLSelect;
-import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
-import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
-import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
-import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import static 
org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -74,11 +60,23 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.druid.sql.ast.statement.SQLSelect;
+import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class DataxTask extends AbstractTask {
+
     /**
      * jvm parameters
      */
@@ -141,7 +139,8 @@ public class DataxTask extends AbstractTask {
             throw new RuntimeException("datax task params is not valid");
         }
 
-        dataxTaskExecutionContext = 
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        dataxTaskExecutionContext =
+                
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
     }
 
     /**
@@ -195,8 +194,7 @@ public class DataxTask extends AbstractTask {
      * @return datax json file name
      * @throws Exception if error throws Exception
      */
-    private String buildDataxJsonFile(Map<String, Property> paramsMap)
-            throws Exception {
+    private String buildDataxJsonFile(Map<String, Property> paramsMap) throws 
Exception {
         // generate json
         String fileName = String.format("%s/%s_job.json",
                 taskExecutionContext.getExecutePath(),
@@ -273,7 +271,8 @@ public class DataxTask extends AbstractTask {
         ArrayNode tableArr = writerConn.putArray("table");
         tableArr.add(dataXParameters.getTargetTable());
 
-        writerConn.put("jdbcUrl", 
DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), 
dataTargetCfg));
+        writerConn.put("jdbcUrl",
+                
DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), 
dataTargetCfg));
         writerConnArr.add(writerConn);
 
         ObjectNode writerParam = JSONUtils.createObjectNode();
@@ -379,8 +378,7 @@ public class DataxTask extends AbstractTask {
      * @return shell command file name
      * @throws Exception if error throws Exception
      */
-    private String buildShellCommandFile(String jobConfigFilePath, Map<String, 
Property> paramsMap)
-            throws Exception {
+    private String buildShellCommandFile(String jobConfigFilePath, Map<String, 
Property> paramsMap) throws Exception {
         // generate scripts
         String fileName = String.format("%s/%s_node.%s",
                 taskExecutionContext.getExecutePath(),
@@ -468,7 +466,8 @@ public class DataxTask extends AbstractTask {
      * @param sql sql for data synchronization
      * @return Keyword converted column names
      */
-    private String[] parsingSqlColumnNames(DbType sourceType, DbType 
targetType, BaseConnectionParam dataSourceCfg, String sql) {
+    private String[] parsingSqlColumnNames(DbType sourceType, DbType 
targetType, BaseConnectionParam dataSourceCfg,
+                                           String sql) {
         String[] columnNames = 
tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
 
         if (columnNames == null || columnNames.length == 0) {
@@ -565,7 +564,7 @@ public class DataxTask extends AbstractTask {
         sql = sql.replace(";", "");
 
         try (
-                Connection connection = 
DataSourceClientProvider.getInstance().getConnection(sourceType, 
baseDataSource);
+                Connection connection = 
DataSourceUtils.getConnection(sourceType, baseDataSource);
                 PreparedStatement stmt = connection.prepareStatement(sql);
                 ResultSet resultSet = stmt.executeQuery()) {
 
@@ -573,9 +572,9 @@ public class DataxTask extends AbstractTask {
             int num = md.getColumnCount();
             columnNames = new String[num];
             for (int i = 1; i <= num; i++) {
-                columnNames[i - 1] = md.getColumnName(i).replace("t.","");
+                columnNames[i - 1] = md.getColumnName(i).replace("t.", "");
             }
-        } catch (SQLException | ExecutionException e) {
+        } catch (SQLException e) {
             logger.error(e.getMessage(), e);
             return null;
         }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 69d04db280..e21c9b9f26 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -21,7 +21,6 @@ import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
@@ -108,7 +107,7 @@ public class ProcedureTask extends AbstractTask {
                             
procedureTaskExecutionContext.getConnectionParams());
 
             // get jdbc connection
-            connection = 
DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
+            connection = DataSourceUtils.getConnection(dbType, 
connectionParam);
 
             Map<Integer, Property> sqlParamsMap = new HashMap<>();
             Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 9e4c1df861..6d021c7fdf 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -17,12 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.sql;
 
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import 
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -44,8 +39,6 @@ import 
org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
-import org.slf4j.Logger;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -65,6 +58,11 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class SqlTask extends AbstractTask {
 
     /**
@@ -86,7 +84,8 @@ public class SqlTask extends AbstractTask {
      * create function format
      * include replace here which can be compatible with more cases, for 
example a long-running Spark session in Kyuubi will keep its own temp functions 
instead of destroying them right away
      */
-    private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT = "create or 
replace temporary function {0} as ''{1}''";
+    private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT =
+            "create or replace temporary function {0} as ''{1}''";
 
     /**
      * default query sql limit
@@ -110,7 +109,8 @@ public class SqlTask extends AbstractTask {
             throw new RuntimeException("sql task params is not valid");
         }
 
-        sqlTaskExecutionContext = 
sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        sqlTaskExecutionContext =
+                
sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
     }
 
     @Override
@@ -121,7 +121,8 @@ public class SqlTask extends AbstractTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         logger.info("Full sql parameters: {}", sqlParameters);
-        logger.info("sql type : {}, datasource : {}, sql : {} , localParams : 
{},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit 
 {}",
+        logger.info(
+                "sql type : {}, datasource : {}, sql : {} , localParams : 
{},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit 
 {}",
                 sqlParameters.getType(),
                 sqlParameters.getDatasource(),
                 sqlParameters.getSql(),
@@ -139,10 +140,11 @@ public class SqlTask extends AbstractTask {
                     sqlTaskExecutionContext.getConnectionParams());
 
             // ready to execute SQL and parameter entity Map
-            List<SqlBinds> mainStatementSqlBinds = 
SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
-                    .stream()
-                    .map(this::getSqlAndSqlParamsMap)
-                    .collect(Collectors.toList());
+            List<SqlBinds> mainStatementSqlBinds =
+                    SqlSplitter.split(sqlParameters.getSql(), 
sqlParameters.getSegmentSeparator())
+                            .stream()
+                            .map(this::getSqlAndSqlParamsMap)
+                            .collect(Collectors.toList());
 
             List<SqlBinds> preStatementSqlBinds = 
Optional.ofNullable(sqlParameters.getPreStatements())
                     .orElse(new ArrayList<>())
@@ -190,7 +192,7 @@ public class SqlTask extends AbstractTask {
         try {
 
             // create connection
-            connection = 
DataSourceClientProvider.getInstance().getConnection(DbType.valueOf(sqlParameters.getType()),
 baseConnectionParam);
+            connection = 
DataSourceUtils.getConnection(DbType.valueOf(sqlParameters.getType()), 
baseConnectionParam);
             // create temp function
             if (CollectionUtils.isNotEmpty(createFuncs)) {
                 createTempFunction(connection, createFuncs);
@@ -210,7 +212,7 @@ public class SqlTask extends AbstractTask {
                 String updateResult = executeUpdate(connection, 
mainStatementsBinds, "main");
                 result = setNonQuerySqlReturn(updateResult, 
sqlParameters.getLocalParams());
             }
-            //deal out params
+            // deal out params
             sqlParameters.dealOutParam(result);
 
             // post execute
@@ -265,7 +267,8 @@ public class SqlTask extends AbstractTask {
                 resultJSONArray.add(mapOfColValues);
                 rowCount++;
             }
-            int displayRows = sqlParameters.getDisplayRows() > 0 ? 
sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
+            int displayRows = sqlParameters.getDisplayRows() > 0 ? 
sqlParameters.getDisplayRows()
+                    : TaskConstants.DEFAULT_DISPLAY_ROWS;
             displayRows = Math.min(displayRows, rowCount);
             logger.info("display sql result {} rows as follows:", displayRows);
             for (int i = 0; i < displayRows; i++) {
@@ -305,12 +308,14 @@ public class SqlTask extends AbstractTask {
         }
     }
 
-    private String executeUpdate(Connection connection, List<SqlBinds> 
statementsBinds, String handlerType) throws Exception {
+    private String executeUpdate(Connection connection, List<SqlBinds> 
statementsBinds,
+                                 String handlerType) throws Exception {
         int result = 0;
         for (SqlBinds sqlBind : statementsBinds) {
             try (PreparedStatement statement = 
prepareStatementAndBind(connection, sqlBind)) {
                 result = statement.executeUpdate();
-                logger.info("{} statement execute update result: {}, for sql: 
{}", handlerType, result, sqlBind.getSql());
+                logger.info("{} statement execute update result: {}, for sql: 
{}", handlerType, result,
+                        sqlBind.getSql());
             }
         }
         return String.valueOf(result);
@@ -371,7 +376,8 @@ public class SqlTask extends AbstractTask {
                     ParameterUtils.setInParameter(entry.getKey(), stmt, 
prop.getType(), prop.getValue());
                 }
             }
-            logger.info("prepare statement replace sql : {}, sql parameters : 
{}", sqlBinds.getSql(), sqlBinds.getParamsMap());
+            logger.info("prepare statement replace sql : {}, sql parameters : 
{}", sqlBinds.getSql(),
+                    sqlBinds.getParamsMap());
             return stmt;
         } catch (Exception exception) {
             throw new TaskException("SQL task prepareStatementAndBind error", 
exception);
@@ -387,14 +393,15 @@ public class SqlTask extends AbstractTask {
      * @param sqlParamsMap sql params map
      */
     private void printReplacedSql(String content, String formatSql, String 
rgex, Map<Integer, Property> sqlParamsMap) {
-        //parameter print style
+        // parameter print style
         logger.info("after replace sql , preparing : {}", formatSql);
         StringBuilder logPrint = new StringBuilder("replaced sql , 
parameters:");
         if (sqlParamsMap == null) {
             logger.info("printReplacedSql: sqlParamsMap is null.");
         } else {
             for (int i = 1; i <= sqlParamsMap.size(); i++) {
-                
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
+                
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType())
+                        .append(")");
             }
         }
         logger.info("Sql Params are {}", logPrint);
@@ -428,8 +435,8 @@ public class SqlTask extends AbstractTask {
         }
 
         // special characters need to be escaped, ${} needs to be escaped
-        setSqlParamsMap(sql, rgex, sqlParamsMap, 
paramsMap,taskExecutionContext.getTaskInstanceId());
-        //Replace the original value in sql !{...} ,Does not participate in 
precompilation
+        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap, 
taskExecutionContext.getTaskInstanceId());
+        // Replace the original value in sql !{...} ,Does not participate in 
precompilation
         String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
         sql = replaceOriginalValue(sql, rgexo, paramsMap);
         // replace the ${} of the SQL statement with the Placeholder
@@ -485,7 +492,8 @@ public class SqlTask extends AbstractTask {
      */
     private List<String> buildTempFuncSql(List<UdfFuncParameters> 
udfFuncParameters) {
         return udfFuncParameters.stream().map(value -> MessageFormat
-                .format(CREATE_OR_REPLACE_FUNCTION_FORMAT, 
value.getFuncName(), value.getClassName())).collect(Collectors.toList());
+                .format(CREATE_OR_REPLACE_FUNCTION_FORMAT, 
value.getFuncName(), value.getClassName()))
+                .collect(Collectors.toList());
     }
 
     /**
@@ -499,7 +507,8 @@ public class SqlTask extends AbstractTask {
             String prefixPath = defaultFS.startsWith("file://") ? "file://" : 
defaultFS;
             String uploadPath = 
CommonUtils.getHdfsUdfDir(value.getTenantCode());
             String resourceFullName = value.getResourceName();
-            resourceFullName = resourceFullName.startsWith("/") ? 
resourceFullName : String.format("/%s", resourceFullName);
+            resourceFullName =
+                    resourceFullName.startsWith("/") ? resourceFullName : 
String.format("/%s", resourceFullName);
             return String.format("add jar %s%s%s", prefixPath, uploadPath, 
resourceFullName);
         }).collect(Collectors.toList());
     }

Reply via email to