JeremyXin commented on code in PR #8453:
URL: https://github.com/apache/seatunnel/pull/8453#discussion_r1906862176


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java:
##########
@@ -91,13 +93,14 @@ private void assignSplit(int taskId) {
         ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<>();
         if (context.currentParallelism() == 1) {
             // if parallelism == 1, we should assign all the splits to reader
-            currentTaskSplits.addAll(pendingSplit);
+            currentTaskSplits.addAll(allSplit);
         } else {
-            // if parallelism > 1, according to hashCode of split's id to determine whether to
+            // if parallelism > 1, according to polling strategy to determine whether to
             // allocate the current task
-            for (FileSourceSplit fileSourceSplit : pendingSplit) {
+            assignCount.set(0);
+            for (FileSourceSplit fileSourceSplit : allSplit) {
                 int splitOwner =
-                        getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+                        getSplitOwner(assignCount.getAndIncrement(), context.currentParallelism());

Review Comment:
   @hailin0 I found the cause of the duplicate file allocation under
`ParallelSource`: the `allSplit` field of `FileSourceSplitEnumerator` is a
`HashSet`. When different `ParallelSource` objects populate `allSplit` in the
`open` method, the order of the files in the `allSplit` collection is not
consistent between them. Because the algorithm above assigns a split by its
position in the iteration, readers that see different orders can be allocated
the same file. I reproduced this in a unit test; you are right.
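
   A minimal sketch of the failure mode, not the actual enumerator code. It
assumes two readers iterate the same four files in different orders; the class
`PollingAssignmentDemo`, the helper `splitsFor`, and the file names are
hypothetical, and plain lists stand in for the two `HashSet` iteration orders:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PollingAssignmentDemo {
    // Hypothetical stand-in for the index-based owner computation in the diff.
    static int getSplitOwner(int assignCount, int parallelism) {
        return assignCount % parallelism;
    }

    // Returns the splits that taskId picks when it iterates in the given order.
    static List<String> splitsFor(List<String> iterationOrder, int taskId, int parallelism) {
        List<String> assigned = new ArrayList<>();
        int assignCount = 0;
        for (String splitId : iterationOrder) {
            if (getSplitOwner(assignCount++, parallelism) == taskId) {
                assigned.add(splitId);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Same files, but each reader happens to iterate them in a different order.
        List<String> orderSeenByTask0 = Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt");
        List<String> orderSeenByTask1 = Arrays.asList("b.txt", "a.txt", "d.txt", "c.txt");

        System.out.println(splitsFor(orderSeenByTask0, 0, 2)); // [a.txt, c.txt]
        System.out.println(splitsFor(orderSeenByTask1, 1, 2)); // [a.txt, c.txt]
    }
}
```

   In this sketch both tasks pick `a.txt` and `c.txt`, while `b.txt` and
`d.txt` are picked by neither task.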
   
   To solve this, I changed the type of `allSplit` to `TreeSet` with a
comparator based on the split ID. This guarantees that the contents of
`allSplit` are iterated in the same order in every `ParallelSource`, which
eliminates the duplicate file allocation.
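
   A rough sketch of the idea, under the assumption that split IDs are unique
strings. `Split` is a simplified, hypothetical stand-in for `FileSourceSplit`,
and `newSplitSet` and `splitsFor` are illustrative helpers, not the code in
this PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class SortedSplitDemo {
    // Hypothetical, simplified stand-in for FileSourceSplit.
    static final class Split {
        private final String splitId;

        Split(String splitId) {
            this.splitId = splitId;
        }

        String splitId() {
            return splitId;
        }
    }

    // Builds a set whose iteration order depends only on the split IDs,
    // not on the order in which the files were discovered.
    static Set<Split> newSplitSet(List<String> discoveryOrder) {
        Comparator<Split> bySplitId = Comparator.comparing(Split::splitId);
        Set<Split> allSplit = new TreeSet<>(bySplitId);
        for (String id : discoveryOrder) {
            allSplit.add(new Split(id));
        }
        return allSplit;
    }

    // Index-based (polling) assignment over the sorted set.
    static List<String> splitsFor(Set<Split> allSplit, int taskId, int parallelism) {
        List<String> assigned = new ArrayList<>();
        int assignCount = 0;
        for (Split split : allSplit) {
            if (assignCount++ % parallelism == taskId) {
                assigned.add(split.splitId());
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Each reader discovers the files in a different order.
        Set<Split> seenByTask0 = newSplitSet(Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt"));
        Set<Split> seenByTask1 = newSplitSet(Arrays.asList("d.txt", "b.txt", "a.txt", "c.txt"));

        System.out.println(splitsFor(seenByTask0, 0, 2)); // [a.txt, c.txt]
        System.out.println(splitsFor(seenByTask1, 1, 2)); // [b.txt, d.txt]
    }
}
```

   Because both sets iterate in split ID order, the two tasks in this sketch
receive disjoint halves of the files regardless of discovery order.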
   
   I have added a `ParallelSource` unit test in the
`seatunnel-translation-base` module. Please check whether there are still any
problems with the test process.


