JeremyXin commented on code in PR #8453:
URL: https://github.com/apache/seatunnel/pull/8453#discussion_r1906862176
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java:
##########
@@ -91,13 +93,14 @@ private void assignSplit(int taskId) {
ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<>();
if (context.currentParallelism() == 1) {
// if parallelism == 1, we should assign all the splits to reader
- currentTaskSplits.addAll(pendingSplit);
+ currentTaskSplits.addAll(allSplit);
} else {
- // if parallelism > 1, according to hashCode of split's id to determine whether to
+ // if parallelism > 1, according to polling strategy to determine whether to
// allocate the current task
- for (FileSourceSplit fileSourceSplit : pendingSplit) {
+ assignCount.set(0);
+ for (FileSourceSplit fileSourceSplit : allSplit) {
int splitOwner =
- getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+ getSplitOwner(assignCount.getAndIncrement(), context.currentParallelism());
Review Comment:
@hailin0 I have found the cause of the duplicate file assignment under `ParallelSource`: `allSplit` in `FileSourceSplitEnumerator` is a `HashSet`, so when different `ParallelSource` objects initialize `allSplit` in their `open` method, the order of the files in `allSplit` is not consistent between them. Because the algorithm above assigns splits by their position in that order, the same file can end up assigned to more than one `ParallelSource` reader. I have reproduced this in a unit test; you are right.
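A minimal sketch of why position-based (polling) assignment breaks when readers disagree on iteration order. It assumes, for illustration only, that `getSplitOwner` reduces to `index % parallelism` and that each reader walks its own copy of the splits. `RoundRobinOrderDemo`, `assign` and the file names are hypothetical, and the two fixed orders stand in for whatever order each reader's `HashSet` happens to produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinOrderDemo {

    // Hypothetical simplification of the polling strategy: the split at position
    // `count` in this reader's iteration order belongs to task `count % parallelism`.
    static List<String> assign(List<String> splits, int taskId, int parallelism) {
        List<String> owned = new ArrayList<>();
        int count = 0;
        for (String splitId : splits) {
            // The owner depends only on the position, not on the split itself.
            if (count++ % parallelism == taskId) {
                owned.add(splitId);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // The same four files, seen in a different order by each reader.
        List<String> orderSeenByReader0 = Arrays.asList("a.csv", "b.csv", "c.csv", "d.csv");
        List<String> orderSeenByReader1 = Arrays.asList("b.csv", "a.csv", "d.csv", "c.csv");

        // prints "reader 0: [a.csv, c.csv]"
        System.out.println("reader 0: " + assign(orderSeenByReader0, 0, 2));
        // prints "reader 1: [a.csv, c.csv]"
        System.out.println("reader 1: " + assign(orderSeenByReader1, 1, 2));
    }
}
```

Both readers claim `a.csv` and `c.csv`. Since every position is owned by exactly one task, the duplicates also mean that `b.csv` and `d.csv` are never read in this example.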
To solve this problem, I tried changing the type of `allSplit` to `TreeSet` with a comparator based on the `splitId` value. This guarantees that `allSplit` is iterated in the same order in every `ParallelSource`, which resolves the duplicate file assignment.
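A minimal sketch of the `TreeSet` approach, again assuming for illustration that a split's owner is its position modulo the parallelism. `Split` is a hypothetical stand-in for `FileSourceSplit` that keeps only `splitId()`; `SortedSplitAssignmentDemo`, `sortedSplits` and `assign` are illustrative names, not SeaTunnel APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class SortedSplitAssignmentDemo {

    // Hypothetical stand-in for FileSourceSplit: only the split id matters here.
    static final class Split {
        private final String splitId;
        Split(String splitId) { this.splitId = splitId; }
        String splitId() { return splitId; }
        @Override public String toString() { return splitId; }
    }

    // Order allSplit by splitId, so iteration order no longer depends on
    // the order in which the splits were discovered or added.
    static TreeSet<Split> sortedSplits(List<Split> discovered) {
        Comparator<Split> bySplitId = Comparator.comparing(Split::splitId);
        TreeSet<Split> allSplit = new TreeSet<>(bySplitId);
        allSplit.addAll(discovered);
        return allSplit;
    }

    // Polling over the sorted set: the n-th split goes to task n % parallelism.
    static List<String> assign(TreeSet<Split> allSplit, int taskId, int parallelism) {
        List<String> owned = new ArrayList<>();
        int count = 0;
        for (Split split : allSplit) {
            if (count++ % parallelism == taskId) {
                owned.add(split.splitId());
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Two readers discover the same files in different orders.
        List<Split> seenByReader0 = Arrays.asList(
                new Split("a.csv"), new Split("b.csv"), new Split("c.csv"), new Split("d.csv"));
        List<Split> seenByReader1 = Arrays.asList(
                new Split("d.csv"), new Split("b.csv"), new Split("a.csv"), new Split("c.csv"));

        List<String> r0 = assign(sortedSplits(seenByReader0), 0, 2);
        List<String> r1 = assign(sortedSplits(seenByReader1), 1, 2);
        System.out.println("reader 0: " + r0); // prints "reader 0: [a.csv, c.csv]"
        System.out.println("reader 1: " + r1); // prints "reader 1: [b.csv, d.csv]"

        // Each file is assigned exactly once across the two readers.
        Set<String> union = new HashSet<>(r0);
        union.addAll(r1);
        // prints "disjoint and complete: true"
        System.out.println("disjoint and complete: " + (union.size() == 4 && r0.size() + r1.size() == 4));
    }
}
```

One side effect of this design: a `TreeSet` built with a comparator treats elements that compare as equal as the same element, so two splits with the same `splitId` collapse into one entry. That is the intended set behavior as long as `splitId` uniquely identifies a file split.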
I have also added a `ParallelSource` unit test under the seatunnel-translation-base module. Please check whether the test still reveals any problems.
--
This is an automated message from the Apache Git Service.