Copilot commented on code in PR #3272:
URL: https://github.com/apache/fluss/pull/3272#discussion_r3207333674
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
@VisibleForTesting
protected int getSplitOwner(SourceSplitBase split) {
TableBucket tableBucket = split.getTableBucket();
- int startIndex =
- tableBucket.getPartitionId() == null
- ? 0
- : ((tableBucket.getPartitionId().hashCode() * 31) & 0x7FFFFFFF)
- % context.currentParallelism();
+ int numChannels = context.currentParallelism();
// super hack logic, if the bucket is -1, it means the split is
// for bucket unaware, like paimon unaware bucket log table,
// we use hash split id to get the split owner
// todo: refactor the split assign logic
if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
- return (split.splitId().hashCode() & 0x7FFFFFFF) % context.currentParallelism();
+ return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
}
- return (startIndex + tableBucket.getBucket()) % context.currentParallelism();
+ Long partitionId = tableBucket.getPartitionId();
+ int bucketId = tableBucket.getBucket();
+ if (ChannelComputer.shouldCombinePartitionInSharding(
+ partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
+ return ChannelComputer.select(partitionId, bucketId, numChannels);
+ }
+ return ChannelComputer.select(bucketId, numChannels);
Review Comment:
This changes split ownership for partitioned tables when numBuckets %
parallelism == 0 (it becomes bucket-only). The existing unit test
FlinkSourceEnumeratorTest#testGetSplitOwner currently asserts different owners
for different partitionIds with the same bucket (e.g., partition 1 vs 2, bucket
0) and will fail under the new behavior. Please update/add assertions to cover
both cases: (1) divisible -> same owner for same bucket across partitions, and
(2) non-divisible -> include partition in sharding to avoid skew.
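A minimal sketch of the two assertions requested above. It assumes shouldCombinePartitionInSharding returns true only for a partitioned table with numBuckets % parallelism != 0 (as this comment describes the new behavior) and that the bucket-only select(bucket, numChannels) is bucket % numChannels; the partition-aware branch copies the new select(Long, int, int) from this PR. SplitOwnerRouting and owner(...) are hypothetical stand-ins, not the enumerator's API; a real test would call getSplitOwner on enumerators configured with the matching bucket count and parallelism.

```java
/**
 * Hypothetical stand-in for the routing in getSplitOwner(). It mirrors the diff
 * instead of calling the real ChannelComputer, so both cases can be checked in isolation.
 */
final class SplitOwnerRouting {

    private SplitOwnerRouting() {}

    // Assumed rule: mix in the partition only for a partitioned table whose
    // bucket count does not divide evenly across the readers.
    static boolean shouldCombinePartitionInSharding(
            boolean isPartitioned, int numBuckets, int numChannels) {
        return isPartitioned && numBuckets % numChannels != 0;
    }

    // Same arithmetic as the new ChannelComputer.select(Long, int, int);
    // Long.hashCode(long) equals partitionId.hashCode() for a boxed Long.
    static int select(long partitionId, int bucket, int numChannels) {
        int startChannel = ((Long.hashCode(partitionId) * 31) & 0x7FFFFFFF) % numChannels;
        return (startChannel + bucket) % numChannels;
    }

    // Assumed bucket-only selection.
    static int select(int bucket, int numChannels) {
        return bucket % numChannels;
    }

    static int owner(Long partitionId, int bucket, int numBuckets, int numChannels) {
        if (shouldCombinePartitionInSharding(partitionId != null, numBuckets, numChannels)) {
            return select(partitionId.longValue(), bucket, numChannels);
        }
        return select(bucket, numChannels);
    }

    public static void main(String[] args) {
        // (1) Divisible: 4 buckets on 2 readers, bucket 0 has one owner across partitions.
        check(owner(1L, 0, 4, 2) == owner(2L, 0, 4, 2), "divisible case must ignore the partition");
        // (2) Non-divisible: 3 buckets on 2 readers, the partition offset changes the owner.
        check(owner(1L, 0, 3, 2) != owner(2L, 0, 3, 2), "non-divisible case must use the partition");
        System.out.println("both routing cases hold");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

With 4 buckets on 2 readers, bucket 0 of partitions 1 and 2 lands on the same reader; with 3 buckets on 2 readers, the partition offset sends them to readers 1 and 0 respectively.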
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
@VisibleForTesting
protected int getSplitOwner(SourceSplitBase split) {
TableBucket tableBucket = split.getTableBucket();
- int startIndex =
- tableBucket.getPartitionId() == null
- ? 0
- : ((tableBucket.getPartitionId().hashCode() * 31) & 0x7FFFFFFF)
- % context.currentParallelism();
+ int numChannels = context.currentParallelism();
// super hack logic, if the bucket is -1, it means the split is
// for bucket unaware, like paimon unaware bucket log table,
// we use hash split id to get the split owner
// todo: refactor the split assign logic
if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
- return (split.splitId().hashCode() & 0x7FFFFFFF) % context.currentParallelism();
+ return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
}
- return (startIndex + tableBucket.getBucket()) % context.currentParallelism();
+ Long partitionId = tableBucket.getPartitionId();
+ int bucketId = tableBucket.getBucket();
+ if (ChannelComputer.shouldCombinePartitionInSharding(
+ partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
+ return ChannelComputer.select(partitionId, bucketId, numChannels);
Review Comment:
shouldCombinePartitionInSharding is intended to be driven by whether the
*table* is partitioned; passing (partitionId != null) couples the decision to
per-split state and can hide bugs where a partitioned split accidentally has a
null partitionId. Consider using the existing isPartitioned flag here, and if
the table is partitioned and the combine-with-partition path is chosen, fail
fast with a clear null check before calling select(partitionId, ...).
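One way this suggestion could look, sketched with hypothetical names: the decision reads a table-level isPartitioned flag, and only the combine-with-partition branch insists on a partition id, failing with a descriptive message instead of a bare NPE. The combine rule (numBuckets % numChannels != 0) is an assumption taken from the review's description of the new behavior.

```java
import java.util.Objects;

/** Hypothetical sketch: routing driven by table metadata rather than per-split state. */
final class PartitionAwareOwner {

    private final boolean isPartitioned; // table-level flag, known before any split arrives
    private final int numBuckets;

    PartitionAwareOwner(boolean isPartitioned, int numBuckets) {
        this.isPartitioned = isPartitioned;
        this.numBuckets = numBuckets;
    }

    int owner(Long partitionId, int bucket, int numChannels) {
        // Assumed rule: combine the partition only when buckets do not divide evenly.
        if (isPartitioned && numBuckets % numChannels != 0) {
            // Fail fast: a split of a partitioned table must carry its partition id.
            long id =
                    Objects.requireNonNull(
                            partitionId,
                            () -> "split for bucket " + bucket + " of a partitioned table has no partition id");
            int startChannel = ((Long.hashCode(id) * 31) & 0x7FFFFFFF) % numChannels;
            return (startChannel + bucket) % numChannels;
        }
        return bucket % numChannels;
    }
}
```

Because the branch is chosen from table metadata, a partitioned split that lost its partition id surfaces as an immediate, labeled failure rather than being silently routed as if the table were non-partitioned.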
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -32,6 +32,7 @@
import org.apache.fluss.flink.lake.LakeSplitGenerator;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
+import org.apache.fluss.flink.sink.ChannelComputer;
Review Comment:
FlinkSourceEnumerator now depends on
org.apache.fluss.flink.sink.ChannelComputer just to reuse static sharding
helpers. This introduces a source->sink package dependency that can be
confusing and makes future refactors harder. Consider extracting the static
sharding utilities (shouldCombinePartitionInSharding/select) into a neutral
flink-common utility class/package so both source and sink can depend on it
without crossing layers.
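A sketch of what such a neutral home could look like. The class name BucketSharding is hypothetical, as is the package it would live in (for example a utility package under fluss-flink-common); the bodies follow the diff, with the combine rule and the bucket-only select assumed from the review's description. Taking a primitive long here also moves the null question to the caller.

```java
/**
 * Hypothetical neutral holder for the static sharding helpers, so that both the
 * source enumerator and the sink can depend on it without a source->sink import.
 */
final class BucketSharding {

    private BucketSharding() {} // static helpers only

    /** Assumed rule: mix in the partition only when buckets do not divide evenly. */
    static boolean shouldCombinePartitionInSharding(
            boolean isPartitioned, int numBuckets, int numChannels) {
        return isPartitioned && numBuckets % numChannels != 0;
    }

    /** Partition-aware channel: offset the bucket by a hash of the partition id. */
    static int select(long partitionId, int bucket, int numChannels) {
        int startChannel = ((Long.hashCode(partitionId) * 31) & 0x7FFFFFFF) % numChannels;
        return (startChannel + bucket) % numChannels;
    }

    /** Bucket-only channel, assumed to be a plain modulo. */
    static int select(int bucket, int numChannels) {
        return bucket % numChannels;
    }
}
```

Under this layout ChannelComputer and FlinkSourceEnumerator would both import the helper class, so neither package would need to know about the other.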
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
@VisibleForTesting
protected int getSplitOwner(SourceSplitBase split) {
TableBucket tableBucket = split.getTableBucket();
Review Comment:
The Javadoc for getSplitOwner() (immediately above this method) still states
that, for partitioned tables, bucket assignment is always offset/round-robin
based on partition id. With the new conditional routing (bucket-only when
numBuckets % parallelism == 0), that contract is no longer accurate—please
update the Javadoc to reflect when partition is (and isn’t) part of the channel
computation.
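A possible wording for the updated Javadoc, attached to a simplified, hypothetical signature so the snippet compiles on its own (the real method takes a SourceSplitBase). The bucket-only condition is assumed from the review's description (numBuckets % parallelism == 0), and the body mirrors the diff as written.

```java
/** Hypothetical holder class; the Javadoc wording is the point of this sketch. */
final class SplitOwnerDoc {

    private SplitOwnerDoc() {}

    /**
     * Returns the index of the reader that owns a split.
     *
     * <ul>
     *   <li>Lake splits of bucket-unaware tables (bucket {@code -1}) are routed by a hash of
     *       the split id.
     *   <li>For a partitioned table whose bucket count is not a multiple of the parallelism,
     *       the bucket is offset by a hash of the partition id, so equal buckets of different
     *       partitions spread across readers.
     *   <li>Otherwise (non-partitioned table, or {@code numBuckets % parallelism == 0}) only
     *       the bucket id is used, so a given bucket lands on the same reader in every partition.
     * </ul>
     */
    static int getSplitOwner(
            boolean isLakeSplit,
            String splitId,
            Long partitionId,
            int bucket,
            int numBuckets,
            int parallelism) {
        if (isLakeSplit && bucket == -1) {
            return (splitId.hashCode() & 0x7FFFFFFF) % parallelism;
        }
        if (partitionId != null && numBuckets % parallelism != 0) {
            int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
            return (startChannel + bucket) % parallelism;
        }
        return bucket % parallelism;
    }
}
```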
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/ChannelComputer.java:
##########
@@ -59,6 +59,11 @@ static int select(String partitionName, int bucket, int numChannels) {
return (startChannel + bucket) % numChannels;
}
+ static int select(Long partitionId, int bucket, int numChannels) {
+ int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % numChannels;
+ return (startChannel + bucket) % numChannels;
+ }
Review Comment:
select(Long partitionId, ...) accepts a boxed Long but immediately
dereferences it, so passing null throws a NullPointerException. Since null isn’t supported, consider
either switching the parameter type to primitive long (and unbox at the call
site with an explicit null check) or adding an explicit non-null check here to
fail with a clearer message.
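A sketch of the second option: keep the boxed parameter but reject null at the top of the method, with a message that points callers at the bucket-only overload. ChannelSelectSketch is a hypothetical holder class; java.util.Objects.requireNonNull is used here, though any project precondition helper would serve the same purpose.

```java
import java.util.Objects;

/** Hypothetical holder for the partition-aware select() with an explicit null check. */
final class ChannelSelectSketch {

    private ChannelSelectSketch() {}

    static int select(Long partitionId, int bucket, int numChannels) {
        // Reject null up front instead of letting partitionId.hashCode() throw a bare NPE.
        Objects.requireNonNull(
                partitionId,
                "partitionId must not be null; use select(bucket, numChannels) for non-partitioned routing");
        int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % numChannels;
        return (startChannel + bucket) % numChannels;
    }
}
```

Switching to a primitive long instead would make the null check the caller's job, which is where the unboxing (and the clearer error) would then happen.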
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
@VisibleForTesting
protected int getSplitOwner(SourceSplitBase split) {
TableBucket tableBucket = split.getTableBucket();
- int startIndex =
- tableBucket.getPartitionId() == null
- ? 0
- : ((tableBucket.getPartitionId().hashCode() * 31) & 0x7FFFFFFF)
- % context.currentParallelism();
+ int numChannels = context.currentParallelism();
// super hack logic, if the bucket is -1, it means the split is
// for bucket unaware, like paimon unaware bucket log table,
// we use hash split id to get the split owner
// todo: refactor the split assign logic
if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
- return (split.splitId().hashCode() & 0x7FFFFFFF) % context.currentParallelism();
+ return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
}
- return (startIndex + tableBucket.getBucket()) % context.currentParallelism();
+ Long partitionId = tableBucket.getPartitionId();
+ int bucketId = tableBucket.getBucket();
+ if (ChannelComputer.shouldCombinePartitionInSharding(
+ partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
+ return ChannelComputer.select(partitionId, bucketId, numChannels);
+ }
+ return ChannelComputer.select(bucketId, numChannels);
Review Comment:
getSplitOwner() now dereferences tableInfo.getNumBuckets(), but tableInfo is
initialized lazily in start(). Calling getSplitOwner before start() (as
FlinkSourceEnumeratorTest#testGetSplitOwner currently does) will throw an NPE
with an unclear message. Please add an explicit precondition (e.g.,
checkState/tableInfo non-null) or restructure so numBuckets is available
without relying on start(), to avoid surprising NPEs in tests and future call
sites.
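A minimal sketch of the explicit precondition, with hypothetical names: a stand-in for the lazily set tableInfo and an accessor that throws IllegalStateException with a descriptive message when it is read before start(). Flink's org.apache.flink.util.Preconditions.checkState would express the same guard; the other route the comment mentions is passing the bucket count in through the constructor so routing never depends on start().

```java
/** Hypothetical sketch: guard lazily initialized state before routing splits. */
final class LazyTableInfoGuard {

    // Stand-in for the TableInfo that the enumerator only fills in during start().
    static final class TableInfoStub {
        final int numBuckets;

        TableInfoStub(int numBuckets) {
            this.numBuckets = numBuckets;
        }
    }

    private TableInfoStub tableInfo; // null until start() runs

    void start(int numBuckets) {
        this.tableInfo = new TableInfoStub(numBuckets);
    }

    int numBucketsForRouting() {
        // Explicit precondition instead of an NPE deep inside getSplitOwner().
        if (tableInfo == null) {
            throw new IllegalStateException(
                    "tableInfo is not initialized; call start() before getSplitOwner()");
        }
        return tableInfo.numBuckets;
    }
}
```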
--
This is an automated message from the Apache Git Service.