This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 45586f71d5 [BUG][Resource Center] Task can not select main JAR (#13536)
45586f71d5 is described below
commit 45586f71d548cf22195bcc60c63d8bc84884974f
Author: Aaron Wang <[email protected]>
AuthorDate: Tue Apr 18 18:38:16 2023 +0800
[BUG][Resource Center] Task can not select main JAR (#13536)
---
.../api/dto/resources/filter/IFilter.java | 4 +-
.../api/dto/resources/filter/ResourceFilter.java | 47 +++++--------
.../api/service/impl/ResourcesServiceImpl.java | 80 +++++++++++++---------
.../dto/resources/filter/ResourceFilterTest.java | 24 +++----
.../master/runner/task/BaseTaskProcessor.java | 3 +-
.../service/process/ProcessServiceImpl.java | 7 +-
.../service/process/ProcessServiceTest.java | 2 -
.../plugin/task/api/AbstractYarnTask.java | 22 ------
.../plugin/task/dq/DataQualityTask.java | 4 +-
.../plugin/task/dq/utils/SparkArgsUtils.java | 2 +-
.../plugin/task/flink/FlinkStreamTask.java | 10 ---
.../plugin/task/flink/FlinkArgsUtilsTest.java | 6 +-
.../plugin/task/flink/FlinkParametersTest.java | 6 +-
.../plugin/task/flink/FlinkArgsUtils.java | 2 +-
.../plugin/task/flink/FlinkTask.java | 16 -----
.../plugin/task/flink/FlinkArgsUtilsTest.java | 6 +-
.../plugin/task/flink/FlinkParametersTest.java | 6 +-
.../plugin/task/java/JavaTask.java | 33 ++-------
.../plugin/task/java/JavaTaskTest.java | 10 +++
.../plugin/task/mr/MapReduceArgsUtils.java | 5 +-
.../plugin/task/mr/MapReduceTask.java | 13 +---
.../plugin/task/seatunnel/SeatunnelTask.java | 1 +
.../plugin/task/spark/SparkTask.java | 14 +---
.../plugin/task/spark/SparkParametersTest.java | 6 +-
.../plugin/task/spark/SparkTaskTest.java | 6 +-
.../plugin/task/sqoop/SqoopTask.java | 4 --
.../node/fields/use-java-task-main-jar.ts | 5 +-
.../task/components/node/fields/use-main-jar.ts | 5 +-
.../worker/utils/TaskExecutionCheckerUtils.java | 8 +--
29 files changed, 143 insertions(+), 214 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
index 2bcc2822a8..39c81b5880 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
@@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
-import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import java.util.List;
@@ -25,5 +25,5 @@ import java.util.List;
*/
public interface IFilter {
- List<Resource> filter();
+ List<StorageEntity> filter();
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
index 6206769f7f..14f4a6292d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
@@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
-import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import java.util.ArrayList;
import java.util.HashSet;
@@ -36,7 +36,7 @@ public class ResourceFilter implements IFilter {
/**
* resource list
*/
- private List<Resource> resourceList;
+ private List<StorageEntity> resourceList;
/**
* parent list
@@ -48,7 +48,7 @@ public class ResourceFilter implements IFilter {
* @param suffix resource suffix
* @param resourceList resource list
*/
- public ResourceFilter(String suffix, List<Resource> resourceList) {
+ public ResourceFilter(String suffix, List<StorageEntity> resourceList) {
this.suffix = suffix;
this.resourceList = resourceList;
}
@@ -57,44 +57,31 @@ public class ResourceFilter implements IFilter {
* file filter
* @return file filtered by suffix
*/
- public Set<Resource> fileFilter() {
- return resourceList.stream().filter(t -> {
- String alias = t.getAlias();
- return alias.endsWith(suffix);
- }).collect(Collectors.toSet());
+ public Set<StorageEntity> fileFilter() {
+ return resourceList.stream().filter(t ->
t.getFullName().endsWith(suffix)).collect(Collectors.toSet());
}
/**
* list all parent dir
* @return parent resource dir set
*/
- Set<Resource> listAllParent() {
- Set<Resource> parentList = new HashSet<>();
- Set<Resource> filterFileList = fileFilter();
- for (Resource file : filterFileList) {
- parentList.add(file);
- setAllParent(file, parentList);
- }
- return parentList;
-
- }
-
- /**
- * list all parent dir
- * @param resource resource
- * @return parent resource dir set
- */
- private void setAllParent(Resource resource, Set<Resource> parentList) {
- for (Resource resourceTemp : resourceList) {
- if (resourceTemp.getId() == resource.getPid()) {
- parentList.add(resourceTemp);
- setAllParent(resourceTemp, parentList);
+ Set<StorageEntity> listAllParent() {
+ Set<StorageEntity> parentList = new HashSet<>();
+ Set<StorageEntity> filterFileList = fileFilter();
+ for (StorageEntity file : filterFileList) {
+ String fullName = file.getFullName();
+ for (StorageEntity resource : resourceList) {
+ if (fullName.startsWith(resource.getFullName())) {
+ parentList.add(resource);
+ }
}
}
+
+ return parentList;
}
@Override
- public List<Resource> filter() {
+ public List<StorageEntity> filter() {
return new ArrayList<>(listAllParent());
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 6f83c4f914..cd8d2cd1ea 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -583,11 +583,36 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
return result;
}
+ List<StorageEntity> resourcesList = new ArrayList<>();
+ try {
+ resourcesList = queryStorageEntityList(loginUser, fullName, type,
tenantCode, false);
+ } catch (ServiceException e) {
+ putMsg(result, Status.RESOURCE_NOT_EXIST);
+ return result;
+ }
+
+ // remove leading and trailing spaces in searchVal
+ String trimmedSearchVal = searchVal != null ? searchVal.trim() : "";
+ // filter based on trimmed searchVal
+ List<StorageEntity> filteredResourceList = resourcesList.stream()
+ .filter(x ->
x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList());
+ // inefficient pagination
+ List<StorageEntity> slicedResourcesList =
filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize)
+ .limit(pageSize).collect(Collectors.toList());
+
+ pageInfo.setTotal(filteredResourceList.size());
+ pageInfo.setTotalList(slicedResourcesList);
+ result.setData(pageInfo);
+ putMsg(result, Status.SUCCESS);
+ return result;
+ }
+
+ private List<StorageEntity> queryStorageEntityList(User loginUser, String
fullName, ResourceType type,
+ String tenantCode,
boolean recursive) {
String defaultPath = "";
List<StorageEntity> resourcesList = new ArrayList<>();
String resourceStorageType =
PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE,
ResUploadType.NONE.name());
-
if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) {
// list all tenants' resources to admin users in the root directory
List<User> userList = userMapper.selectList(null);
@@ -600,13 +625,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
defaultPath =
storageOperate.getUdfDir(tenantEntityCode);
}
try {
-
resourcesList.addAll(storageOperate.listFilesStatus(defaultPath, defaultPath,
- tenantEntityCode, type));
+ resourcesList.addAll(recursive
+ ?
storageOperate.listFilesStatusRecursively(defaultPath, defaultPath,
+ tenantEntityCode, type)
+ : storageOperate.listFilesStatus(defaultPath,
defaultPath,
+ tenantEntityCode, type));
visitedTenantEntityCode.add(tenantEntityCode);
} catch (Exception e) {
log.error(e.getMessage() + " Resource path: {}",
defaultPath, e);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
throw new
ServiceException(String.format(e.getMessage() +
" make sure resource path: %s exists in %s",
defaultPath, resourceStorageType));
}
@@ -626,26 +653,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
}
} catch (Exception e) {
log.error(e.getMessage() + " Resource path: {}", fullName, e);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
throw new ServiceException(String.format(e.getMessage() +
" make sure resource path: %s exists in %s",
defaultPath, resourceStorageType));
}
}
- // remove leading and trailing spaces in searchVal
- String trimmedSearchVal = searchVal != null ? searchVal.trim() : "";
- // filter based on trimmed searchVal
- List<StorageEntity> filteredResourceList = resourcesList.stream()
- .filter(x ->
x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList());
- // inefficient pagination
- List<StorageEntity> slicedResourcesList =
filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize)
- .limit(pageSize).collect(Collectors.toList());
-
- pageInfo.setTotal(filteredResourceList.size());
- pageInfo.setTotalList(slicedResourcesList);
- result.setData(pageInfo);
- putMsg(result, Status.SUCCESS);
- return result;
+ return resourcesList;
}
/**
@@ -799,14 +812,23 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
public Result<Object> queryResourceByProgramType(User loginUser,
ResourceType type, ProgramType programType) {
Result<Object> result = new Result<>();
- Set<Integer> resourceIds = resourcePermissionCheckService
- .userOwnedResourceIdsAcquisition(checkResourceType(type),
loginUser.getId(), log);
- if (resourceIds.isEmpty()) {
- result.setData(Collections.emptyList());
- putMsg(result, Status.SUCCESS);
+ User user = userMapper.selectById(loginUser.getId());
+ if (user == null) {
+ log.error("user {} not exists", loginUser.getId());
+ putMsg(result, Status.USER_NOT_EXIST, loginUser.getId());
+ return result;
+ }
+
+ Tenant tenant = tenantMapper.queryById(user.getTenantId());
+ if (tenant == null) {
+ log.error("tenant not exists");
+ putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
- List<Resource> allResourceList =
resourcesMapper.selectBatchIds(resourceIds);
+
+ String tenantCode = tenant.getTenantCode();
+
+ List<StorageEntity> allResourceList =
queryStorageEntityList(loginUser, "", type, tenantCode, true);
String suffix = ".jar";
if (programType != null) {
@@ -820,12 +842,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
default:
}
}
- List<Resource> resources = new ResourceFilter(suffix, new
ArrayList<>(allResourceList)).filter();
- // Transform into StorageEntity for compatibility
- List<StorageEntity> transformedResourceList = resources.stream()
- .map(this::createStorageEntityBasedOnResource)
- .collect(Collectors.toList());
- Visitor visitor = new ResourceTreeVisitor(transformedResourceList);
+ List<StorageEntity> resources = new ResourceFilter(suffix, new
ArrayList<>(allResourceList)).filter();
+ Visitor visitor = new ResourceTreeVisitor(resources);
result.setData(visitor.visit("").getChildren());
putMsg(result, Status.SUCCESS);
return result;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
index ba0c796f2e..c84574a7e9 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
-import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import java.util.ArrayList;
import java.util.List;
@@ -34,25 +34,23 @@ public class ResourceFilterTest {
private static Logger logger =
LoggerFactory.getLogger(ResourceFilterTest.class);
@Test
public void filterTest() {
- List<Resource> allList = new ArrayList<>();
+ List<StorageEntity> allList = new ArrayList<>();
- Resource resource1 = new Resource(3, -1, "b", "/b", true);
- Resource resource2 = new Resource(4, 2, "a1.txt", "/a/a1.txt", false);
- Resource resource3 = new Resource(5, 3, "b1.txt", "/b/b1.txt", false);
- Resource resource4 = new Resource(6, 3, "b2.jar", "/b/b2.jar", false);
- Resource resource5 = new Resource(7, -1, "b2", "/b2", true);
- Resource resource6 = new Resource(8, -1, "b2", "/b/b2", true);
- Resource resource7 = new Resource(9, 8, "c2.jar", "/b/b2/c2.jar",
false);
+ StorageEntity resource1 = new StorageEntity();
+ resource1.setFullName("a1.txt");
+ StorageEntity resource2 = new StorageEntity();
+ resource2.setFullName("b1.txt");
+ StorageEntity resource3 = new StorageEntity();
+ resource3.setFullName("b2.jar");
+ StorageEntity resource4 = new StorageEntity();
+ resource4.setFullName("c2.jar");
allList.add(resource1);
allList.add(resource2);
allList.add(resource3);
allList.add(resource4);
- allList.add(resource5);
- allList.add(resource6);
- allList.add(resource7);
ResourceFilter resourceFilter = new ResourceFilter(".jar", allList);
- List<Resource> resourceList = resourceFilter.filter();
+ List<StorageEntity> resourceList = resourceFilter.filter();
Assertions.assertNotNull(resourceList);
resourceList.forEach(t -> logger.info(t.toString()));
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 6045e2609e..9925e36b5e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -606,9 +606,8 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles =
baseParam.getResourceFilesList();
if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
- // TODO: Modify this part to accomodate(migrate)
oldversionresources in the future.
projectResourceFiles.forEach(file ->
resourcesMap.put(file.getResourceName(),
-
processService.queryTenantCodeByResName(file.getResourceName(),
ResourceType.FILE)));
+
storageOperate.getResourceFileName(file.getResourceName())));
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 43196ad569..be997c934a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1364,7 +1364,8 @@ public class ProcessServiceImpl implements ProcessService
{
ResourceInfo mainJar = JSONUtils.parseObject(
JSONUtils.toJsonString(mainJarObj),
ResourceInfo.class);
- ResourceInfo resourceInfo =
updateResourceInfo(taskDefinition.getId(), mainJar);
+ ResourceInfo resourceInfo =
+
updateResourceInfo(taskDefinitionMapper.queryByCode(taskDefinition.getCode()).getId(),
mainJar);
if (resourceInfo != null) {
taskParameters.put("mainJar", resourceInfo);
}
@@ -1375,7 +1376,8 @@ public class ProcessServiceImpl implements ProcessService
{
List<ResourceInfo> resourceInfos =
JSONUtils.toList(resourceListStr, ResourceInfo.class);
List<ResourceInfo> updatedResourceInfos = resourceInfos
.stream()
- .map(resourceInfo ->
updateResourceInfo(taskDefinition.getId(), resourceInfo))
+ .map(resourceInfo -> updateResourceInfo(
+
taskDefinitionMapper.queryByCode(taskDefinition.getCode()).getId(),
resourceInfo))
.filter(Objects::nonNull)
.collect(Collectors.toList());
taskParameters.put("resourceList", updatedResourceInfos);
@@ -1402,7 +1404,6 @@ public class ProcessServiceImpl implements ProcessService
{
}
resourceInfo = new ResourceInfo();
resourceInfo.setId(-1);
- resourceInfo.setRes(res.getRes());
resourceInfo.setResourceName(resourceFullName);
log.info("updated resource info {}",
JSONUtils.toJsonString(resourceInfo));
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index f2b7dbe489..81ef7a214f 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -714,13 +714,11 @@ public class ProcessServiceTest {
// test normal situation
ResourceInfo resourceInfoNormal = new ResourceInfo();
resourceInfoNormal.setId(1);
- resourceInfoNormal.setRes("test.txt");
resourceInfoNormal.setResourceName("/test.txt");
ResourceInfo updatedResourceInfo3 =
processService.updateResourceInfo(0, resourceInfoNormal);
Assertions.assertEquals(-1, updatedResourceInfo3.getId().intValue());
- Assertions.assertEquals("test.txt", updatedResourceInfo3.getRes());
Assertions.assertEquals("/test.txt",
updatedResourceInfo3.getResourceName());
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 85ab9a610f..406e78a80b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -21,7 +21,6 @@ import static
org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLE
import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -122,25 +121,4 @@ public abstract class AbstractYarnTask extends
AbstractRemoteTask {
*/
protected abstract String buildCommand();
- /**
- * set main jar name
- */
- protected abstract void setMainJarName();
-
- /**
- * Get name of jar resource.
- *
- * @param mainJar
- * @return
- */
- protected String getResourceNameOfMainJar(ResourceInfo mainJar) {
- if (null == mainJar) {
- throw new RuntimeException("The jar for the task is required.");
- }
-
- return mainJar.getId() == null
- ? mainJar.getRes()
- // when update resource maybe has error
- : mainJar.getResourceName().replaceFirst("/", "");
- }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
index 54b88c5977..b270b9a5f5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
@@ -180,11 +180,11 @@ public class DataQualityTask extends AbstractYarnTask {
return command;
}
- @Override
protected void setMainJarName() {
ResourceInfo mainJar = new ResourceInfo();
String basePath =
System.getProperty("user.dir").replace(File.separator + "bin", "");
- mainJar.setRes(basePath + File.separator + "libs" + File.separator +
CommonUtils.getDataQualityJarName());
+ mainJar.setResourceName(
+ basePath + File.separator + "libs" + File.separator +
CommonUtils.getDataQualityJarName());
dataQualityParameters.getSparkParameters().setMainJar(mainJar);
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java
index 66c157e82b..d576abe6d0 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java
@@ -120,7 +120,7 @@ public class SparkArgsUtils {
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
- args.add(mainJar.getRes());
+ args.add(mainJar.getResourceName());
}
String mainArgs = param.getMainArgs();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index 750711502f..4da841b0d3 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
@@ -58,7 +57,6 @@ public class FlinkStreamTask extends FlinkTask implements
StreamTask {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
- setMainJarName();
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
@@ -80,14 +78,6 @@ public class FlinkStreamTask extends FlinkTask implements
StreamTask {
return command;
}
- @Override
- protected void setMainJarName() {
- ResourceInfo mainJar = flinkParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- flinkParameters.setMainJar(mainJar);
- }
-
@Override
public AbstractParameters getParameters() {
return flinkParameters;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index e8d29df4be..c274ee2594 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Assertions;
@@ -38,7 +39,7 @@ public class FlinkArgsUtilsTest {
flinkParameters.setParallelism(4);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setId(1);
- resourceInfo.setResourceName("job");
+ resourceInfo.setResourceName("/opt/job.jar");
resourceInfo.setRes("/opt/job.jar");
flinkParameters.setMainJar(resourceInfo);
flinkParameters.setMainClass("org.example.Main");
@@ -53,6 +54,9 @@ public class FlinkArgsUtilsTest {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskAppId("app-id");
taskExecutionContext.setExecutePath("/tmp/execution");
+ HashMap<String, String> map = new HashMap<>();
+ map.put("/opt/job.jar", "/opt/job.jar");
+ taskExecutionContext.setResources(map);
return taskExecutionContext;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
index cb39748fcc..7118d40667 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
@@ -33,12 +33,12 @@ public class FlinkParametersTest {
Assertions.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
ResourceInfo mainResource = new ResourceInfo();
- mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
+ mainResource.setResourceName("/testFlinkMain-1.0.0-SNAPSHOT.jar");
flinkParameters.setMainJar(mainResource);
List<ResourceInfo> resourceInfos = new LinkedList<>();
ResourceInfo resourceInfo1 = new ResourceInfo();
- resourceInfo1.setRes("testFlinkParameters1.jar");
+ resourceInfo1.setResourceName("/testFlinkParameters1.jar");
resourceInfos.add(resourceInfo1);
flinkParameters.setResourceList(resourceInfos);
@@ -47,7 +47,7 @@ public class FlinkParametersTest {
Assertions.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
- resourceInfo2.setRes("testFlinkParameters2.jar");
+ resourceInfo2.setResourceName("/testFlinkParameters2.jar");
resourceInfos.add(resourceInfo2);
flinkParameters.setResourceList(resourceInfos);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 45b10c4128..20f460857c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -297,7 +297,7 @@ public class FlinkArgsUtils {
if (ProgramType.PYTHON == programType) {
args.add(FlinkConstants.FLINK_PYTHON);
}
- args.add(mainJar.getRes());
+
args.add(taskExecutionContext.getResources().get(mainJar.getResourceName()));
}
String mainArgs = flinkParameters.getMainArgs();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 2e58ae0534..01862816b6 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
@@ -61,7 +60,6 @@ public class FlinkTask extends AbstractYarnTask {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
- setMainJarName();
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
@@ -83,20 +81,6 @@ public class FlinkTask extends AbstractYarnTask {
return command;
}
- @Override
- protected void setMainJarName() {
- if (flinkParameters.getProgramType() == ProgramType.SQL) {
- log.info("The current flink job type is SQL, will no need to set
main jar");
- return;
- }
-
- ResourceInfo mainJar = flinkParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- flinkParameters.setMainJar(mainJar);
- log.info("Success set flink jar: {}", resourceName);
- }
-
@Override
public AbstractParameters getParameters() {
return flinkParameters;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 952e0c42ed..64d9931f22 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Assertions;
@@ -38,7 +39,7 @@ public class FlinkArgsUtilsTest {
flinkParameters.setParallelism(4);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setId(1);
- resourceInfo.setResourceName("job");
+ resourceInfo.setResourceName("/opt/job.jar");
resourceInfo.setRes("/opt/job.jar");
flinkParameters.setMainJar(resourceInfo);
flinkParameters.setMainClass("org.example.Main");
@@ -53,6 +54,9 @@ public class FlinkArgsUtilsTest {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskAppId("app-id");
taskExecutionContext.setExecutePath("/tmp/execution");
+ HashMap<String, String> map = new HashMap<>();
+ map.put("/opt/job.jar", "/opt/job.jar");
+ taskExecutionContext.setResources(map);
return taskExecutionContext;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
index 3e4a614acb..3f9594241b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
@@ -33,12 +33,12 @@ public class FlinkParametersTest {
Assertions.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
ResourceInfo mainResource = new ResourceInfo();
- mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
+ mainResource.setResourceName("/testFlinkMain-1.0.0-SNAPSHOT.jar");
flinkParameters.setMainJar(mainResource);
List<ResourceInfo> resourceInfos = new LinkedList<>();
ResourceInfo resourceInfo1 = new ResourceInfo();
- resourceInfo1.setRes("testFlinkParameters1.jar");
+ resourceInfo1.setResourceName("/testFlinkParameters1.jar");
resourceInfos.add(resourceInfo1);
flinkParameters.setResourceList(resourceInfos);
@@ -47,7 +47,7 @@ public class FlinkParametersTest {
Assertions.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
- resourceInfo2.setRes("testFlinkParameters2.jar");
+ resourceInfo2.setResourceName("/testFlinkParameters2.jar");
resourceInfos.add(resourceInfo2);
flinkParameters.setResourceList(resourceInfos);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 280ede0d7e..d05cbec1bd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.java;
-import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SINGLE_SLASH;
+import static
org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static
org.apache.dolphinscheduler.plugin.task.java.JavaConstants.JAVA_HOME_VAR;
import static
org.apache.dolphinscheduler.plugin.task.java.JavaConstants.PUBLIC_CLASS_NAME_REGEX;
@@ -93,9 +93,6 @@ public class JavaTask extends AbstractTask {
if (javaParameters == null || !javaParameters.checkParameters()) {
throw new TaskException("java task params is not valid");
}
- if (javaParameters.getRunType().equals(JavaConstants.RUN_TYPE_JAR)) {
- setMainJarName();
- }
log.info("Initialize java task params {}",
JSONUtils.toPrettyJsonString(javaParameters));
}
@@ -169,44 +166,25 @@ public class JavaTask extends AbstractTask {
return builder.toString();
}
- private void setMainJarName() {
- ResourceInfo mainJar = javaParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- javaParameters.setMainJar(mainJar);
- }
-
/**
* Construct a shell command for the java -jar Run mode
*
* @return String
**/
protected String buildJarCommand() {
- String fullName = javaParameters.getMainJar().getResourceName();
- String mainJarName = fullName.substring(0, fullName.lastIndexOf('.'));
- mainJarName = mainJarName.substring(mainJarName.lastIndexOf('.') + 1)
+ ".jar";
+ String mainJarName =
taskRequest.getResources().get(javaParameters.getMainJar().getResourceName());
StringBuilder builder = new StringBuilder();
builder.append(getJavaCommandPath())
.append("java").append(" ")
.append(buildResourcePath()).append(" ")
.append("-jar").append(" ")
- .append(taskRequest.getExecutePath())
+ .append(taskRequest.getExecutePath()).append(FOLDER_SEPARATOR)
.append(mainJarName).append(" ")
.append(javaParameters.getMainArgs().trim()).append(" ")
.append(javaParameters.getJvmArgs().trim());
return builder.toString();
}
- private String getResourceNameOfMainJar(ResourceInfo mainJar) {
- if (null == mainJar) {
- throw new RuntimeException("The jar for the task is required.");
- }
- return mainJar.getId() == 0
- ? mainJar.getRes()
- // when update resource maybe has error
- : mainJar.getResourceName().replaceFirst(SINGLE_SLASH, "");
- }
-
@Override
public void cancel() throws TaskException {
// cancel process
@@ -300,10 +278,11 @@ public class JavaTask extends AbstractTask {
builder.append(" ").append(JavaConstants.CLASSPATH_CURRENT_DIR)
.append(JavaConstants.PATH_SEPARATOR)
.append(taskRequest.getExecutePath());
+ Map<String, String> resourceMap = taskRequest.getResources();
for (ResourceInfo info : javaParameters.getResourceFilesList()) {
builder.append(JavaConstants.PATH_SEPARATOR);
- builder.append(taskRequest.getExecutePath())
- .append(info.getResourceName());
+
builder.append(taskRequest.getExecutePath()).append(FOLDER_SEPARATOR)
+ .append(resourceMap.get(info.getResourceName()));
}
return builder.toString();
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
index c9193c397a..4e6bc30e1f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
@@ -39,6 +39,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -252,6 +253,11 @@ public class JavaTaskTest {
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(createJavaParametersObject(RUN_TYPE_JAVA)));
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/test/executepath");
taskExecutionContext.setTaskAppId("runJavaType");
+ HashMap<String, String> map = new HashMap<>();
+ map.put("/opt/share/jar/resource2.jar", "opt/share/jar/resource2.jar");
+ map.put("/opt/share/jar/main.jar", "opt/share/jar/main.jar");
+ map.put("/JavaTaskTest.java", "JavaTaskTest.java");
+ taskExecutionContext.setResources(map);
JavaTask javaTask = new JavaTask(taskExecutionContext);
javaTask.init();
return javaTask;
@@ -267,6 +273,10 @@ public class JavaTaskTest {
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(createJavaParametersObject(RUN_TYPE_JAR)));
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/test/executepath");
taskExecutionContext.setTaskAppId("runJavaType");
+ HashMap<String, String> map = new HashMap<>();
+ map.put("/opt/share/jar/resource2.jar", "opt/share/jar/resource2.jar");
+ map.put("/opt/share/jar/main.jar", "opt/share/jar/main.jar");
+ taskExecutionContext.setResources(map);
JavaTask javaTask = new JavaTask(taskExecutionContext);
javaTask.init();
return javaTask;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java
index 9147761068..070774adef 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java
@@ -22,6 +22,7 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR;
import static
org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME;
import static
org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_QUEUE;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
@@ -45,13 +46,13 @@ public class MapReduceArgsUtils {
* @param param param
* @return argument list
*/
- public static List<String> buildArgs(MapReduceParameters param) {
+ public static List<String> buildArgs(MapReduceParameters param,
TaskExecutionContext taskExecutionContext) {
List<String> args = new ArrayList<>();
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(JAR);
- args.add(mainJar.getRes());
+
args.add(taskExecutionContext.getResources().get(mainJar.getResourceName()));
}
ProgramType programType = param.getProgramType();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
index 8d9befd2d0..988711370f 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
@@ -22,7 +22,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
@@ -73,7 +72,6 @@ public class MapReduceTask extends AbstractYarnTask {
}
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
- setMainJarName();
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap =
taskExecutionContext.getPrepareParamsMap();
@@ -101,7 +99,7 @@ public class MapReduceTask extends AbstractYarnTask {
args.add(MAPREDUCE_COMMAND);
// other parameters
- args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters));
+ args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters,
taskExecutionContext));
String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
@@ -110,15 +108,6 @@ public class MapReduceTask extends AbstractYarnTask {
return command;
}
- @Override
- protected void setMainJarName() {
- // main jar
- ResourceInfo mainJar = mapreduceParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- mapreduceParameters.setMainJar(mainJar);
- }
-
@Override
public AbstractParameters getParameters() {
return mapreduceParameters;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 975f8d8106..d9ef25b860 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -156,6 +156,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
} else {
seatunnelParameters.getResourceList().forEach(resourceInfo -> {
args.add(CONFIG_OPTIONS);
+ // TODO: Need further check for refactored resource center
// TODO Currently resourceName is `/xxx.sh`, it has more `/`
and needs to be optimized
args.add(resourceInfo.getResourceName().substring(1));
});
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 16533397da..00a12430a3 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -84,9 +84,6 @@ public class SparkTask extends AbstractYarnTask {
}
sparkParameters.setQueue(taskExecutionContext.getQueue());
- if (sparkParameters.getProgramType() != ProgramType.SQL) {
- setMainJarName();
- }
log.info("Initialize spark task params {}",
JSONUtils.toPrettyJsonString(sparkParameters));
}
@@ -191,7 +188,7 @@ public class SparkTask extends AbstractYarnTask {
ResourceInfo mainJar = sparkParameters.getMainJar();
if (programType != ProgramType.SQL) {
- args.add(mainJar.getRes());
+
args.add(taskExecutionContext.getResources().get(mainJar.getResourceName()));
}
String mainArgs = sparkParameters.getMainArgs();
@@ -276,15 +273,6 @@ public class SparkTask extends AbstractYarnTask {
return script;
}
- @Override
- protected void setMainJarName() {
- // main jar
- ResourceInfo mainJar = sparkParameters.getMainJar();
- String resourceName = getResourceNameOfMainJar(mainJar);
- mainJar.setRes(resourceName);
- sparkParameters.setMainJar(mainJar);
- }
-
@Override
public AbstractParameters getParameters() {
return sparkParameters;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
index 42c1f5ccf1..ab164f2eb5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
@@ -33,12 +33,12 @@ public class SparkParametersTest {
Assertions.assertTrue(sparkParameters.getResourceFilesList().isEmpty());
ResourceInfo mainResource = new ResourceInfo();
- mainResource.setRes("testSparkMain-1.0.0-SNAPSHOT.jar\"");
+ mainResource.setResourceName("testSparkMain-1.0.0-SNAPSHOT.jar\"");
sparkParameters.setMainJar(mainResource);
LinkedList<ResourceInfo> resourceInfos = new LinkedList<>();
ResourceInfo resourceInfo1 = new ResourceInfo();
- resourceInfo1.setRes("testSparkParameters1.jar");
+ resourceInfo1.setResourceName("testSparkParameters1.jar");
resourceInfos.add(resourceInfo1);
sparkParameters.setResourceList(resourceInfos);
@@ -47,7 +47,7 @@ public class SparkParametersTest {
Assertions.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
- resourceInfo2.setRes("testSparkParameters2.jar");
+ resourceInfo2.setResourceName("testSparkParameters2.jar");
resourceInfos.add(resourceInfo2);
sparkParameters.setResourceList(resourceInfos);
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 3c2c41e787..3bc9be5959 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -22,6 +22,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import java.util.Collections;
+import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -59,6 +60,9 @@ public class SparkTaskTest {
public void testBuildCommandWithSparkSubmit() {
String parameters = buildSparkParametersWithSparkSubmit();
TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
+ HashMap<String, String> map = new HashMap<>();
+ map.put("/lib/dolphinscheduler-task-spark.jar",
"/lib/dolphinscheduler-task-spark.jar");
+ Mockito.when(taskExecutionContext.getResources()).thenReturn(map);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
sparkTask.init();
@@ -73,7 +77,7 @@ public class SparkTaskTest {
"--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " +
"--name spark " +
- "lib/dolphinscheduler-task-spark.jar");
+ "/lib/dolphinscheduler-task-spark.jar");
}
private String buildSparkParametersWithSparkSql() {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index 72e2eca02d..d5a5d87c8b 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -87,10 +87,6 @@ public class SqoopTask extends AbstractYarnTask {
}
- @Override
- protected void setMainJarName() {
- }
-
@Override
public AbstractParameters getParameters() {
return sqoopParameters;
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts
index fed6ef3bc1..826d709fad 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts
@@ -59,12 +59,13 @@ export function useJavaTaskMainJar(model: { [field:
string]: any }): IJsonItem {
name: t('project.node.main_package'),
span: mainJarSpan,
props: {
+ checkable: true,
cascade: true,
showPath: true,
checkStrategy: 'child',
placeholder: t('project.node.main_package_tips'),
- keyField: 'id',
- labelField: 'fullName'
+ keyField: 'fullName',
+ labelField: 'name'
},
validate: {
trigger: ['input', 'blur'],
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
index b617329c82..79ef7a2825 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
@@ -61,12 +61,13 @@ export function useMainJar(model: { [field: string]: any
}): IJsonItem {
name: t('project.node.main_package'),
span: mainJarSpan,
props: {
+ checkable: true,
cascade: true,
showPath: true,
checkStrategy: 'child',
placeholder: t('project.node.main_package_tips'),
- keyField: 'id',
- labelField: 'fullName'
+ keyField: 'fullName',
+ labelField: 'name'
},
validate: {
trigger: ['input', 'blur'],
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
index 44143d0f9a..8b66a7e893 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
@@ -133,14 +133,12 @@ public class TaskExecutionCheckerUtils {
for (Pair<String, String> fileDownload : downloadFiles) {
try {
String fullName = fileDownload.getLeft();
- // we do not actually get & need tenantCode with this
implementation right now.
- String tenantCode = fileDownload.getRight();
- // TODO: Need a better way to get fileName because this
implementation is tricky.
- String fileName =
storageOperate.getResourceFileName(fullName);
+ String fileName = fileDownload.getRight();
logger.info("get resource file from path:{}", fullName);
long resourceDownloadStartTime =
System.currentTimeMillis();
- storageOperate.download(tenantCode, fullName,
execLocalPath + File.separator + fileName, false,
+
storageOperate.download(taskExecutionContext.getTenantCode(), fullName,
+ execLocalPath + File.separator + fileName, false,
true);
WorkerServerMetrics
.recordWorkerResourceDownloadTime(System.currentTimeMillis() -
resourceDownloadStartTime);