This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 733e8bc support downloading the resources that exist in old-version
process definitions (#2506)
733e8bc is described below
commit 733e8bcb0daaec64d694b98bd38736bb703216e8
Author: lgcareer <[email protected]>
AuthorDate: Fri Apr 24 14:09:36 2020 +0800
support downloading the resources that exist in old-version process
definitions (#2506)
* fix #2442 and remove unavailable code
* revert verifyResourceName method
* Add ServiceException
* add ServiceExceptionTest
* update ServiceExceptionTest
* add ServiceExceptionTest in pom
* support downloading the resources that exist in old-version process
definitions
---
.../master/consumer/TaskPriorityQueueConsumer.java | 44 +++++++++++-----------
1 file changed, 23 insertions(+), 21 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index cdd9ff2..480d665 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -48,13 +48,13 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
-import java.util.HashSet;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* TaskUpdateQueue consumer
@@ -328,36 +328,38 @@ public class TaskPriorityQueueConsumer extends Thread{
return false;
}
-
/**
- * create project resource files
+ * get resource full name list
*/
- private List<String> getResourceFullNames(TaskNode taskNode){
-
- Set<Integer> resourceIdsSet = new HashSet<>();
+ private List<String> getResourceFullNames(TaskNode taskNode) {
+ List<String> resourceFullNameList = new ArrayList<>();
AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles =
baseParam.getResourceFilesList();
if (projectResourceFiles != null) {
- Stream<Integer> resourceInfotream =
projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId());
-
resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet()));
- }
- }
+ // filter the resources that the resource id equals 0
+ Set<ResourceInfo> oldVersionResources =
projectResourceFiles.stream().filter(t -> t.getId() ==
0).collect(Collectors.toSet());
+ if (CollectionUtils.isNotEmpty(oldVersionResources)) {
+
resourceFullNameList.addAll(oldVersionResources.stream().map(resource ->
resource.getRes()).collect(Collectors.toSet()));
+ }
- if (CollectionUtils.isEmpty(resourceIdsSet)){
- return null;
- }
-
- Integer[] resourceIds = resourceIdsSet.toArray(new
Integer[resourceIdsSet.size()]);
+ // get the resource id in order to get the resource names in
batch
+ Stream<Integer> resourceIdStream =
projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId());
+ Set<Integer> resourceIdsSet =
resourceIdStream.collect(Collectors.toSet());
- List<Resource> resources =
processService.listResourceByIds(resourceIds);
+ if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
+ Integer[] resourceIds = resourceIdsSet.toArray(new
Integer[resourceIdsSet.size()]);
- List<String> resourceFullNames = resources.stream()
- .map(resourceInfo -> resourceInfo.getFullName())
- .collect(Collectors.toList());
+ List<Resource> resources =
processService.listResourceByIds(resourceIds);
+ resourceFullNameList.addAll(resources.stream()
+ .map(resourceInfo -> resourceInfo.getFullName())
+ .collect(Collectors.toList()));
+ }
+ }
+ }
- return resourceFullNames;
+ return resourceFullNameList;
}
}