Copilot commented on code in PR #3288:
URL: https://github.com/apache/fluss/pull/3288#discussion_r3212778711


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java:
##########
@@ -133,6 +134,20 @@ public FlussSourceBuilder<OUT> 
setScanPartitionDiscoveryIntervalMs(
         return this;
     }
 
+    /**
+     * Sets the maximum number of splits assigned to a reader in one 
assignment request.
+     *
+     * <p>If not specified, the default value from {@link
+     * FlinkConnectorOptions#SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE} is used.
+     *
+     * @param splitPerAssignmentBatchSize maximum splits per assignment request
+     * @return this builder
+     */
+    public FlussSourceBuilder<OUT> setSplitPerAssignmentBatchSize(int 
splitPerAssignmentBatchSize) {
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
+        return this;
+    }

Review Comment:
   `splitPerAssignmentBatchSize` is accepted without validation in the builder. 
If a user sets 0/negative here, the failure will only surface later when the 
enumerator is constructed. Validate this parameter eagerly (e.g., require > 0) 
to provide a clearer, earlier error to DataStream API users.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -150,6 +150,15 @@ public class FlinkConnectorOptions {
                                     + "as a small value would cause frequent 
requests and increase server load. In the future, "
                                     + "once list partitions is optimized, the 
default value of this parameter can be reduced.");
 
+    public static final ConfigOption<Integer> SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE 
=
+            ConfigOptions.key("scan.split.assignment.batch-size")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum number of Fluss source splits 
assigned to a reader in "
+                                    + "one assignment request. The value must 
be positive. By default, "
+                                    + "all pending splits for a reader are 
assigned in one request.");

Review Comment:
   The option default is `Integer.MAX_VALUE`, which preserves existing behavior 
but doesn’t actually prevent oversized RPC payloads unless users explicitly 
configure this value. If the intent is to close #3287 by default, consider 
using a safer default (or documenting prominently that users must set this 
option to avoid frame-size issues).



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -146,6 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
                 tableOptions
                         
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
+        int splitAssignmentBatchSize =
+                
tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);
 

Review Comment:
   The new table option `scan.split.assignment.batch-size` is read and passed 
through without any factory-time validation. Since the enumerator enforces `> 
0` at runtime, consider validating here (or in 
`FlinkConnectorOptionsUtils.validateTableSourceOptions`) so misconfiguration 
fails during planning with a `ValidationException`, not after job submission.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -927,7 +1043,19 @@ private void assignPendingSplits(Set<Integer> 
pendingReaders) {
         // Assign pending splits to readers
         if (!incrementalAssignment.isEmpty()) {
             LOG.info("Assigning splits to readers {}", incrementalAssignment);

Review Comment:
   `incrementalAssignment` can contain very large split lists (the exact 
scenario this PR targets). Logging the full map (including all split objects) 
at INFO can generate huge log entries and add significant overhead. Prefer 
logging only counts (e.g., readers involved, total splits, batches per reader) 
or downgrade the detailed assignment to DEBUG.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to