This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7f3b4be075 [BUG][Connector-V2] Iceberg source lost data with
parallelism option (#5732)
7f3b4be075 is described below
commit 7f3b4be075e8e00f2f94ed7f5b69371befeda374
Author: Dennis <[email protected]>
AuthorDate: Sun Oct 29 00:20:39 2023 +0900
[BUG][Connector-V2] Iceberg source lost data with parallelism option (#5732)
---
.../seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index 199c56eb15..26a971cc2e 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -108,7 +108,7 @@ public abstract class AbstractSplitEnumerator
private void addPendingSplits(Collection<IcebergFileScanTaskSplit>
newSplits) {
int numReaders = context.currentParallelism();
for (IcebergFileScanTaskSplit newSplit : newSplits) {
- int ownerReader = newSplit.splitId().hashCode() % numReaders;
+ int ownerReader = (newSplit.splitId().hashCode() &
Integer.MAX_VALUE) % numReaders;
pendingSplits.computeIfAbsent(ownerReader, r -> new
ArrayList<>()).add(newSplit);
log.info("Assigning {} to {} reader.", newSplit, ownerReader);
}