This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6d1874ecf2 [Fix-17831] Fix incorrect parallelism num when complement 
data in parallel execution mode (#17853)
6d1874ecf2 is described below

commit 6d1874ecf2531b969f38b3c1ca2bdf18549b20da
Author: xiangzihao <[email protected]>
AuthorDate: Thu Jan 8 15:36:49 2026 +0800

    [Fix-17831] Fix incorrect parallelism num when complement data in parallel 
execution mode (#17853)
---
 .../workflow/BackfillWorkflowExecutorDelegate.java | 29 ++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
index 3f695e810c..0c684f2876 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
@@ -35,6 +35,7 @@ import 
org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.time.ZonedDateTime;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -95,9 +96,9 @@ public class BackfillWorkflowExecutorDelegate implements 
IExecutorDelegate<Backf
             expectedParallelismNumber = listDate.size();
         }
 
-        log.info("In parallel mode, current expectedParallelismNumber:{}", 
expectedParallelismNumber);
+        log.info("In parallel mode, current expectedParallelismNumber: {}", 
expectedParallelismNumber);
         final List<Integer> workflowInstanceIdList = Lists.newArrayList();
-        for (List<ZonedDateTime> stringDate : Lists.partition(listDate, 
expectedParallelismNumber)) {
+        for (List<ZonedDateTime> stringDate : splitDateTime(listDate, 
expectedParallelismNumber)) {
             final Integer workflowInstanceId = doBackfillWorkflow(
                     backfillWorkflowDTO,
                     
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
@@ -106,6 +107,30 @@ public class BackfillWorkflowExecutorDelegate implements 
IExecutorDelegate<Backf
         return workflowInstanceIdList;
     }
 
+    /**
+     * Split the list into {@code numParts} consecutive sub-lists of {@code size / numParts} elements each; the last sub-list also takes the {@code size % numParts} leftover elements.
+     */
+    private List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime> 
dateTimeList, int numParts) {
+        List<List<ZonedDateTime>> result = new ArrayList<>();
+        int n = dateTimeList.size();
+
+        int baseSize = n / numParts; // integer division: throws ArithmeticException when numParts is 0
+        int remainder = n % numParts;
+
+        int start = 0;
+        for (int i = 0; i < numParts; i++) {
+            int currentSize = baseSize; // 0 when numParts > n, so every part but the last is then empty
+            if (i == numParts - 1) {
+                currentSize += remainder; // only the final part absorbs the leftover elements
+            }
+            List<ZonedDateTime> part = dateTimeList.subList(start, start + 
currentSize); // subList yields a view backed by dateTimeList, not a copy
+            result.add(part);
+            start += currentSize;
+        }
+
+        return result;
+    }
+
     private Integer doBackfillWorkflow(final BackfillWorkflowDTO 
backfillWorkflowDTO,
                                        final List<String> backfillTimeList) {
         final Server masterServer = 
registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);

Reply via email to