KenanAdel opened a new pull request, #28458:
URL: https://github.com/apache/flink/pull/28458

   ## What is the purpose of the change
   
   This PR adds support for per-cluster starting and stopping offset 
initializers in PyFlink's `DynamicKafkaSource`, aligning the Python API with 
the underlying Java implementation introduced in FLINK-38876.
   
   ## Brief change log
   
   - Updated `ClusterMetadata` and `SingleClusterTopicMetadataService` 
constructors in `dynamic_kafka.py` to accept optional 
`starting_offsets_initializer` and `stopping_offsets_initializer`.
   - Forwarded the PyFlink offset initializers to the corresponding Java 
objects via the Py4J gateway using `_j_initializer`.
   - Ensured proper propagation of offset configuration from Python layer to 
the underlying Java metadata service.
   - Added comprehensive unit tests in `test_dynamic_kafka.py` to verify offset 
forwarding and ensure backward compatibility when offsets are not provided.
   
   ## Verifying this change
   
   This change is covered by the following newly added unit tests in 
`test_dynamic_kafka.py`:
   - `test_single_cluster_metadata_service_with_offsets`
   - `test_single_cluster_metadata_service_default_offsets`
   - `test_cluster_metadata_with_offsets`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to