Copilot commented on code in PR #3288:
URL: https://github.com/apache/fluss/pull/3288#discussion_r3212778711
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java:
##########
@@ -133,6 +134,20 @@ public FlussSourceBuilder<OUT> setScanPartitionDiscoveryIntervalMs(
         return this;
     }
+    /**
+     * Sets the maximum number of splits assigned to a reader in one assignment request.
+     *
+     * <p>If not specified, the default value from {@link
+     * FlinkConnectorOptions#SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE} is used.
+     *
+     * @param splitPerAssignmentBatchSize maximum splits per assignment request
+     * @return this builder
+     */
+    public FlussSourceBuilder<OUT> setSplitPerAssignmentBatchSize(int splitPerAssignmentBatchSize) {
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
+        return this;
+    }
Review Comment:
`splitPerAssignmentBatchSize` is accepted without validation in the builder.
If a user sets 0/negative here, the failure will only surface later when the
enumerator is constructed. Validate this parameter eagerly (e.g., require > 0)
to provide a clearer, earlier error to DataStream API users.
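
A minimal sketch of the eager check, assuming a plain `IllegalArgumentException` is acceptable here. `SplitBatchSizeBuilderSketch` and its getter are hypothetical stand-ins so the snippet compiles on its own; the real setter lives in `FlussSourceBuilder`, and the project's own precondition helper may be a better fit than the hand-written check.

```java
// Hypothetical stand-in for FlussSourceBuilder; only the new setter is sketched.
final class SplitBatchSizeBuilderSketch {
    // Mirrors the option default from the PR (Integer.MAX_VALUE).
    private int splitPerAssignmentBatchSize = Integer.MAX_VALUE;

    SplitBatchSizeBuilderSketch setSplitPerAssignmentBatchSize(int splitPerAssignmentBatchSize) {
        // Fail at builder time rather than when the enumerator is constructed.
        if (splitPerAssignmentBatchSize <= 0) {
            throw new IllegalArgumentException(
                    "splitPerAssignmentBatchSize must be greater than 0, but was "
                            + splitPerAssignmentBatchSize);
        }
        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
        return this;
    }

    // Hypothetical accessor, present only so the sketch can be exercised.
    int getSplitPerAssignmentBatchSize() {
        return splitPerAssignmentBatchSize;
    }
}
```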
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -150,6 +150,15 @@ public class FlinkConnectorOptions {
                                     + "as a small value would cause frequent requests and increase server load. In the future, "
                                     + "once list partitions is optimized, the default value of this parameter can be reduced.");
+    public static final ConfigOption<Integer> SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE =
+            ConfigOptions.key("scan.split.assignment.batch-size")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum number of Fluss source splits assigned to a reader in "
+                                    + "one assignment request. The value must be positive. By default, "
+                                    + "all pending splits for a reader are assigned in one request.");
Review Comment:
The option default is `Integer.MAX_VALUE`, which preserves existing behavior
but doesn’t actually prevent oversized RPC payloads unless users explicitly
configure this value. If the intent is to close #3287 by default, consider
using a safer default (or documenting prominently that users must set this
option to avoid frame-size issues).
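
To illustrate why the default matters, here is a rough sketch of batching in plain Java. `SplitBatchingSketch.toBatches` is a hypothetical helper, not the enumerator's actual code: with `Integer.MAX_VALUE` every pending split for a reader still lands in a single batch, so only a finite value bounds the size of one assignment request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the real enumerator may batch differently.
final class SplitBatchingSketch {
    static <T> List<List<T>> toBatches(List<T> pendingSplits, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be greater than 0, but was " + batchSize);
        }
        List<List<T>> batches = new ArrayList<>();
        int start = 0;
        while (start < pendingSplits.size()) {
            // Compare against the remaining count so Integer.MAX_VALUE cannot overflow.
            int length = Math.min(batchSize, pendingSplits.size() - start);
            batches.add(new ArrayList<>(pendingSplits.subList(start, start + length)));
            start += length;
        }
        return batches;
    }
}
```

With five pending splits, a batch size of 2 yields three batches, while the current default yields one batch containing all five.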
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -146,6 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
+        int splitAssignmentBatchSize =
+                tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);
Review Comment:
The new table option `scan.split.assignment.batch-size` is read and passed
through without any factory-time validation. Since the enumerator enforces
`> 0` at runtime, consider validating here (or in
`FlinkConnectorOptionsUtils.validateTableSourceOptions`) so misconfiguration
fails during planning with a `ValidationException`, not after job submission.
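
One possible shape for the planning-time check, sketched without Flink on the classpath. The nested `ValidationException` is a local stand-in for Flink's `org.apache.flink.table.api.ValidationException`, and `validateSplitAssignmentBatchSize` is a hypothetical method name; where the check is wired in is left open.

```java
// Hypothetical sketch; in the connector this would throw Flink's ValidationException.
final class BatchSizeOptionValidationSketch {
    // Local stand-in so the snippet compiles without Flink dependencies.
    static final class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    static final String OPTION_KEY = "scan.split.assignment.batch-size";

    // Returns the configured value unchanged, or fails with a message naming the option key.
    static int validateSplitAssignmentBatchSize(int configured) {
        if (configured <= 0) {
            throw new ValidationException(
                    "'" + OPTION_KEY + "' must be greater than 0, but was " + configured + ".");
        }
        return configured;
    }
}
```

Naming the option key in the message lets SQL users map the failure back to the table's `WITH` clause.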
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -927,7 +1043,19 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
         // Assign pending splits to readers
         if (!incrementalAssignment.isEmpty()) {
             LOG.info("Assigning splits to readers {}", incrementalAssignment);
Review Comment:
`incrementalAssignment` can contain very large split lists (the exact
scenario this PR targets). Logging the full map (including all split objects)
at INFO can generate huge log entries and add significant overhead. Prefer
logging only counts (e.g., readers involved, total splits, batches per reader)
or downgrade the detailed assignment to DEBUG.
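
A small sketch of a counts-only summary, assuming `incrementalAssignment` maps reader IDs to lists of splits (the element type is left generic here). `AssignmentLogSummarySketch.summarize` is a hypothetical helper, not existing enumerator code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper that reduces an assignment map to counts for INFO-level logging.
final class AssignmentLogSummarySketch {
    static String summarize(Map<Integer, ? extends List<?>> incrementalAssignment) {
        int totalSplits = 0;
        for (List<?> splits : incrementalAssignment.values()) {
            totalSplits += splits.size();
        }
        return "readers=" + incrementalAssignment.size() + ", totalSplits=" + totalSplits;
    }
}
```

The summary could then go to `LOG.info`, with the full `incrementalAssignment` map logged only at `LOG.debug`.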