toimcio commented on issue #21318:
URL: https://github.com/apache/beam/issues/21318#issuecomment-3727922356
Kind of beating a dead horse here, but this seems to be the best place to
track it. We have run into this too and done some more investigation:
Additional findings: FlinkPipelineOptions is not populated from the proto in the portable runner
We investigated this issue further and can confirm that Flink-specific
pipeline options (such as checkpointing_interval and savepoint_path) are NOT
being read by the Java-side FlinkPipelineOptions when using the portable Python SDK.
Key findings:
1. The Python side works correctly: options are serialized into the proto
with the expected key format beam:option:{option_name}:v1. We verified this by
inspecting options.to_runner_api(None), which contains the options as expected.
2. The Java side already has the code to use them:
FlinkExecutionEnvironments.createStreamExecutionEnvironment() checks
options.getSavepointPath() (line 159) and options.getCheckpointingInterval(),
and would apply them if they were populated.
3. The translation layer is broken: PipelineOptionsTranslation.fromProto()
should convert each key from LOWER_UNDERSCORE to LOWER_CAMEL (savepoint_path →
savepointPath) and populate the dynamic proxy that implements
FlinkPipelineOptions. However, at runtime these getters return null or their
default value even when the option is present in the proto.
4. The "Discarding invalid overrides" warning is misleading: it appears when
the options are set, but the options ARE serialized to the proto correctly.
The bug is on the Java side, where fromProto() does not properly translate
them into FlinkPipelineOptions.
5. On checkpointing_interval "working": the earlier comment suggesting that
checkpointing_interval works may refer to Flink's cluster-level
execution.checkpointing.interval setting (set in flink-conf.yaml or in a
FlinkDeployment spec), not to the Beam pipeline option --checkpointing_interval.
These are different mechanisms.
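Findings 1 and 3 hinge on an option-name round trip: the Python SDK writes a key such as beam:option:savepoint_path:v1, and the Java side is expected to turn it back into the camelCase property that backs getSavepointPath(). The sketch below models that round trip in Python so the expected names can be checked against the proto and the Java getters. It is a model under stated assumptions, not Beam code: to_proto_key, from_proto_key and expected_getter are hypothetical helpers, and the Java half assumes fromProto() strips the beam:option:{option_name}:v1 wrapper before the LOWER_UNDERSCORE → LOWER_CAMEL conversion described above.

```python
"""Model of the pipeline-option name round trip (hypothetical helpers, not Beam APIs)."""

PREFIX = "beam:option:"
SUFFIX = ":v1"


def to_proto_key(option_name: str) -> str:
    """Python side: savepoint_path -> beam:option:savepoint_path:v1."""
    return f"{PREFIX}{option_name}{SUFFIX}"


def from_proto_key(proto_key: str) -> str:
    """Java side, modelled only: strip the URN wrapper, then LOWER_UNDERSCORE -> LOWER_CAMEL.

    Assumption: this mirrors what PipelineOptionsTranslation.fromProto() is
    expected to do with each key; it is not a port of the Java code.
    """
    if not (proto_key.startswith(PREFIX) and proto_key.endswith(SUFFIX)):
        raise ValueError(f"not a pipeline option key: {proto_key!r}")
    name = proto_key[len(PREFIX):len(proto_key) - len(SUFFIX)]
    head, *rest = name.split("_")
    # First word stays lowercase, later words get an uppercase first letter.
    return head + "".join(part.capitalize() for part in rest)


def expected_getter(camel_name: str) -> str:
    """Bean-style getter name the FlinkPipelineOptions proxy should answer to."""
    return "get" + camel_name[0].upper() + camel_name[1:]


if __name__ == "__main__":
    # The two options discussed in this issue.
    for option in ("savepoint_path", "checkpointing_interval"):
        key = to_proto_key(option)
        camel = from_proto_key(key)
        print(f"{option:24} {key:40} {camel:22} {expected_getter(camel)}")
```

For each option this prints the proto key, the camelCase property name and the getter that should return it (savepoint_path ends at getSavepointPath, checkpointing_interval at getCheckpointingInterval). If the key is present in the job's options but the getter still returns null, that would suggest the value is lost after this renaming step rather than in the Python serialization.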
Reproduction:
```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--runner=FlinkRunner',
                           '--savepoint_path=/path/to/savepoint'])
# Proto serialization works: contains beam:option:savepoint_path:v1
# But Java FlinkPipelineOptions.getSavepointPath() returns null
```
This affects all Flink-specific options passed from the portable Python SDK,
not just checkpointing_interval.