KenanAdel opened a new pull request, #28457: URL: https://github.com/apache/flink/pull/28457
## What is the purpose of the change This PR adds support for per-cluster starting and stopping offset initializers in PyFlink's `DynamicKafkaSource`, aligning the Python API with the underlying Java implementation introduced in FLINK-38876. ## Brief change log - Updated `ClusterMetadata` and `SingleClusterTopicMetadataService` constructors in `dynamic_kafka.py` to accept optional `starting_offsets_initializer` and `stopping_offsets_initializer`. - Forwarded the PyFlink offset initializers to the corresponding Java objects via the Py4J gateway using `_j_initializer`. - Ensured proper propagation of offset configuration from Python layer to the underlying Java metadata service. - Added comprehensive unit tests in `test_dynamic_kafka.py` to verify offset forwarding and ensure backward compatibility when offsets are not provided. ## Verifying this change This change is covered by the following newly added unit tests in `test_dynamic_kafka.py`: - `test_single_cluster_metadata_service_with_offsets` - `test_single_cluster_metadata_service_default_offsets` - `test_cluster_metadata_with_offsets` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
