toimcio commented on issue #21318:
URL: https://github.com/apache/beam/issues/21318#issuecomment-3727922356

   Kind of beating a dead horse here, but this seems to be the best place to
track it. We have run into this issue too and done some more investigation:
   
   Additional findings: FlinkPipelineOptions are not populated from the proto
in the portable runner
   
   We investigated this issue further and can confirm that Flink-specific 
pipeline options (like checkpointing_interval and savepoint_path) are NOT being 
read by the Java-side FlinkPipelineOptions when using the portable Python SDK.
   
     Key findings:
   
     1. The Python side works correctly: options are serialized to the proto
with the expected key format beam:option:{option_name}:v1. We verified this by
inspecting options.to_runner_api(None), which contains the options as expected.
     2. The Java side already handles them:
FlinkExecutionEnvironments.createStreamExecutionEnvironment() checks 
options.getSavepointPath() (line 159) and options.getCheckpointingInterval() 
and would use them if they were populated.
     3. The translation layer is broken: PipelineOptionsTranslation.fromProto() 
should convert the option names from LOWER_UNDERSCORE → LOWER_CAMEL (e.g.
savepoint_path → savepointPath) and populate the dynamic proxy that implements
FlinkPipelineOptions. However, these getters return null/default at runtime
even when the option is present in the proto.
     4. The "Discarding invalid overrides" warning is misleading: this warning 
appears when setting the options, but the options ARE correctly serialized to 
the proto. The bug is on the Java side, where fromProto() doesn't properly
translate them into FlinkPipelineOptions.
     5. Re: checkpointing_interval working: the earlier comment suggesting that
checkpointing_interval works may be referring to Flink's cluster-level 
execution.checkpointing.interval setting (in flink-conf.yaml or the
FlinkDeployment spec), not to the Beam pipeline option --checkpointing_interval.
These are different mechanisms.
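     A rough Python sketch of the name translation that point 3 expects from
fromProto(): strip the beam:option: prefix and the :v1 suffix, then convert
LOWER_UNDERSCORE to LOWER_CAMEL (the names of Guava's CaseFormat constants).
This is an illustrative re-implementation with hypothetical helper names, not
the Java code itself. Skipping keys that don't match the pattern is a choice
made for this sketch only.

```python
PREFIX = "beam:option:"
SUFFIX = ":v1"


def lower_underscore_to_lower_camel(name: str) -> str:
    """Convert e.g. 'savepoint_path' to 'savepointPath'."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def decode_option_keys(encoded: dict) -> dict:
    """Hypothetical helper: URN-style keys -> camelCase property names.

    Keys without the beam:option:...:v1 shape are skipped (a sketch-only
    choice, not necessarily what the Java translation does).
    """
    decoded = {}
    for key, value in encoded.items():
        if key.startswith(PREFIX) and key.endswith(SUFFIX):
            bare = key[len(PREFIX):len(key) - len(SUFFIX)]
            decoded[lower_underscore_to_lower_camel(bare)] = value
    return decoded


decoded = decode_option_keys({
    "beam:option:savepoint_path:v1": "/path/to/savepoint",
    "beam:option:checkpointing_interval:v1": 60000,
})
print(decoded)
# {'savepointPath': '/path/to/savepoint', 'checkpointingInterval': 60000}
```

     If the real translation behaves like this, savepoint_path lands on
savepointPath, the property behind getSavepointPath(). The names line up with
the getters, which makes the null seen at runtime all the more surprising.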
   
     Reproduction:
     from apache_beam.options.pipeline_options import PipelineOptions
   
     options = PipelineOptions(['--runner=FlinkRunner',
                                '--savepoint_path=/path/to/savepoint'])
     # Proto serialization works - contains beam:option:savepoint_path:v1
     # But Java FlinkPipelineOptions.getSavepointPath() returns null
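     To check the key format from point 1 without a Beam installation, here is
a minimal sketch that mimics the beam:option:{option_name}:v1 naming on a plain
dict. encode_option_keys is a hypothetical helper, not an SDK function. The real
pipeline options travel as a protobuf Struct, and skipping None values here is
an assumption of the sketch.

```python
from typing import Any, Dict


def encode_option_keys(options: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: map snake_case option names to URN-style keys.

    Mimics the beam:option:{option_name}:v1 format. Dropping unset (None)
    values is an assumption of this sketch, not a statement about the SDK.
    """
    return {
        f"beam:option:{name}:v1": value
        for name, value in options.items()
        if value is not None
    }


encoded = encode_option_keys({
    "runner": "FlinkRunner",
    "savepoint_path": "/path/to/savepoint",
    "checkpointing_interval": 60000,
    "parallelism": None,
})

# The Flink-specific options appear under their URN-style keys.
print(sorted(encoded))
# ['beam:option:checkpointing_interval:v1', 'beam:option:runner:v1', 'beam:option:savepoint_path:v1']
```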
   
     This affects all Flink-specific options passed from the portable Python
SDK, not just checkpointing_interval.
   


