This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 2.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0-prepare by this push:
     new 9942d73  [Fix-6478] [Server] Fix the lack of scheduling time in 
complement parallelism mode (#6491) (#6498)
9942d73 is described below

commit 9942d73db6664a9dc6c9721c9d6447b135b96842
Author: Kirs <[email protected]>
AuthorDate: Mon Oct 11 20:24:29 2021 +0800

    [Fix-6478] [Server] Fix the lack of scheduling time in complement 
parallelism mode (#6491) (#6498)
    
    * Fix the lack of scheduling time in complement parallelism mode
---
 .../api/service/impl/ExecutorServiceImpl.java      |  6 ++---
 .../api/service/ExecutorServiceTest.java           | 31 ++++++++++++++++++++++
 .../service/quartz/cron/CronUtils.java             |  1 +
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index b910c4b..5042a03 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -604,15 +604,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                         createCount = Math.min(listDate.size(), 
expectedParallelismNumber);
                     }
                     logger.info("In parallel mode, current 
expectedParallelismNumber:{}", createCount);
+
+                    listDate.addLast(end);
                     int chunkSize = listDate.size() / createCount;
 
                     for (int i = 0; i < createCount; i++) {
                         int rangeStart = i == 0 ? i : (i * chunkSize);
                         int rangeEnd = i == createCount - 1 ? listDate.size() 
- 1
                                 : rangeStart + chunkSize;
-                        if (rangeEnd == listDate.size()) {
-                            rangeEnd = listDate.size() - 1;
-                        }
+
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, 
DateUtils.dateToString(listDate.get(rangeStart)));
                         cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, 
DateUtils.dateToString(listDate.get(rangeEnd)));
                         
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index e308f58..9766c61 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -58,12 +58,15 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * executor service 2 test
  */
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class ExecutorServiceTest {
+    private static final Logger logger = 
LoggerFactory.getLogger(ExecutorServiceTest.class);
 
     @InjectMocks
     private ExecutorServiceImpl executorService;
@@ -326,4 +329,32 @@ public class ExecutorServiceTest {
         result.put(Constants.STATUS, Status.SUCCESS);
         return result;
     }
+
+    @Test
+    public void testCreateComplementToParallel() {
+        List<String> result = new ArrayList<>();
+        int expectedParallelismNumber = 3;
+        LinkedList<Integer> listDate = new LinkedList<>();
+        listDate.add(0);
+        listDate.add(1);
+        listDate.add(2);
+        listDate.add(3);
+
+        int createCount = Math.min(listDate.size(), expectedParallelismNumber);
+        logger.info("In parallel mode, current expectedParallelismNumber:{}", 
createCount);
+
+        listDate.addLast(4);
+        int chunkSize = listDate.size() / createCount;
+        for (int i = 0; i < createCount; i++) {
+            int rangeStart = i == 0 ? i : (i * chunkSize);
+            int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : 
rangeStart + chunkSize;
+            logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd);
+            result.add(listDate.get(rangeStart) + "," + 
listDate.get(rangeEnd));
+        }
+
+        Assert.assertEquals("0,1", result.get(0));
+        Assert.assertEquals("1,2", result.get(1));
+        Assert.assertEquals("2,4", result.get(2));
+
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
index f23da26..f195d62 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
@@ -190,6 +190,7 @@ public class CronUtils {
             return result;
         }
 
+        // support left closed and right open time interval (startDate <= N < 
endDate)
         Date from = new Date(startTime.getTime() - 
Constants.SECOND_TIME_MILLIS);
         Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);
 

Reply via email to