hackergin commented on PR #7914:
URL: https://github.com/apache/paimon/pull/7914#issuecomment-4504498933
@ArnavBalyan Thanks for picking this up! The direction is right, but I
think there are a few gaps worth addressing before merging:
1. Only handles explicitly configured scan.parallelism, misses the inferred path
The PR only propagates options.get(SCAN_PARALLELISM). However, Paimon's
parallelism resolution has two sources (see
FlinkTableSource#inferSourceParallelism):
(1) User-configured scan.parallelism (explicit)
(2) infer-scan.parallelism = true (enabled by default), which infers the
value from bucket count / split count
This PR only covers case (1). For case (2), which is the default behavior
most users hit, the inferred value is still applied via
DataStream#setParallelism() inside the producer lambda, so
ParallelismProvider#getParallelism() returns Optional.empty() and the
planner-side isParallelismConfigured=true flag is never set. The
forward-edge propagation described in #7905 will still happen for any user
who relies on inferred parallelism.
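
To make the two-source resolution concrete, here is a minimal, self-contained sketch of what a provider-side resolution covering both cases could look like. It is an illustration only: ParallelismResolutionSketch, resolve, and the IntSupplier parameter are hypothetical names, not Paimon or Flink APIs, and the rule that a non-positive inferred value means "nothing to report" is an assumption.

```java
import java.util.Optional;
import java.util.function.IntSupplier;

/** Hypothetical helper for illustration; not part of Paimon or Flink. */
final class ParallelismResolutionSketch {

    /**
     * Resolves the value a provider's getParallelism() could report.
     *
     * @param configured explicit scan.parallelism, or null when the option is unset
     * @param inferEnabled value of infer-scan.parallelism
     * @param inferred lazily computes the inferred value (e.g. from bucket or split count)
     */
    public static Optional<Integer> resolve(
            Integer configured, boolean inferEnabled, IntSupplier inferred) {
        // Case (1): an explicit setting always wins.
        if (configured != null) {
            return Optional.of(configured);
        }
        // Case (2): fall back to inference when it is enabled.
        if (inferEnabled) {
            int p = inferred.getAsInt();
            // Assumption: a non-positive result means inference produced nothing usable.
            return p > 0 ? Optional.of(p) : Optional.empty();
        }
        // Neither source applies; the caller keeps its own default.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(4, true, () -> 16));      // Optional[4]
        System.out.println(resolve(null, true, () -> 16));   // Optional[16]
        System.out.println(resolve(null, false, () -> 16));  // Optional.empty
    }
}
```

The supplier keeps inference lazy, so it only runs when no explicit value is set. Since inferSourceParallelism takes an env argument in the current code, whether the inferred value can actually be computed at the point where getParallelism() is called is an open question this sketch does not answer.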
2. Parallelism is now set twice — provider + producer
After this PR, parallelism is applied in two places:
- The planner reads ParallelismProvider#getParallelism() and calls
setParallelism(p, true) on the Source Transformation
- The producer lambda still calls
sourceBuilder.sourceParallelism(inferSourceParallelism(env)) /
dataStreamSource.setParallelism(parallelism) internally
The DataStream#setParallelism(int) overload sets
isParallelismConfigured=false, while the planner path sets it to true.
Depending on execution order, the planner's "configured" flag may be
overridden back to false, which would silently re-introduce the
forward-edge propagation this PR is trying to fix.
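
The ordering concern can be shown with a toy model. The sketch below does not use Flink classes: ToyTransformation is a hypothetical stand-in, and the flag written by each path is taken as described in this point rather than verified against Flink. It only shows that with two writers, whichever runs last decides the flag.

```java
/** Toy model only; none of these types are Flink or Paimon classes. */
final class ConfiguredFlagSketch {

    /** Hypothetical stand-in for a source transformation. */
    static final class ToyTransformation {
        private int parallelism = -1;
        private boolean parallelismConfigured = false;

        public void setParallelism(int parallelism, boolean configured) {
            this.parallelism = parallelism;
            this.parallelismConfigured = configured;
        }

        public int getParallelism() {
            return parallelism;
        }

        public boolean isParallelismConfigured() {
            return parallelismConfigured;
        }
    }

    /** Applies both writes; plannerLast selects which one runs second. */
    public static ToyTransformation apply(boolean plannerLast) {
        ToyTransformation t = new ToyTransformation();
        if (plannerLast) {
            t.setParallelism(8, false); // producer-side write (flag value assumed from point 2)
            t.setParallelism(8, true);  // planner-side write
        } else {
            t.setParallelism(8, true);  // planner-side write
            t.setParallelism(8, false); // producer-side write overwrites the flag
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(apply(true).isParallelismConfigured());  // true
        System.out.println(apply(false).isParallelismConfigured()); // false
    }
}
```

In both orders the parallelism value ends up the same and only the flag differs, which is why a regression here would be silent. Having a single writer avoids the ordering question entirely.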
3. Test placement and coverage
- testScanProviderGetParallelism is placed in LineageUtilsTest, but it has
  nothing to do with lineage.
- The test only asserts Optional.ofNullable behavior on the field. It
  doesn't actually verify the bug being fixed: there's no assertion that
  downstream operators no longer inherit the source parallelism, and no
  assertion on Source Transformation#isParallelismConfigured(). An
  end-to-end SQL test would be much more convincing, and the inferred path
  from (1) should also be covered.
4. Flink version note
ParallelismProvider#getParallelism() only fully solves the forward-edge
issue on Flink 1.19+ (FLIP-367), where the planner explicitly calls
setParallelism(p, true) on the Source Transformation. On older Flink
versions the behavior is unchanged. Worth mentioning this in the PR
description / a Javadoc note so users know what to expect.