This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.1.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.8-prepare by this push:
new 8e058fc227 Use adhoc connection in sql/procedure task (#14670)
8e058fc227 is described below
commit 8e058fc227c2ac958e4cd345eb855b1e4d69a2d9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 31 22:44:43 2023 +0800
Use adhoc connection in sql/procedure task (#14670)
---
.../api/service/impl/DataSourceServiceImpl.java | 57 ++++----
.../api/service/DataSourceServiceTest.java | 159 ++++++++++++---------
.../plugin/datasource/api/utils/CommonUtils.java | 4 +-
.../plugin/task/datax/DataxTask.java | 49 ++++---
.../plugin/task/procedure/ProcedureTask.java | 3 +-
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 61 ++++----
6 files changed, 183 insertions(+), 150 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
index 7080b83646..309d2ce01f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -70,8 +69,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.node.ObjectNode;
-
-
/**
* data source service impl
*/
@@ -86,7 +83,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
@Autowired
private DataSourceUserMapper datasourceUserMapper;
-
private static final String TABLE = "TABLE";
private static final String VIEW = "VIEW";
private static final String[] TABLE_TYPES = new String[]{TABLE, VIEW};
@@ -105,7 +101,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
public Result<Object> createDataSource(User loginUser,
BaseDataSourceParamDTO datasourceParam) {
DataSourceUtils.checkDatasourceParam(datasourceParam);
Result<Object> result = new Result<>();
- if (!canOperatorPermissions(loginUser,null,
AuthorizationType.DATASOURCE,
ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
+ if (!canOperatorPermissions(loginUser, null,
AuthorizationType.DATASOURCE,
+ ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -114,7 +111,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
- if(checkDescriptionLength(datasourceParam.getNote())){
+ if (checkDescriptionLength(datasourceParam.getNote())) {
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -141,7 +138,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
try {
dataSourceMapper.insert(dataSource);
putMsg(result, Status.SUCCESS);
- permissionPostHandle(AuthorizationType.DATASOURCE,
loginUser.getId(), Collections.singletonList(dataSource.getId()), logger);
+ permissionPostHandle(AuthorizationType.DATASOURCE,
loginUser.getId(),
+ Collections.singletonList(dataSource.getId()), logger);
} catch (DuplicateKeyException ex) {
logger.error("Create datasource error.", ex);
putMsg(result, Status.DATASOURCE_EXIST);
@@ -168,22 +166,24 @@ public class DataSourceServiceImpl extends
BaseServiceImpl implements DataSource
return result;
}
- if (!canOperatorPermissions(loginUser,new
Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, DATASOURCE_UPDATE))
{
+ if (!canOperatorPermissions(loginUser, new
Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
+ DATASOURCE_UPDATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
- //check name can use or not
+ // check name can use or not
if (!dataSource.getName().trim().equals(dataSource.getName()) &&
checkName(dataSource.getName())) {
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
- if(checkDescriptionLength(dataSourceParam.getNote())){
+ if (checkDescriptionLength(dataSourceParam.getNote())) {
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
- //check password,if the password is not updated, set to the old
password.
- BaseConnectionParam connectionParam = (BaseConnectionParam)
DataSourceUtils.buildConnectionParams(dataSourceParam);
+ // check password,if the password is not updated, set to the old
password.
+ BaseConnectionParam connectionParam =
+ (BaseConnectionParam)
DataSourceUtils.buildConnectionParams(dataSourceParam);
String password = connectionParam.getPassword();
if (StringUtils.isBlank(password)) {
String oldConnectionParams = dataSource.getConnectionParams();
@@ -262,9 +262,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize);
PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize);
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
- dataSourceList = dataSourceMapper.selectPaging(dataSourcePage,
UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : loginUser.getId(),
searchVal);
+ dataSourceList = dataSourceMapper.selectPaging(dataSourcePage,
+ UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 :
loginUser.getId(), searchVal);
} else {
- Set<Integer> ids =
resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), logger);
+ Set<Integer> ids = resourcePermissionCheckService
+
.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), logger);
if (ids.isEmpty()) {
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
@@ -318,13 +320,15 @@ public class DataSourceServiceImpl extends
BaseServiceImpl implements DataSource
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
datasourceList = dataSourceMapper.queryDataSourceByType(0, type);
} else {
- Set<Integer> ids =
resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), logger);
+ Set<Integer> ids = resourcePermissionCheckService
+
.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), logger);
if (ids.isEmpty()) {
result.put(Constants.DATA_LIST, Collections.emptyList());
putMsg(result, Status.SUCCESS);
return result;
}
- datasourceList =
dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource ->
dataSource.getType().getCode() == type).collect(Collectors.toList());
+ datasourceList = dataSourceMapper.selectBatchIds(ids).stream()
+ .filter(dataSource -> dataSource.getType().getCode() ==
type).collect(Collectors.toList());
}
result.put(Constants.DATA_LIST, datasourceList);
putMsg(result, Status.SUCCESS);
@@ -361,7 +365,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
@Override
public Result<Object> checkConnection(DbType type, ConnectionParam
connectionParam) {
Result<Object> result = new Result<>();
- try (Connection connection =
DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {
+ try (Connection connection = DataSourceUtils.getConnection(type,
connectionParam)) {
if (connection == null) {
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
@@ -372,7 +376,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
String message = Optional.of(e).map(Throwable::getCause)
.map(Throwable::getMessage)
.orElse(e.getMessage());
- logger.error("datasource test connection error, dbType:{},
connectionParam:{}, message:{}.", type, connectionParam, message);
+ logger.error("datasource test connection error, dbType:{},
connectionParam:{}, message:{}.", type,
+ connectionParam, message);
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),
message);
}
}
@@ -391,7 +396,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
- return checkConnection(dataSource.getType(),
DataSourceUtils.buildConnectionParams(dataSource.getType(),
dataSource.getConnectionParams()));
+ return checkConnection(dataSource.getType(),
+ DataSourceUtils.buildConnectionParams(dataSource.getType(),
dataSource.getConnectionParams()));
}
/**
@@ -406,14 +412,15 @@ public class DataSourceServiceImpl extends
BaseServiceImpl implements DataSource
public Result<Object> delete(User loginUser, int datasourceId) {
Result<Object> result = new Result<>();
try {
- //query datasource by id
+ // query datasource by id
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
if (dataSource == null) {
logger.error("resource id {} not exist", datasourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
- if (!canOperatorPermissions(loginUser, new
Object[]{dataSource.getId()},AuthorizationType.DATASOURCE,DATASOURCE_DELETE)) {
+ if (!canOperatorPermissions(loginUser, new
Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
+ DATASOURCE_DELETE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -438,7 +445,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
public Map<String, Object> unauthDatasource(User loginUser, Integer
userId) {
Map<String, Object> result = new HashMap<>();
List<DataSource> datasourceList;
- if
(canOperatorPermissions(loginUser,null,AuthorizationType.DATASOURCE,null)) {
+ if (canOperatorPermissions(loginUser, null,
AuthorizationType.DATASOURCE, null)) {
// admin gets all data sources except userId
datasourceList =
dataSourceMapper.queryDatasourceExceptUserId(userId);
} else {
@@ -519,7 +526,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
tables = metaData.getTables(
connectionParam.getDatabase(),
-
getDbSchemaPattern(dataSource.getType(),schema,connectionParam),
+ getDbSchemaPattern(dataSource.getType(), schema,
connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
@@ -549,7 +556,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
}
@Override
- public Map<String, Object> getTableColumns(Integer datasourceId,String
tableName) {
+ public Map<String, Object> getTableColumns(Integer datasourceId, String
tableName) {
Map<String, Object> result = new HashMap<>();
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -615,7 +622,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl
implements DataSource
return options;
}
- private String getDbSchemaPattern(DbType dbType,String
schema,BaseConnectionParam connectionParam) {
+ private String getDbSchemaPattern(DbType dbType, String schema,
BaseConnectionParam connectionParam) {
if (dbType == null) {
return null;
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
index 2c46170618..30b9cbd150 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
@@ -21,15 +21,14 @@ import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DATASOURCE_LIST;
import org.apache.dolphinscheduler.api.enums.Status;
+import
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -39,10 +38,8 @@ import
org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import
org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
-import
org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
import
org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
import
org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
-import
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbConnectType;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -51,9 +48,7 @@ import org.apache.commons.collections.CollectionUtils;
import java.sql.Connection;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -61,11 +56,9 @@ import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
-import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -81,6 +74,7 @@ import org.slf4j.LoggerFactory;
@PowerMockIgnore({"sun.security.*", "javax.net.*"})
@PrepareForTest({DataSourceUtils.class, CommonUtils.class,
DataSourceClientProvider.class, PasswordUtils.class})
public class DataSourceServiceTest {
+
private static final Logger baseServiceLogger =
LoggerFactory.getLogger(BaseServiceImpl.class);
private static final Logger logger =
LoggerFactory.getLogger(DataSourceServiceTest.class);
private static final Logger dataSourceServiceLogger =
LoggerFactory.getLogger(DataSourceServiceImpl.class);
@@ -124,22 +118,26 @@ public class DataSourceServiceTest {
DbType dataSourceType = postgreSqlDatasourceParam.getType();
// data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
- Result connectionResult = new
Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),
Status.DATASOURCE_CONNECT_FAILED.getMsg());
- //PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
parameter)).thenReturn(connectionResult);
-
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType,
connectionParam);
+ Result connectionResult =
+ new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),
Status.DATASOURCE_CONNECT_FAILED.getMsg());
+ // PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
parameter)).thenReturn(connectionResult);
+
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType,
+ connectionParam);
Result connectFailedResult =
dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(),
connectFailedResult.getCode().intValue());
// data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
connectionResult = new Result(Status.SUCCESS.getCode(),
Status.SUCCESS.getMsg());
- PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam)).thenReturn(connectionResult);
+ PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam))
+ .thenReturn(connectionResult);
Result notValidError = dataSourceService.createDataSource(loginUser,
postgreSqlDatasourceParam);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(),
notValidError.getCode().intValue());
// success
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
- PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam)).thenReturn(connectionResult);
+ PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam))
+ .thenReturn(connectionResult);
Result success = dataSourceService.createDataSource(loginUser,
postgreSqlDatasourceParam);
Assert.assertEquals(Status.SUCCESS.getCode(),
success.getCode().intValue());
}
@@ -162,13 +160,15 @@ public class DataSourceServiceTest {
// data source not exits
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
- Result resourceNotExits =
dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
+ Result resourceNotExits =
+ dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(),
resourceNotExits.getCode().intValue());
// user no operation perm
DataSource dataSource = new DataSource();
dataSource.setUserId(0);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
- Result userNoOperationPerm =
dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
+ Result userNoOperationPerm =
+ dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
userNoOperationPerm.getCode().intValue());
// data source name exits
@@ -177,7 +177,8 @@ public class DataSourceServiceTest {
dataSourceList.add(dataSource);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(dataSourceList);
- Result dataSourceNameExist =
dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
+ Result dataSourceNameExist =
+ dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
Assert.assertEquals(Status.DATASOURCE_EXIST.getCode(),
dataSourceNameExist.getCode().intValue());
// data source connect failed
@@ -186,15 +187,18 @@ public class DataSourceServiceTest {
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
Result connectionResult = new Result(Status.SUCCESS.getCode(),
Status.SUCCESS.getMsg());
- PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam)).thenReturn(connectionResult);
+ PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam))
+ .thenReturn(connectionResult);
Result connectFailed =
dataSourceService.updateDataSource(dataSourceId, loginUser,
postgreSqlDatasourceParam);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(),
connectFailed.getCode().intValue());
- //success
+ // success
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
- connectionResult = new
Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),
Status.DATASOURCE_CONNECT_FAILED.getMsg());
- PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam)).thenReturn(connectionResult);
+ connectionResult =
+ new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),
Status.DATASOURCE_CONNECT_FAILED.getMsg());
+ PowerMockito.when(dataSourceService.checkConnection(dataSourceType,
connectionParam))
+ .thenReturn(connectionResult);
Result success = dataSourceService.updateDataSource(dataSourceId,
loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.SUCCESS.getCode(),
success.getCode().intValue());
@@ -209,12 +213,15 @@ public class DataSourceServiceTest {
String searchVal = "";
int pageNo = 1;
int pageSize = 10;
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null, loginUser.getId(), DATASOURCE_LIST, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
null, loginUser.getId(), baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), baseServiceLogger)).thenReturn(ids);
+
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null,
+ loginUser.getId(), DATASOURCE_LIST,
baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
null,
+ loginUser.getId(), baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
+ loginUser.getId(), baseServiceLogger)).thenReturn(ids);
Result result = dataSourceService.queryDataSourceListPaging(loginUser,
searchVal, pageNo, pageSize);
- Assert.assertEquals(Status.SUCCESS.getCode(),(int)result.getCode());
+ Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
}
@Test
@@ -230,9 +237,11 @@ public class DataSourceServiceTest {
User loginUser = getAdminUser();
int dataSourceId = 1;
Result result = new Result();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
new Object[]{dataSourceId}, -1, baseServiceLogger)).thenReturn(true);
- //resource not exist
+
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null,
+ loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
+ new Object[]{dataSourceId}, -1,
baseServiceLogger)).thenReturn(true);
+ // resource not exist
dataSourceService.putMsg(result, Status.RESOURCE_NOT_EXIST);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
Assert.assertEquals(result.getCode(),
dataSourceService.delete(loginUser, dataSourceId).getCode());
@@ -250,8 +259,10 @@ public class DataSourceServiceTest {
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setId(1);
dataSource.setId(22);
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null, loginUser.getId(), DATASOURCE_DELETE,
baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,new
Object[]{dataSource.getId()} , 0, baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null,
+ loginUser.getId(), DATASOURCE_DELETE,
baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
+ new Object[]{dataSource.getId()}, 0,
baseServiceLogger)).thenReturn(true);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
Assert.assertEquals(result.getCode(),
dataSourceService.delete(loginUser, dataSourceId).getCode());
@@ -263,8 +274,10 @@ public class DataSourceServiceTest {
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
int userId = 3;
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
null, 0, baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null,
+ loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
null, 0,
+ baseServiceLogger)).thenReturn(true);
// test admin user
Mockito.when(dataSourceMapper.queryAuthedDatasource(userId)).thenReturn(getSingleDataSourceList());
Mockito.when(dataSourceMapper.queryDatasourceExceptUserId(userId)).thenReturn(getDataSourceList());
@@ -276,7 +289,8 @@ public class DataSourceServiceTest {
// test non-admin user
loginUser.setId(2);
loginUser.setUserType(UserType.GENERAL_USER);
-
Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id",
loginUser.getId()))).thenReturn(getDataSourceList());
+
Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id",
loginUser.getId())))
+ .thenReturn(getDataSourceList());
result = dataSourceService.unauthDatasource(loginUser, userId);
logger.info(result.toString());
dataSources = (List<DataSource>) result.get(Constants.DATA_LIST);
@@ -311,9 +325,12 @@ public class DataSourceServiceTest {
loginUser.setUserType(UserType.GENERAL_USER);
Set<Integer> dataSourceIds = new HashSet<>();
dataSourceIds.add(1);
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
null, 0, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), dataSourceServiceLogger)).thenReturn(dataSourceIds);
+
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE,
null,
+ loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
null, 0,
+ baseServiceLogger)).thenReturn(true);
+
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
+ loginUser.getId(),
dataSourceServiceLogger)).thenReturn(dataSourceIds);
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
@@ -361,8 +378,9 @@ public class DataSourceServiceTest {
dataSource.setName("test");
dataSource.setNote("Note");
dataSource.setType(DbType.ORACLE);
-
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
- +
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
+ dataSource.setConnectionParams(
+
"{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ +
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
return dataSource;
}
@@ -373,8 +391,9 @@ public class DataSourceServiceTest {
dataSource.setName("test");
dataSource.setNote("Note");
dataSource.setType(DbType.ORACLE);
-
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
- +
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
+ dataSource.setConnectionParams(
+
"{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ +
"\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
return dataSource;
}
@@ -390,8 +409,9 @@ public class DataSourceServiceTest {
oracleDatasourceParamDTO.setConnectType(DbConnectType.ORACLE_SERVICE_NAME);
ConnectionParam connectionParam =
DataSourceUtils.buildConnectionParams(oracleDatasourceParamDTO);
- String expected =
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
- +
"\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select
1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
+ String expected =
+
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
+ +
"\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select
1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
PowerMockito.mockStatic(CommonUtils.class);
@@ -410,36 +430,38 @@ public class DataSourceServiceTest {
hiveDataSourceParamDTO.setLoginUserKeytabUsername("test2/[email protected]");
connectionParam =
DataSourceUtils.buildConnectionParams(hiveDataSourceParamDTO);
- expected =
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
- +
"\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select
1\","
- +
"\"principal\":\"hive/[email protected]\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/[email protected]\","
- + "\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
+ expected =
+
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
+ +
"\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select
1\","
+ +
"\"principal\":\"hive/[email protected]\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/[email protected]\","
+ +
"\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
}
@Test
public void buildParameterWithDecodePassword() {
-// try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils =
Mockito.mockStatic(PropertyUtils.class)) {
-// mockedStaticPropertyUtils
-// .when(() ->
PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE,
false))
-// .thenReturn(true);
-// Map<String, String> other = new HashMap<>();
-// other.put("autoDeserialize", "yes");
-// other.put("allowUrlInLocalInfile", "true");
-// MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new
MySQLDataSourceParamDTO();
-// mysqlDatasourceParamDTO.setHost("192.168.9.1");
-// mysqlDatasourceParamDTO.setPort(1521);
-// mysqlDatasourceParamDTO.setDatabase("im");
-// mysqlDatasourceParamDTO.setUserName("test");
-// mysqlDatasourceParamDTO.setPassword("123456");
-// mysqlDatasourceParamDTO.setOther(other);
-// ConnectionParam connectionParam =
DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
-// String expected =
-//
"{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
-// +
"im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select
1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
-// Assertions.assertEquals(expected,
JSONUtils.toJsonString(connectionParam));
-// }
+ // try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils =
Mockito.mockStatic(PropertyUtils.class)) {
+ // mockedStaticPropertyUtils
+ // .when(() ->
PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE,
false))
+ // .thenReturn(true);
+ // Map<String, String> other = new HashMap<>();
+ // other.put("autoDeserialize", "yes");
+ // other.put("allowUrlInLocalInfile", "true");
+ // MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new
MySQLDataSourceParamDTO();
+ // mysqlDatasourceParamDTO.setHost("192.168.9.1");
+ // mysqlDatasourceParamDTO.setPort(1521);
+ // mysqlDatasourceParamDTO.setDatabase("im");
+ // mysqlDatasourceParamDTO.setUserName("test");
+ // mysqlDatasourceParamDTO.setPassword("123456");
+ // mysqlDatasourceParamDTO.setOther(other);
+ // ConnectionParam connectionParam =
DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
+ // String expected =
+ //
"{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
+ // +
"im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select
+ //
1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
+ // Assertions.assertEquals(expected,
JSONUtils.toJsonString(connectionParam));
+ // }
}
/**
@@ -475,15 +497,12 @@ public class DataSourceServiceTest {
ConnectionParam connectionParam =
DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam);
PowerMockito.mockStatic(DataSourceUtils.class);
- PowerMockito.mockStatic(DataSourceClientProvider.class);
- DataSourceClientProvider clientProvider = PowerMockito.mock(DataSourceClientProvider.class);
- PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
-
+ PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(null);
Result result = dataSourceService.checkConnection(dataSourceType,
connectionParam);
Assert.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(),
result.getCode().intValue());
Connection connection = PowerMockito.mock(Connection.class);
- PowerMockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
+ PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
result = dataSourceService.checkConnection(dataSourceType,
connectionParam);
Assert.assertEquals(Status.SUCCESS.getCode(),
result.getCode().intValue());
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
index 0841e33eeb..773ad501db 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
@@ -79,8 +79,8 @@ public class CommonUtils {
* @param loginUserKeytabPath loginUserKeytabPath
* @throws IOException errors
*/
- public static void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
- String loginUserKeytabPath) throws IOException {
+ public static synchronized void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
+ String loginUserKeytabPath) throws IOException {
Configuration configuration = new Configuration();
configuration.setClassLoader(configuration.getClass().getClassLoader());
loadKerberosConf(javaSecurityKrb5Conf, loginUserKeytabUsername,
loginUserKeytabPath, configuration);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index a6b3d7da7b..61b67dd2dd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -17,29 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.datax;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
-import com.alibaba.druid.sql.ast.statement.SQLSelect;
-import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
-import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
-import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
-import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
import static
org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -74,11 +60,23 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.druid.sql.ast.statement.SQLSelect;
+import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class DataxTask extends AbstractTask {
+
/**
* jvm parameters
*/
@@ -141,7 +139,8 @@ public class DataxTask extends AbstractTask {
throw new RuntimeException("datax task params is not valid");
}
- dataxTaskExecutionContext =
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ dataxTaskExecutionContext =
+
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
/**
@@ -195,8 +194,7 @@ public class DataxTask extends AbstractTask {
* @return datax json file name
* @throws Exception if error throws Exception
*/
- private String buildDataxJsonFile(Map<String, Property> paramsMap)
- throws Exception {
+ private String buildDataxJsonFile(Map<String, Property> paramsMap) throws
Exception {
// generate json
String fileName = String.format("%s/%s_job.json",
taskExecutionContext.getExecutePath(),
@@ -273,7 +271,8 @@ public class DataxTask extends AbstractTask {
ArrayNode tableArr = writerConn.putArray("table");
tableArr.add(dataXParameters.getTargetTable());
- writerConn.put("jdbcUrl",
DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()),
dataTargetCfg));
+ writerConn.put("jdbcUrl",
+
DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()),
dataTargetCfg));
writerConnArr.add(writerConn);
ObjectNode writerParam = JSONUtils.createObjectNode();
@@ -379,8 +378,7 @@ public class DataxTask extends AbstractTask {
* @return shell command file name
* @throws Exception if error throws Exception
*/
- private String buildShellCommandFile(String jobConfigFilePath, Map<String,
Property> paramsMap)
- throws Exception {
+ private String buildShellCommandFile(String jobConfigFilePath, Map<String,
Property> paramsMap) throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
@@ -468,7 +466,8 @@ public class DataxTask extends AbstractTask {
* @param sql sql for data synchronization
* @return Keyword converted column names
*/
- private String[] parsingSqlColumnNames(DbType sourceType, DbType
targetType, BaseConnectionParam dataSourceCfg, String sql) {
+ private String[] parsingSqlColumnNames(DbType sourceType, DbType
targetType, BaseConnectionParam dataSourceCfg,
+ String sql) {
String[] columnNames =
tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
if (columnNames == null || columnNames.length == 0) {
@@ -565,7 +564,7 @@ public class DataxTask extends AbstractTask {
sql = sql.replace(";", "");
try (
- Connection connection = DataSourceClientProvider.getInstance().getConnection(sourceType, baseDataSource);
+ Connection connection = DataSourceUtils.getConnection(sourceType, baseDataSource);
PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) {
@@ -573,9 +572,9 @@ public class DataxTask extends AbstractTask {
int num = md.getColumnCount();
columnNames = new String[num];
for (int i = 1; i <= num; i++) {
- columnNames[i - 1] = md.getColumnName(i).replace("t.","");
+ columnNames[i - 1] = md.getColumnName(i).replace("t.", "");
}
- } catch (SQLException | ExecutionException e) {
+ } catch (SQLException e) {
logger.error(e.getMessage(), e);
return null;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 69d04db280..e21c9b9f26 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -21,7 +21,6 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
@@ -108,7 +107,7 @@ public class ProcedureTask extends AbstractTask {
procedureTaskExecutionContext.getConnectionParams());
// get jdbc connection
- connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
+ connection = DataSourceUtils.getConnection(dbType, connectionParam);
Map<Integer, Property> sqlParamsMap = new HashMap<>();
Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 9e4c1df861..6d021c7fdf 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -17,12 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.sql;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -44,8 +39,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.slf4j.Logger;
-
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -65,6 +58,11 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class SqlTask extends AbstractTask {
/**
@@ -86,7 +84,8 @@ public class SqlTask extends AbstractTask {
* create function format
* include replace here which can be compatible with more cases, for
example a long-running Spark session in Kyuubi will keep its own temp functions
instead of destroying them right away
*/
- private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT = "create or
replace temporary function {0} as ''{1}''";
+ private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT =
+ "create or replace temporary function {0} as ''{1}''";
/**
* default query sql limit
@@ -110,7 +109,8 @@ public class SqlTask extends AbstractTask {
throw new RuntimeException("sql task params is not valid");
}
- sqlTaskExecutionContext =
sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ sqlTaskExecutionContext =
+
sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
@Override
@@ -121,7 +121,8 @@ public class SqlTask extends AbstractTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
logger.info("Full sql parameters: {}", sqlParameters);
- logger.info("sql type : {}, datasource : {}, sql : {} , localParams :
{},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit
{}",
+ logger.info(
+ "sql type : {}, datasource : {}, sql : {} , localParams :
{},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit
{}",
sqlParameters.getType(),
sqlParameters.getDatasource(),
sqlParameters.getSql(),
@@ -139,10 +140,11 @@ public class SqlTask extends AbstractTask {
sqlTaskExecutionContext.getConnectionParams());
// ready to execute SQL and parameter entity Map
- List<SqlBinds> mainStatementSqlBinds =
SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
- .stream()
- .map(this::getSqlAndSqlParamsMap)
- .collect(Collectors.toList());
+ List<SqlBinds> mainStatementSqlBinds =
+ SqlSplitter.split(sqlParameters.getSql(),
sqlParameters.getSegmentSeparator())
+ .stream()
+ .map(this::getSqlAndSqlParamsMap)
+ .collect(Collectors.toList());
List<SqlBinds> preStatementSqlBinds =
Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>())
@@ -190,7 +192,7 @@ public class SqlTask extends AbstractTask {
try {
// create connection
- connection = DataSourceClientProvider.getInstance().getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
+ connection = DataSourceUtils.getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection, createFuncs);
@@ -210,7 +212,7 @@ public class SqlTask extends AbstractTask {
String updateResult = executeUpdate(connection,
mainStatementsBinds, "main");
result = setNonQuerySqlReturn(updateResult,
sqlParameters.getLocalParams());
}
- //deal out params
+ // deal out params
sqlParameters.dealOutParam(result);
// post execute
@@ -265,7 +267,8 @@ public class SqlTask extends AbstractTask {
resultJSONArray.add(mapOfColValues);
rowCount++;
}
- int displayRows = sqlParameters.getDisplayRows() > 0 ?
sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
+ int displayRows = sqlParameters.getDisplayRows() > 0 ?
sqlParameters.getDisplayRows()
+ : TaskConstants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, rowCount);
logger.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
@@ -305,12 +308,14 @@ public class SqlTask extends AbstractTask {
}
}
- private String executeUpdate(Connection connection, List<SqlBinds>
statementsBinds, String handlerType) throws Exception {
+ private String executeUpdate(Connection connection, List<SqlBinds>
statementsBinds,
+ String handlerType) throws Exception {
int result = 0;
for (SqlBinds sqlBind : statementsBinds) {
try (PreparedStatement statement =
prepareStatementAndBind(connection, sqlBind)) {
result = statement.executeUpdate();
- logger.info("{} statement execute update result: {}, for sql:
{}", handlerType, result, sqlBind.getSql());
+ logger.info("{} statement execute update result: {}, for sql:
{}", handlerType, result,
+ sqlBind.getSql());
}
}
return String.valueOf(result);
@@ -371,7 +376,8 @@ public class SqlTask extends AbstractTask {
ParameterUtils.setInParameter(entry.getKey(), stmt,
prop.getType(), prop.getValue());
}
}
- logger.info("prepare statement replace sql : {}, sql parameters :
{}", sqlBinds.getSql(), sqlBinds.getParamsMap());
+ logger.info("prepare statement replace sql : {}, sql parameters :
{}", sqlBinds.getSql(),
+ sqlBinds.getParamsMap());
return stmt;
} catch (Exception exception) {
throw new TaskException("SQL task prepareStatementAndBind error",
exception);
@@ -387,14 +393,15 @@ public class SqlTask extends AbstractTask {
* @param sqlParamsMap sql params map
*/
private void printReplacedSql(String content, String formatSql, String
rgex, Map<Integer, Property> sqlParamsMap) {
- //parameter print style
+ // parameter print style
logger.info("after replace sql , preparing : {}", formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql ,
parameters:");
if (sqlParamsMap == null) {
logger.info("printReplacedSql: sqlParamsMap is null.");
} else {
for (int i = 1; i <= sqlParamsMap.size(); i++) {
-
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
+
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType())
+ .append(")");
}
}
logger.info("Sql Params are {}", logPrint);
@@ -428,8 +435,8 @@ public class SqlTask extends AbstractTask {
}
// special characters need to be escaped, ${} needs to be escaped
- setSqlParamsMap(sql, rgex, sqlParamsMap,
paramsMap,taskExecutionContext.getTaskInstanceId());
- //Replace the original value in sql !{...} ,Does not participate in
precompilation
+ setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,
taskExecutionContext.getTaskInstanceId());
+ // Replace the original value in sql !{...} ,Does not participate in
precompilation
String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
sql = replaceOriginalValue(sql, rgexo, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
@@ -485,7 +492,8 @@ public class SqlTask extends AbstractTask {
*/
private List<String> buildTempFuncSql(List<UdfFuncParameters>
udfFuncParameters) {
return udfFuncParameters.stream().map(value -> MessageFormat
- .format(CREATE_OR_REPLACE_FUNCTION_FORMAT,
value.getFuncName(), value.getClassName())).collect(Collectors.toList());
+ .format(CREATE_OR_REPLACE_FUNCTION_FORMAT,
value.getFuncName(), value.getClassName()))
+ .collect(Collectors.toList());
}
/**
@@ -499,7 +507,8 @@ public class SqlTask extends AbstractTask {
String prefixPath = defaultFS.startsWith("file://") ? "file://" :
defaultFS;
String uploadPath =
CommonUtils.getHdfsUdfDir(value.getTenantCode());
String resourceFullName = value.getResourceName();
- resourceFullName = resourceFullName.startsWith("/") ?
resourceFullName : String.format("/%s", resourceFullName);
+ resourceFullName =
+ resourceFullName.startsWith("/") ? resourceFullName :
String.format("/%s", resourceFullName);
return String.format("add jar %s%s%s", prefixPath, uploadPath,
resourceFullName);
}).collect(Collectors.toList());
}