This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 2.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0-prepare by this push:
new 9942d73 [Fix-6478] [Server] Fix the lack of scheduling time in
complement parallelism mode (#6491) (#6498)
9942d73 is described below
commit 9942d73db6664a9dc6c9721c9d6447b135b96842
Author: Kirs <[email protected]>
AuthorDate: Mon Oct 11 20:24:29 2021 +0800
[Fix-6478] [Server] Fix the lack of scheduling time in complement
parallelism mode (#6491) (#6498)
* Fix the lack of scheduling time in complement parallelism mode
---
.../api/service/impl/ExecutorServiceImpl.java | 6 ++---
.../api/service/ExecutorServiceTest.java | 31 ++++++++++++++++++++++
.../service/quartz/cron/CronUtils.java | 1 +
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index b910c4b..5042a03 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -604,15 +604,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
createCount = Math.min(listDate.size(),
expectedParallelismNumber);
}
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+
+ listDate.addLast(end);
int chunkSize = listDate.size() / createCount;
for (int i = 0; i < createCount; i++) {
int rangeStart = i == 0 ? i : (i * chunkSize);
int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize;
- if (rangeEnd == listDate.size()) {
- rangeEnd = listDate.size() - 1;
- }
+
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart)));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index e308f58..9766c61 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -58,12 +58,15 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* executor service 2 test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class ExecutorServiceTest {
+ private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
@InjectMocks
private ExecutorServiceImpl executorService;
@@ -326,4 +329,32 @@ public class ExecutorServiceTest {
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
+
+ @Test
+ public void testCreateComplementToParallel() {
+ List<String> result = new ArrayList<>();
+ int expectedParallelismNumber = 3;
+ LinkedList<Integer> listDate = new LinkedList<>();
+ listDate.add(0);
+ listDate.add(1);
+ listDate.add(2);
+ listDate.add(3);
+
+ int createCount = Math.min(listDate.size(), expectedParallelismNumber);
+ logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+
+ listDate.addLast(4);
+ int chunkSize = listDate.size() / createCount;
+ for (int i = 0; i < createCount; i++) {
+ int rangeStart = i == 0 ? i : (i * chunkSize);
+ int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize;
+ logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd);
+ result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd));
+ }
+
+ Assert.assertEquals("0,1", result.get(0));
+ Assert.assertEquals("1,2", result.get(1));
+ Assert.assertEquals("2,4", result.get(2));
+
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
index f23da26..f195d62 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
@@ -190,6 +190,7 @@ public class CronUtils {
return result;
}
+ // support left closed and right open time interval (startDate <= N < endDate)
Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS);
Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);