This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 680aad70c3 [core] Start channel should not be negative when
partition.hashCode() equals to Integer.MIN_VALUE (#5623)
680aad70c3 is described below
commit 680aad70c3f50cb9293f91fb75eb773389475320
Author: tsreaper <[email protected]>
AuthorDate: Mon May 19 17:36:42 2025 +0800
[core] Start channel should not be negative when partition.hashCode()
equals to Integer.MIN_VALUE (#5623)
---
.../org/apache/paimon/table/sink/ChannelComputer.java | 17 +++++++++++++++--
.../postpone/PostponeBucketCompactSplitSource.java | 9 ++-------
.../flink/sink/PostponeBucketChannelComputer.java | 5 +++--
3 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
index 52f1dda36b..6bac8b3a8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/ChannelComputer.java
@@ -35,14 +35,27 @@ public interface ChannelComputer<T> extends Serializable {
int channel(T record);
static int select(BinaryRow partition, int bucket, int numChannels) {
- int startChannel = Math.abs(partition.hashCode()) % numChannels;
- return (startChannel + bucket) % numChannels;
+ return (startChannel(partition, numChannels) + bucket) % numChannels;
}
static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
+ static int startChannel(BinaryRow partition, int numChannels) {
+ int hashCode = partition.hashCode();
+ if (hashCode == Integer.MIN_VALUE) {
+ hashCode = Integer.MAX_VALUE;
+ }
+ // Start channel of this partition; select() adds the bucket to it.
+ // Must stay Math.abs(hashCode) % numChannels for backward compatibility
+ // (Flink users may recover from state, so the mapping must not change).
+ // Math.abs(Integer.MIN_VALUE) overflows back to Integer.MIN_VALUE and
+ // would yield a negative channel, so that single value is remapped above;
+ // for numChannels > 0 the result is always in [0, numChannels).
+ return Math.abs(hashCode) % numChannels;
+ }
+
static <T, R> ChannelComputer<R> transform(
ChannelComputer<T> input, SerializableFunction<R, T> converter) {
return new ChannelComputer<R>() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 1cab64c2ca..53bc7a50a7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -186,13 +186,8 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
matcher.find(),
"Data file name %s does not match the pattern. This is
unexpected.",
fileName);
-
- // use long to avoid overflow
- long subtaskId = Long.parseLong(matcher.group(1));
- // send records written by the same subtask to the same subtask
- // to make sure we replay the written records in the exact order
- long channel = (Math.abs(dataSplit.partition().hashCode()) +
subtaskId) % numChannels;
- return (int) channel;
+ return ChannelComputer.select(
+ dataSplit.partition(), Integer.parseInt(matcher.group(1)),
numChannels);
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java
index ca2bc17a0e..af56fef8b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketChannelComputer.java
@@ -50,7 +50,8 @@ public class PostponeBucketChannelComputer implements ChannelComputer<InternalRo
@Override
public int channel(InternalRow record) {
extractor.setRecord(record);
- return Math.abs(extractor.partition().hashCode() +
extractor.trimmedPrimaryKey().hashCode())
- % numChannels;
+ return Math.abs(
+ (extractor.partition().hashCode() +
extractor.trimmedPrimaryKey().hashCode())
+ % numChannels);
}
}