This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch dynamick
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c21bb0d6286aa8b484725b5c7b23804ca0e20646
Author: Bowen Li <[email protected]>
AuthorDate: Tue Jan 13 13:15:50 2026 -0800

    [FLINK-38530] Create DynamicKafkaSource for PyFlink
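
    A minimal usage sketch of the new Python API (the cluster id, stream id,
    group id, and bootstrap.servers value below are illustrative placeholders,
    not part of this commit):

        from pyflink.common.serialization import SimpleStringSchema
        from pyflink.common.watermark_strategy import WatermarkStrategy
        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.datastream.connectors.dynamic_kafka import \
            DynamicKafkaSource, SingleClusterTopicMetadataService

        env = StreamExecutionEnvironment.get_execution_environment()

        # Metadata service backed by a single cluster; stream ids map to topics.
        metadata_service = SingleClusterTopicMetadataService(
            'my-cluster', {'bootstrap.servers': 'localhost:9092'})

        # Consume the given stream ids, deserializing record values as strings.
        source = DynamicKafkaSource.builder() \
            .set_stream_ids({'input-stream'}) \
            .set_kafka_metadata_service(metadata_service) \
            .set_value_only_deserializer(SimpleStringSchema()) \
            .set_group_id('my-group') \
            .build()

        ds = env.from_source(source, WatermarkStrategy.no_watermarks(),
                             'dynamic kafka source')
        ds.print()
        env.execute()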
---
 .../pyflink/datastream/connectors/__init__.py      |  10 +
 .../pyflink/datastream/connectors/dynamic_kafka.py | 237 ++++++++++++++++
 .../connectors/tests/test_dynamic_kafka.py         | 306 +++++++++++++++++++++
 3 files changed, 553 insertions(+)

diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index f8a29f60978..f9bd1d90258 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -44,6 +44,16 @@ def _install():
     setattr(connectors, 'FlinkKafkaConsumer', kafka.FlinkKafkaConsumer)
     setattr(connectors, 'FlinkKafkaProducer', kafka.FlinkKafkaProducer)
     setattr(connectors, 'Semantic', kafka.Semantic)
+    # dynamic kafka
+    from pyflink.datastream.connectors import dynamic_kafka
+    setattr(connectors, 'DynamicKafkaSource', dynamic_kafka.DynamicKafkaSource)
+    setattr(connectors, 'DynamicKafkaSourceBuilder', dynamic_kafka.DynamicKafkaSourceBuilder)
+    setattr(connectors, 'KafkaMetadataService', dynamic_kafka.KafkaMetadataService)
+    setattr(connectors, 'KafkaRecordDeserializationSchema',
+            dynamic_kafka.KafkaRecordDeserializationSchema)
+    setattr(connectors, 'KafkaStreamSubscriber', dynamic_kafka.KafkaStreamSubscriber)
+    setattr(connectors, 'SingleClusterTopicMetadataService',
+            dynamic_kafka.SingleClusterTopicMetadataService)
 
     # pulsar
     from pyflink.datastream.connectors import pulsar
diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
new file mode 100644
index 00000000000..3b75811f686
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
@@ -0,0 +1,237 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Set, Union
+
+from py4j.java_gateway import JavaObject
+
+from pyflink.common import DeserializationSchema
+from pyflink.datastream.connectors import Source
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'DynamicKafkaSource',
+    'DynamicKafkaSourceBuilder',
+    'KafkaMetadataService',
+    'KafkaRecordDeserializationSchema',
+    'KafkaStreamSubscriber',
+    'SingleClusterTopicMetadataService'
+]
+
+
+class KafkaMetadataService(object):
+    """
+    Base class for Kafka metadata service wrappers.
+    """
+
+    def __init__(self, j_metadata_service: JavaObject):
+        self._j_metadata_service = j_metadata_service
+
+
+class SingleClusterTopicMetadataService(KafkaMetadataService):
+    """
+    A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics.
+    """
+
+    def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in properties.items():
+            j_properties.setProperty(key, value)
+        j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
+            .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)
+        super().__init__(j_service)
+
+
+class KafkaStreamSubscriber(object):
+    """
+    Wrapper for Java KafkaStreamSubscriber implementations.
+    """
+
+    def __init__(self, j_kafka_stream_subscriber: JavaObject):
+        self._j_kafka_stream_subscriber = j_kafka_stream_subscriber
+
+
+class KafkaRecordDeserializationSchema(DeserializationSchema):
+    """
+    Wrapper for KafkaRecordDeserializationSchema.
+    """
+
+    def __init__(self, j_deserialization_schema: JavaObject):
+        super().__init__(j_deserialization_schema)
+
+    @staticmethod
+    def value_only(deserialization_schema: DeserializationSchema) -> \
+            'KafkaRecordDeserializationSchema':
+        jvm = get_gateway().jvm
+        j_deserializer = jvm.org.apache.flink.connector.kafka.source.reader.deserializer \
+            .KafkaRecordDeserializationSchema.valueOnly(
+                deserialization_schema._j_deserialization_schema)
+        return KafkaRecordDeserializationSchema(j_deserializer)
+
+
+class DynamicKafkaSource(Source):
+    """
+    A Source that reads dynamically discovered Kafka streams resolved by a KafkaMetadataService.
+
+    Example:
+    ::
+
+        >>> metadata_service = SingleClusterTopicMetadataService(
+        ...     'cluster-a', {'bootstrap.servers': 'localhost:9092'})
+        >>> source = DynamicKafkaSource.builder() \\
+        ...     .set_stream_ids({'stream-a'}) \\
+        ...     .set_kafka_metadata_service(metadata_service) \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .build()
+    """
+
+    def __init__(self, j_dynamic_kafka_source: JavaObject):
+        super().__init__(j_dynamic_kafka_source)
+
+    @staticmethod
+    def builder() -> 'DynamicKafkaSourceBuilder':
+        return DynamicKafkaSourceBuilder()
+
+
+class DynamicKafkaSourceBuilder(object):
+    """
+    Builder for DynamicKafkaSource.
+    """
+
+    def __init__(self):
+        self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.dynamic.source \
+            .DynamicKafkaSource.builder()
+
+    def build(self) -> DynamicKafkaSource:
+        return DynamicKafkaSource(self._j_builder.build())
+
+    def set_stream_ids(self, stream_ids: Set[str]) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the stream ids to consume.
+        """
+        j_set = get_gateway().jvm.java.util.HashSet()
+        for stream_id in stream_ids:
+            j_set.add(stream_id)
+        self._j_builder.setStreamIds(j_set)
+        return self
+
+    def set_stream_pattern(self, stream_pattern: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a regex pattern to match stream ids.
+        """
+        self._j_builder.setStreamPattern(get_gateway().jvm.java.util.regex
+                                         .Pattern.compile(stream_pattern))
+        return self
+
+    def set_kafka_stream_subscriber(
+            self,
+            kafka_stream_subscriber: Union[KafkaStreamSubscriber, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a custom KafkaStreamSubscriber.
+        """
+        if isinstance(kafka_stream_subscriber, KafkaStreamSubscriber):
+            j_subscriber = kafka_stream_subscriber._j_kafka_stream_subscriber
+        else:
+            j_subscriber = kafka_stream_subscriber
+        self._j_builder.setKafkaStreamSubscriber(j_subscriber)
+        return self
+
+    def set_kafka_metadata_service(
+            self,
+            kafka_metadata_service: Union[KafkaMetadataService, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the KafkaMetadataService.
+        """
+        if isinstance(kafka_metadata_service, KafkaMetadataService):
+            j_metadata_service = kafka_metadata_service._j_metadata_service
+        else:
+            j_metadata_service = kafka_metadata_service
+        self._j_builder.setKafkaMetadataService(j_metadata_service)
+        return self
+
+    def set_deserializer(self,
+                         deserializer: Union[KafkaRecordDeserializationSchema, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the KafkaRecordDeserializationSchema.
+        """
+        if isinstance(deserializer, KafkaRecordDeserializationSchema):
+            j_deserializer = deserializer._j_deserialization_schema
+        else:
+            j_deserializer = deserializer
+        self._j_builder.setDeserializer(j_deserializer)
+        return self
+
+    def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a value-only DeserializationSchema.
+        """
+        return self.set_deserializer(
+            KafkaRecordDeserializationSchema.value_only(deserialization_schema))
+
+    def set_starting_offsets(self, starting_offsets_initializer: KafkaOffsetsInitializer) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the starting offsets for all streams.
+        """
+        self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
+        return self
+
+    def set_bounded(self, stopping_offsets_initializer: KafkaOffsetsInitializer) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the source to bounded mode with stopping offsets.
+        """
+        self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
+        return self
+
+    def set_properties(self, props: Dict[str, str]) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set consumer properties for all clusters.
+        """
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in props.items():
+            j_properties.setProperty(key, value)
+        self._j_builder.setProperties(j_properties)
+        return self
+
+    def set_property(self, key: str, value: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a consumer property for all clusters.
+        """
+        self._j_builder.setProperty(key, value)
+        return self
+
+    def set_group_id(self, group_id: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the consumer group id for all clusters.
+        """
+        self._j_builder.setGroupId(group_id)
+        return self
+
+    def set_client_id_prefix(self, prefix: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the client id prefix for all clusters.
+        """
+        self._j_builder.setClientIdPrefix(prefix)
+        return self
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
new file mode 100644
index 00000000000..e3812558487
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
@@ -0,0 +1,306 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from typing import Dict
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
+    KafkaRecordDeserializationSchema, SingleClusterTopicMetadataService
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \
+    KafkaTopicPartition
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.util.java_utils import get_field_value, is_instance_of
+
+
+class DynamicKafkaSourceTests(PyFlinkStreamingTestCase):
+
+    def test_compiling(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'test-stream'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+
+        ds = self.env.from_source(source=source,
+                                  watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                                  source_name='dynamic kafka source')
+        ds.print()
+        plan = json.loads(self.env.get_execution_plan())
+        self.assertEqual('Source: dynamic kafka source', plan['nodes'][0]['type'])
+
+    def test_set_stream_ids(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a', 'stream-b'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.KafkaStreamSetSubscriber'
+        )
+        stream_ids = get_field_value(subscriber, 'streamIds')
+        self.assertTrue(is_instance_of(stream_ids, get_gateway().jvm.java.util.Set))
+        self.assertEqual(stream_ids.size(), 2)
+        self.assertTrue('stream-a' in stream_ids)
+        self.assertTrue('stream-b' in stream_ids)
+
+    def test_set_stream_pattern(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_pattern('stream-*') \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.StreamPatternSubscriber'
+        )
+        stream_pattern = get_field_value(subscriber, 'streamPattern')
+        self.assertTrue(is_instance_of(stream_pattern, get_gateway().jvm.java.util.regex.Pattern))
+        self.assertEqual(stream_pattern.toString(), 'stream-*')
+
+    def test_set_properties(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .set_group_id('test-group') \
+            .set_client_id_prefix('test-client-id') \
+            .set_property('test-property', 'test-value') \
+            .build()
+        properties = get_field_value(source.get_java_function(), 'properties')
+        self.assertEqual(properties.getProperty('group.id'), 'test-group')
+        self.assertEqual(properties.getProperty('client.id.prefix'), 'test-client-id')
+        self.assertEqual(properties.getProperty('test-property'), 'test-value')
+
+    def test_set_starting_offsets(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return DynamicKafkaSource.builder() \
+                .set_stream_ids({'stream-a'}) \
+                .set_kafka_metadata_service(self._build_metadata_service()) \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_starting_offsets(initializer) \
+                .build()
+
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.latest()), -1, KafkaOffsetResetStrategy.LATEST
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.earliest()), -2,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets()), -3,
+            KafkaOffsetResetStrategy.NONE
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets(
+                KafkaOffsetResetStrategy.LATEST
+            )), -3, KafkaOffsetResetStrategy.LATEST
+        )
+        self._check_timestamp_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.timestamp(100)), 100
+        )
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)), specified_offsets,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(
+                specified_offsets,
+                KafkaOffsetResetStrategy.LATEST
+            )),
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST
+        )
+
+    def test_bounded(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return DynamicKafkaSource.builder() \
+                .set_stream_ids({'stream-a'}) \
+                .set_kafka_metadata_service(self._build_metadata_service()) \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_bounded(initializer) \
+                .build()
+
+        def _check_bounded(source: DynamicKafkaSource):
+            self.assertEqual(
+                get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED'
+            )
+
+        source = _build_source(KafkaOffsetsInitializer.latest())
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -1, KafkaOffsetResetStrategy.LATEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.earliest())
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -2, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets())
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -3, KafkaOffsetResetStrategy.NONE, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets(
+            KafkaOffsetResetStrategy.LATEST
+        ))
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -3, KafkaOffsetResetStrategy.LATEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.timestamp(100))
+        _check_bounded(source)
+        self._check_timestamp_offsets_initializer(source, 100, False)
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        source = _build_source(KafkaOffsetsInitializer.offsets(specified_offsets))
+        _check_bounded(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.offsets(
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST)
+        )
+        _check_bounded(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False
+        )
+
+    def test_set_value_only_deserializer(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                         'deserializationSchema')
+        self.assertEqual(
+            deserialization_schema_wrapper.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.reader.deserializer'
+            '.KafkaValueOnlyDeserializationSchemaWrapper'
+        )
+        deserialization_schema = get_field_value(deserialization_schema_wrapper,
+                                                 'deserializationSchema')
+        self.assertEqual(deserialization_schema.getClass().getCanonicalName(),
+                         'org.apache.flink.api.common.serialization.SimpleStringSchema')
+
+    def test_set_deserializer(self):
+        record_deserializer = KafkaRecordDeserializationSchema.value_only(SimpleStringSchema())
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_deserializer(record_deserializer) \
+            .build()
+        deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                         'deserializationSchema')
+        self.assertEqual(
+            deserialization_schema_wrapper.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.reader.deserializer'
+            '.KafkaValueOnlyDeserializationSchemaWrapper'
+        )
+
+    @staticmethod
+    def _build_metadata_service() -> SingleClusterTopicMetadataService:
+        return SingleClusterTopicMetadataService(
+            'test-cluster', {'bootstrap.servers': 'localhost:9092'})
+
+    def _check_reader_handled_offsets_initializer(self,
+                                                  source: DynamicKafkaSource,
+                                                  offset: int,
+                                                  reset_strategy: KafkaOffsetResetStrategy,
+                                                  is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.ReaderHandledOffsetsInitializer'
+        )
+
+        starting_offset = get_field_value(offsets_initializer, 'startingOffset')
+        self.assertEqual(starting_offset, offset)
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
+        self.assertTrue(
+            offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )
+
+    def _check_timestamp_offsets_initializer(self,
+                                             source: DynamicKafkaSource,
+                                             timestamp: int,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.TimestampOffsetsInitializer'
+        )
+
+        starting_timestamp = get_field_value(offsets_initializer, 'startingTimestamp')
+        self.assertEqual(starting_timestamp, timestamp)
+
+    def _check_specified_offsets_initializer(self,
+                                             source: DynamicKafkaSource,
+                                             offsets: Dict[KafkaTopicPartition, int],
+                                             reset_strategy: KafkaOffsetResetStrategy,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.SpecifiedOffsetsInitializer'
+        )
+
+        initial_offsets = get_field_value(offsets_initializer, 'initialOffsets')
+        self.assertTrue(is_instance_of(initial_offsets, get_gateway().jvm.java.util.Map))
+        self.assertEqual(initial_offsets.size(), len(offsets))
+        for j_topic_partition in initial_offsets:
+            topic_partition = KafkaTopicPartition(j_topic_partition.topic(),
+                                                  j_topic_partition.partition())
+            self.assertIsNotNone(offsets.get(topic_partition))
+            self.assertEqual(initial_offsets[j_topic_partition], offsets[topic_partition])
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
+        self.assertTrue(
+            offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )
