Copilot commented on code in PR #3272:
URL: https://github.com/apache/fluss/pull/3272#discussion_r3207333674


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> 
pendingReaders) {
     @VisibleForTesting
     protected int getSplitOwner(SourceSplitBase split) {
         TableBucket tableBucket = split.getTableBucket();
-        int startIndex =
-                tableBucket.getPartitionId() == null
-                        ? 0
-                        : ((tableBucket.getPartitionId().hashCode() * 31) & 
0x7FFFFFFF)
-                                % context.currentParallelism();
+        int numChannels = context.currentParallelism();
 
         // super hack logic, if the bucket is -1, it means the split is
         // for bucket unaware, like paimon unaware bucket log table,
         // we use hash split id to get the split owner
         // todo: refactor the split assign logic
         if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
-            return (split.splitId().hashCode() & 0x7FFFFFFF) % 
context.currentParallelism();
+            return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
         }
 
-        return (startIndex + tableBucket.getBucket()) % 
context.currentParallelism();
+        Long partitionId = tableBucket.getPartitionId();
+        int bucketId = tableBucket.getBucket();
+        if (ChannelComputer.shouldCombinePartitionInSharding(
+                partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
+            return ChannelComputer.select(partitionId, bucketId, numChannels);
+        }
+        return ChannelComputer.select(bucketId, numChannels);

Review Comment:
   This changes split ownership for partitioned tables when numBuckets % 
parallelism == 0 (it becomes bucket-only). The existing unit test 
FlinkSourceEnumeratorTest#testGetSplitOwner currently asserts different owners 
for different partitionIds with the same bucket (e.g., partition 1 vs 2, bucket 
0) and will fail under the new behavior. Please update/add assertions to cover 
both cases: (1) divisible -> same owner for same bucket across partitions, and 
(2) non-divisible -> include partition in sharding to avoid skew.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> 
pendingReaders) {
     @VisibleForTesting
     protected int getSplitOwner(SourceSplitBase split) {
         TableBucket tableBucket = split.getTableBucket();
-        int startIndex =
-                tableBucket.getPartitionId() == null
-                        ? 0
-                        : ((tableBucket.getPartitionId().hashCode() * 31) & 
0x7FFFFFFF)
-                                % context.currentParallelism();
+        int numChannels = context.currentParallelism();
 
         // super hack logic, if the bucket is -1, it means the split is
         // for bucket unaware, like paimon unaware bucket log table,
         // we use hash split id to get the split owner
         // todo: refactor the split assign logic
         if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
-            return (split.splitId().hashCode() & 0x7FFFFFFF) % 
context.currentParallelism();
+            return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
         }
 
-        return (startIndex + tableBucket.getBucket()) % 
context.currentParallelism();
+        Long partitionId = tableBucket.getPartitionId();
+        int bucketId = tableBucket.getBucket();
+        if (ChannelComputer.shouldCombinePartitionInSharding(
+                partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
+            return ChannelComputer.select(partitionId, bucketId, numChannels);

Review Comment:
   shouldCombinePartitionInSharding is intended to be driven by whether the 
*table* is partitioned; passing (partitionId != null) couples the decision to 
per-split state and can hide bugs where a partitioned split accidentally has a 
null partitionId. Consider using the existing isPartitioned flag here, and if 
the table is partitioned and the combine-with-partition path is chosen, fail 
fast with a clear null check before calling select(partitionId, ...).
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -32,6 +32,7 @@
 import org.apache.fluss.flink.lake.LakeSplitGenerator;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
+import org.apache.fluss.flink.sink.ChannelComputer;

Review Comment:
   FlinkSourceEnumerator now depends on 
org.apache.fluss.flink.sink.ChannelComputer just to reuse static sharding 
helpers. This introduces a source->sink package dependency that can be 
confusing and makes future refactors harder. Consider extracting the static 
sharding utilities (shouldCombinePartitionInSharding/select) into a neutral 
flink-common utility class/package so both source and sink can depend on it 
without crossing layers.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> 
pendingReaders) {
     @VisibleForTesting
     protected int getSplitOwner(SourceSplitBase split) {
         TableBucket tableBucket = split.getTableBucket();

Review Comment:
   The Javadoc for getSplitOwner() (immediately above this method) still states 
that, for partitioned tables, bucket assignment is always offset/round-robin 
based on partition id. With the new conditional routing (bucket-only when 
numBuckets % parallelism == 0), that contract is no longer accurate—please 
update the Javadoc to reflect when partition is (and isn’t) part of the channel 
computation.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/ChannelComputer.java:
##########
@@ -59,6 +59,11 @@ static int select(String partitionName, int bucket, int 
numChannels) {
         return (startChannel + bucket) % numChannels;
     }
 
+    static int select(Long partitionId, int bucket, int numChannels) {
+        int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % 
numChannels;
+        return (startChannel + bucket) % numChannels;
+    }

Review Comment:
   select(Long partitionId, ...) accepts a boxed Long but immediately 
dereferences it; passing null will NPE. Since null isn’t supported, consider 
either switching the parameter type to primitive long (and unbox at the call 
site with an explicit null check) or adding an explicit non-null check here to 
fail with a clearer message.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -942,21 +943,23 @@ private void assignPendingSplits(Set<Integer> 
pendingReaders) {
     @VisibleForTesting
     protected int getSplitOwner(SourceSplitBase split) {
         TableBucket tableBucket = split.getTableBucket();
-        int startIndex =
-                tableBucket.getPartitionId() == null
-                        ? 0
-                        : ((tableBucket.getPartitionId().hashCode() * 31) & 
0x7FFFFFFF)
-                                % context.currentParallelism();
+        int numChannels = context.currentParallelism();
 
         // super hack logic, if the bucket is -1, it means the split is
         // for bucket unaware, like paimon unaware bucket log table,
         // we use hash split id to get the split owner
         // todo: refactor the split assign logic
         if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
-            return (split.splitId().hashCode() & 0x7FFFFFFF) % 
context.currentParallelism();
+            return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
         }
 
-        return (startIndex + tableBucket.getBucket()) % 
context.currentParallelism();
+        Long partitionId = tableBucket.getPartitionId();
+        int bucketId = tableBucket.getBucket();
+        if (ChannelComputer.shouldCombinePartitionInSharding(
+                partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
+            return ChannelComputer.select(partitionId, bucketId, numChannels);
+        }
+        return ChannelComputer.select(bucketId, numChannels);

Review Comment:
   getSplitOwner() now dereferences tableInfo.getNumBuckets(), but tableInfo is 
initialized lazily in start(). Calling getSplitOwner before start() (as 
FlinkSourceEnumeratorTest#testGetSplitOwner currently does) will throw an NPE 
with an unclear message. Please add an explicit precondition (e.g., 
checkState/tableInfo non-null) or restructure so numBuckets is available 
without relying on start(), to avoid surprising NPEs in tests and future call 
sites.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to