hackergin commented on PR #7914:
URL: https://github.com/apache/paimon/pull/7914#issuecomment-4504498933

@ArnavBalyan Thanks for picking this up! The direction is right, but I think there are a few gaps worth addressing before merging:

1. Only handles the explicitly configured scan.parallelism and misses the inferred path

The PR only propagates options.get(SCAN_PARALLELISM). However, Paimon's parallelism resolution has two sources (see FlinkTableSource#inferSourceParallelism):

   1. User-configured scan.parallelism (explicit)
   2. scan.infer-parallelism = true (enabled by default), which infers the parallelism from the bucket count / split count

This PR only covers case (1). For case (2), which is the default behavior most users hit, the inferred value is still applied via DataStream#setParallelism() inside the producer lambda. As a result, ParallelismProvider#getParallelism() returns Optional.empty() and the planner-side isParallelismConfigured=true flag is never set, so the forward-edge propagation described in #7905 will still happen for any user who relies on inferred parallelism.
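To make the gap concrete, here is a minimal, self-contained sketch of a resolution step that covers both sources, so that a getParallelism()-style method would return a value in the inferred case too. The class ParallelismResolution, the method resolve, and its parameters (including inferredFrom, standing in for the bucket or split count, and the maxInferred cap) are hypothetical illustrations, not Paimon's actual code; whether Paimon caps inferred parallelism exactly this way is an assumption.

```java
import java.util.Optional;

/**
 * Hypothetical sketch (not Paimon's actual code): resolve the source parallelism
 * once, covering both the explicit and the inferred path.
 */
final class ParallelismResolution {

    private ParallelismResolution() {}

    /**
     * @param configured   explicit scan.parallelism, or null if unset (case 1)
     * @param inferEnabled whether parallelism inference is enabled (case 2)
     * @param inferredFrom bucket or split count used for inference (assumed input)
     * @param maxInferred  assumed upper bound on the inferred parallelism
     */
    static Optional<Integer> resolve(
            Integer configured, boolean inferEnabled, int inferredFrom, int maxInferred) {
        if (configured != null) {
            // Case (1): an explicit value always wins.
            return Optional.of(configured);
        }
        if (inferEnabled && inferredFrom > 0) {
            // Case (2): infer from buckets/splits, capped, and never below 1.
            return Optional.of(Math.max(1, Math.min(inferredFrom, maxInferred)));
        }
        // Neither source applies: leave it to the planner's default parallelism.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(8, true, 16, 1024));     // explicit path
        System.out.println(resolve(null, true, 16, 1024));  // inferred path
        System.out.println(resolve(null, false, 16, 1024)); // neither
    }
}
```

The point of the sketch is that the inferred branch produces a concrete value up front, which is what the planner-side flag needs; in the PR as written, only the first branch reaches the provider.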
     
2. Parallelism is now set twice (provider + producer)

After this PR, parallelism is applied in two places:
- The planner reads ParallelismProvider#getParallelism() and calls setParallelism(p, true) on the Source Transformation.
- The producer lambda still calls sourceBuilder.sourceParallelism(inferSourceParallelism(env)) / dataStreamSource.setParallelism(parallelism) internally.

The DataStream#setParallelism(int) overload sets isParallelismConfigured=false, while the planner path sets it to true. Depending on execution order, the planner's "configured" flag may be overridden back to false, which would silently reintroduce the forward-edge propagation this PR is trying to fix.
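The ordering hazard can be modeled with a tiny stand-in class. ToyTransformation below is hypothetical and is not Flink's Transformation; it simply encodes the behavior described above (the one-argument setter leaves the flag false, the two-argument planner call sets it to true) to show that whichever write runs last determines the final flag.

```java
/**
 * Hypothetical stand-in for a source transformation. It only encodes the
 * behavior described in this review, not Flink's actual implementation.
 */
final class ToyTransformation {
    private int parallelism = -1;
    private boolean parallelismConfigured = false;

    // Models the planner path: setParallelism(p, true).
    void setParallelism(int parallelism, boolean configured) {
        this.parallelism = parallelism;
        this.parallelismConfigured = configured;
    }

    // Models the one-argument overload as described above: the flag ends up false.
    void setParallelism(int parallelism) {
        setParallelism(parallelism, false);
    }

    boolean isParallelismConfigured() {
        return parallelismConfigured;
    }

    /** Applies both writes in the given order and returns the resulting flag. */
    static boolean finalFlag(boolean plannerRunsLast) {
        ToyTransformation t = new ToyTransformation();
        if (plannerRunsLast) {
            t.setParallelism(4);       // producer lambda
            t.setParallelism(4, true); // planner
        } else {
            t.setParallelism(4, true); // planner
            t.setParallelism(4);       // producer lambda silently clears the flag
        }
        return t.isParallelismConfigured();
    }

    public static void main(String[] args) {
        System.out.println(finalFlag(true));  // true
        System.out.println(finalFlag(false)); // false
    }
}
```

In this model the outcome depends only on which write happens last, which is exactly the ordering dependence that makes setting parallelism in both places fragile.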
   
3. Test placement and coverage

- testScanProviderGetParallelism is placed in LineageUtilsTest, but it has nothing to do with lineage.
- The test only asserts Optional.ofNullable behavior on the field. It doesn't actually verify the bug being fixed: there's no assertion that downstream operators no longer inherit the source parallelism, and no assertion on the source Transformation#isParallelismConfigured(). An end-to-end SQL test would be much more convincing, and the inferred path from (1) should also be covered.

4. Flink version note

ParallelismProvider#getParallelism() only fully solves the forward-edge issue starting with Flink 1.19 (FLIP-367), where the planner explicitly calls setParallelism(p, true) on the Source Transformation. On older Flink versions the behavior is unchanged. It is worth mentioning this in the PR description or in a Javadoc note so users know what to expect.


-- 
This is an automated message from the Apache Git Service.