This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4def6efb47e [FLINK-38530][python] Add support of Dynamic Kafka Source for PyFlink (#27409)
4def6efb47e is described below
commit 4def6efb47ebc2dff46f444a3319d3c2ae02c3a6
Author: bowenli86 <[email protected]>
AuthorDate: Wed Jan 14 17:37:20 2026 -0800
[FLINK-38530][python] Add support of Dynamic Kafka Source for PyFlink (#27409)
---
flink-python/pom.xml | 2 +-
.../pyflink/datastream/connectors/__init__.py | 12 +
.../pyflink/datastream/connectors/dynamic_kafka.py | 267 +++++++++++++
.../connectors/tests/test_dynamic_kafka.py | 413 +++++++++++++++++++++
4 files changed, 693 insertions(+), 1 deletion(-)
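
For readers of this patch, a minimal usage sketch of the new PyFlink API added below; the cluster id 'cluster-a', the bootstrap server 'localhost:9092', the stream id 'input-stream' and the group id 'my-group' are illustrative placeholders, not part of the commit:

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
        SingleClusterTopicMetadataService

    env = StreamExecutionEnvironment.get_execution_environment()

    # Metadata service that maps stream ids to topics on a single Kafka cluster
    metadata_service = SingleClusterTopicMetadataService(
        'cluster-a', {'bootstrap.servers': 'localhost:9092'})

    source = DynamicKafkaSource.builder() \
        .set_stream_ids({'input-stream'}) \
        .set_kafka_metadata_service(metadata_service) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_group_id('my-group') \
        .build()

    ds = env.from_source(source=source,
                         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                         source_name='dynamic kafka source')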
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 411fa96d63c..dbace7043c1 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -289,7 +289,7 @@ under the License.
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
- <version>3.0.0-1.17</version>
+ <version>4.0.1-2.0</version>
<scope>test</scope>
</dependency>
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index f8a29f60978..4f2e1036b20 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -44,6 +44,18 @@ def _install():
setattr(connectors, 'FlinkKafkaConsumer', kafka.FlinkKafkaConsumer)
setattr(connectors, 'FlinkKafkaProducer', kafka.FlinkKafkaProducer)
setattr(connectors, 'Semantic', kafka.Semantic)
+ # dynamic kafka
+ from pyflink.datastream.connectors import dynamic_kafka
+ setattr(connectors, 'DynamicKafkaSource', dynamic_kafka.DynamicKafkaSource)
+ setattr(connectors, 'DynamicKafkaSourceBuilder', dynamic_kafka.DynamicKafkaSourceBuilder)
+ setattr(connectors, 'KafkaMetadataService', dynamic_kafka.KafkaMetadataService)
+ setattr(connectors, 'KafkaRecordDeserializationSchema',
+ dynamic_kafka.KafkaRecordDeserializationSchema)
+ setattr(connectors, 'KafkaStreamSetSubscriber', dynamic_kafka.KafkaStreamSetSubscriber)
+ setattr(connectors, 'KafkaStreamSubscriber', dynamic_kafka.KafkaStreamSubscriber)
+ setattr(connectors, 'StreamPatternSubscriber', dynamic_kafka.StreamPatternSubscriber)
+ setattr(connectors, 'SingleClusterTopicMetadataService',
+ dynamic_kafka.SingleClusterTopicMetadataService)
# pulsar
from pyflink.datastream.connectors import pulsar
diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
new file mode 100644
index 00000000000..7473077a08e
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
@@ -0,0 +1,267 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Set, Union
+
+from py4j.java_gateway import JavaObject
+
+from pyflink.common import DeserializationSchema
+from pyflink.datastream.connectors import Source
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+ 'DynamicKafkaSource',
+ 'DynamicKafkaSourceBuilder',
+ 'KafkaMetadataService',
+ 'KafkaRecordDeserializationSchema',
+ 'KafkaStreamSetSubscriber',
+ 'KafkaStreamSubscriber',
+ 'StreamPatternSubscriber',
+ 'SingleClusterTopicMetadataService'
+]
+
+
+class KafkaMetadataService(object):
+ """
+ Base class for Kafka metadata service wrappers.
+ """
+
+ def __init__(self, j_metadata_service: JavaObject):
+ self._j_metadata_service = j_metadata_service
+
+
+class SingleClusterTopicMetadataService(KafkaMetadataService):
+ """
+ A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics.
+ """
+
+ def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]):
+ gateway = get_gateway()
+ j_properties = gateway.jvm.java.util.Properties()
+ for key, value in properties.items():
+ j_properties.setProperty(key, value)
+ j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
+ .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)
+ super().__init__(j_service)
+
+
+class KafkaStreamSubscriber(object):
+ """
+ Wrapper for Java KafkaStreamSubscriber implementations.
+ """
+
+ def __init__(self, j_kafka_stream_subscriber: JavaObject):
+ self._j_kafka_stream_subscriber = j_kafka_stream_subscriber
+
+
+class KafkaStreamSetSubscriber(KafkaStreamSubscriber):
+ """
+ Subscriber that consumes from a fixed set of stream ids.
+ """
+
+ def __init__(self, stream_ids: Set[str]):
+ gateway = get_gateway()
+ j_stream_ids = gateway.jvm.java.util.HashSet()
+ for stream_id in stream_ids:
+ j_stream_ids.add(stream_id)
+ j_subscriber = gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.enumerator \
+ .subscriber.KafkaStreamSetSubscriber(j_stream_ids)
+ super().__init__(j_subscriber)
+
+
+class StreamPatternSubscriber(KafkaStreamSubscriber):
+ """
+ Subscriber that consumes from stream ids matching a regex pattern.
+ """
+
+ def __init__(self, stream_pattern: str):
+ gateway = get_gateway()
+ j_pattern = gateway.jvm.java.util.regex.Pattern.compile(stream_pattern)
+ j_subscriber = gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.enumerator \
+ .subscriber.StreamPatternSubscriber(j_pattern)
+ super().__init__(j_subscriber)
+
+
+class KafkaRecordDeserializationSchema(DeserializationSchema):
+ """
+ Wrapper for KafkaRecordDeserializationSchema.
+ """
+
+ def __init__(self, j_deserialization_schema: JavaObject):
+ super().__init__(j_deserialization_schema)
+
+ @staticmethod
+ def value_only(deserialization_schema: DeserializationSchema) -> \
+ 'KafkaRecordDeserializationSchema':
+ jvm = get_gateway().jvm
+ j_deserializer = jvm.org.apache.flink.connector.kafka.source.reader.deserializer \
+ .KafkaRecordDeserializationSchema.valueOnly(
+ deserialization_schema._j_deserialization_schema)
+ return KafkaRecordDeserializationSchema(j_deserializer)
+
+
+class DynamicKafkaSource(Source):
+ """
+ Source implementation for dynamic Kafka streams.
+
+ Example:
+ ::
+
+ >>> metadata_service = SingleClusterTopicMetadataService(
+ ... 'cluster-a', {'bootstrap.servers': 'localhost:9092'})
+ >>> source = DynamicKafkaSource.builder() \\
+ ... .set_stream_ids({'stream-a'}) \\
+ ... .set_kafka_metadata_service(metadata_service) \\
+ ... .set_value_only_deserializer(SimpleStringSchema()) \\
+ ... .build()
+ """
+
+ def __init__(self, j_dynamic_kafka_source: JavaObject):
+ super().__init__(j_dynamic_kafka_source)
+
+ @staticmethod
+ def builder() -> 'DynamicKafkaSourceBuilder':
+ return DynamicKafkaSourceBuilder()
+
+
+class DynamicKafkaSourceBuilder(object):
+ """
+ Builder for DynamicKafkaSource.
+ """
+
+ def __init__(self):
+ self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.dynamic.source \
+ .DynamicKafkaSource.builder()
+
+ def build(self) -> DynamicKafkaSource:
+ return DynamicKafkaSource(self._j_builder.build())
+
+ def set_stream_ids(self, stream_ids: Set[str]) -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the stream ids to consume.
+ """
+ j_set = get_gateway().jvm.java.util.HashSet()
+ for stream_id in stream_ids:
+ j_set.add(stream_id)
+ self._j_builder.setStreamIds(j_set)
+ return self
+
+ def set_stream_pattern(self, stream_pattern: str) -> 'DynamicKafkaSourceBuilder':
+ """
+ Set a regex pattern to match stream ids.
+ """
+ self._j_builder.setStreamPattern(get_gateway().jvm.java.util.regex
+ .Pattern.compile(stream_pattern))
+ return self
+
+ def set_kafka_stream_subscriber(
+ self,
+ kafka_stream_subscriber: Union[KafkaStreamSubscriber, JavaObject]) \
+ -> 'DynamicKafkaSourceBuilder':
+ """
+ Set a custom KafkaStreamSubscriber.
+ """
+ if isinstance(kafka_stream_subscriber, KafkaStreamSubscriber):
+ j_subscriber = kafka_stream_subscriber._j_kafka_stream_subscriber
+ else:
+ j_subscriber = kafka_stream_subscriber
+ self._j_builder.setKafkaStreamSubscriber(j_subscriber)
+ return self
+
+ def set_kafka_metadata_service(
+ self,
+ kafka_metadata_service: Union[KafkaMetadataService, JavaObject]) \
+ -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the KafkaMetadataService.
+ """
+ if isinstance(kafka_metadata_service, KafkaMetadataService):
+ j_metadata_service = kafka_metadata_service._j_metadata_service
+ else:
+ j_metadata_service = kafka_metadata_service
+ self._j_builder.setKafkaMetadataService(j_metadata_service)
+ return self
+
+ def set_deserializer(self,
+ deserializer: Union[KafkaRecordDeserializationSchema, JavaObject]) \
+ -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the KafkaRecordDeserializationSchema.
+ """
+ if isinstance(deserializer, KafkaRecordDeserializationSchema):
+ j_deserializer = deserializer._j_deserialization_schema
+ else:
+ j_deserializer = deserializer
+ self._j_builder.setDeserializer(j_deserializer)
+ return self
+
+ def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
+ -> 'DynamicKafkaSourceBuilder':
+ """
+ Set a value-only DeserializationSchema.
+ """
+ return self.set_deserializer(
+ KafkaRecordDeserializationSchema.value_only(deserialization_schema))
+
+ def set_starting_offsets(self, starting_offsets_initializer: KafkaOffsetsInitializer) \
+ -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the starting offsets for all streams.
+ """
+ self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
+ return self
+
+ def set_bounded(self, stopping_offsets_initializer: KafkaOffsetsInitializer) \
+ -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the source to bounded mode with stopping offsets.
+ """
+ self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
+ return self
+
+ def set_properties(self, props: Dict[str, str]) -> 'DynamicKafkaSourceBuilder':
+ """
+ Set consumer properties for all clusters.
+ """
+ gateway = get_gateway()
+ j_properties = gateway.jvm.java.util.Properties()
+ for key, value in props.items():
+ j_properties.setProperty(key, value)
+ self._j_builder.setProperties(j_properties)
+ return self
+
+ def set_property(self, key: str, value: str) -> 'DynamicKafkaSourceBuilder':
+ """
+ Set a consumer property for all clusters.
+ """
+ self._j_builder.setProperty(key, value)
+ return self
+
+ def set_group_id(self, group_id: str) -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the consumer group id for all clusters.
+ """
+ self._j_builder.setGroupId(group_id)
+ return self
+
+ def set_client_id_prefix(self, prefix: str) -> 'DynamicKafkaSourceBuilder':
+ """
+ Set the client id prefix for all clusters.
+ """
+ self._j_builder.setClientIdPrefix(prefix)
+ return self
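
As a complement to the builder methods above, a hedged sketch of a bounded read over all streams matched by a regex pattern; the pattern 'logs-.*' and the cluster settings are illustrative only, not taken from the commit:

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
        SingleClusterTopicMetadataService
    from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer

    metadata_service = SingleClusterTopicMetadataService(
        'cluster-a', {'bootstrap.servers': 'localhost:9092'})

    # Bounded source: subscribe to every stream id matching 'logs-.*', start from
    # the earliest offsets and stop at the latest offsets known at job start.
    bounded_source = DynamicKafkaSource.builder() \
        .set_stream_pattern('logs-.*') \
        .set_kafka_metadata_service(metadata_service) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_bounded(KafkaOffsetsInitializer.latest()) \
        .build()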
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
new file mode 100644
index 00000000000..448289f2727
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
@@ -0,0 +1,413 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from typing import Dict
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
+ KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, StreamPatternSubscriber, \
+ SingleClusterTopicMetadataService
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \
+ KafkaTopicPartition
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.util.java_utils import get_field, get_field_value, is_instance_of
+
+
+class DynamicKafkaSourceTests(PyFlinkStreamingTestCase):
+
+ def test_compiling(self):
+ source = DynamicKafkaSource.builder() \
+ .set_stream_ids({'test-stream'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .build()
+
+ ds = self.env.from_source(source=source,
+ watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+ source_name='dynamic kafka source')
+ ds.print()
+ plan = json.loads(self.env.get_execution_plan())
+ self.assertEqual('Source: dynamic kafka source', plan['nodes'][0]['type'])
+
+ def test_set_stream_ids(self):
+ source = DynamicKafkaSource.builder() \
+ .set_stream_ids({'stream-a', 'stream-b'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .build()
+ subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+ self.assertEqual(
+ subscriber.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+ '.KafkaStreamSetSubscriber'
+ )
+ stream_ids = get_field_value(subscriber, 'streamIds')
+ self.assertTrue(is_instance_of(stream_ids, get_gateway().jvm.java.util.Set))
+ self.assertEqual(stream_ids.size(), 2)
+ self.assertTrue('stream-a' in stream_ids)
+ self.assertTrue('stream-b' in stream_ids)
+
+ def test_set_stream_pattern(self):
+ source = DynamicKafkaSource.builder() \
+ .set_stream_pattern('stream-*') \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .build()
+ subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+ self.assertEqual(
+ subscriber.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+ '.StreamPatternSubscriber'
+ )
+ stream_pattern = get_field_value(subscriber, 'streamPattern')
+ self.assertTrue(is_instance_of(stream_pattern, get_gateway().jvm.java.util.regex.Pattern))
+ self.assertEqual(stream_pattern.toString(), 'stream-*')
+
+ def test_set_stream_set_subscriber(self):
+ subscriber = KafkaStreamSetSubscriber({'stream-a', 'stream-b'})
+ source = DynamicKafkaSource.builder() \
+ .set_kafka_stream_subscriber(subscriber) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .build()
+ j_subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+ self.assertEqual(
+ j_subscriber.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+ '.KafkaStreamSetSubscriber'
+ )
+ stream_ids = get_field_value(j_subscriber, 'streamIds')
+ self.assertTrue(is_instance_of(stream_ids, get_gateway().jvm.java.util.Set))
+ self.assertEqual(stream_ids.size(), 2)
+ self.assertTrue('stream-a' in stream_ids)
+ self.assertTrue('stream-b' in stream_ids)
+
+ def test_set_stream_pattern_subscriber(self):
+ subscriber = StreamPatternSubscriber('stream-*')
+ source = DynamicKafkaSource.builder() \
+ .set_kafka_stream_subscriber(subscriber) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .build()
+ j_subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+ self.assertEqual(
+ j_subscriber.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+ '.StreamPatternSubscriber'
+ )
+ stream_pattern = get_field_value(j_subscriber, 'streamPattern')
+ self.assertTrue(is_instance_of(stream_pattern, get_gateway().jvm.java.util.regex.Pattern))
+ self.assertEqual(stream_pattern.toString(), 'stream-*')
+
+ def test_set_properties(self):
+ source = DynamicKafkaSource.builder() \
+ .set_stream_ids({'stream-a'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .set_group_id('test-group') \
+ .set_client_id_prefix('test-client-id') \
+ .set_property('test-property', 'test-value') \
+ .build()
+ properties = get_field_value(source.get_java_function(), 'properties')
+ self.assertEqual(properties.getProperty('group.id'), 'test-group')
+ self.assertEqual(properties.getProperty('client.id.prefix'), 'test-client-id')
+ self.assertEqual(properties.getProperty('test-property'), 'test-value')
+
+ def test_set_starting_offsets(self):
+ def _build_source(initializer: KafkaOffsetsInitializer):
+ return DynamicKafkaSource.builder() \
+ .set_stream_ids({'stream-a'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .set_starting_offsets(initializer) \
+ .build()
+
+ self._check_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.latest()),
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'LatestOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.LATEST,
+ offset=-1
+ )
+ self._check_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.earliest()),
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'EarliestOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.EARLIEST,
+ offset=-2
+ )
+ self._check_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.committed_offsets()),
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'CommittedOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.NONE,
+ offset=-3
+ )
+ self._check_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.committed_offsets(
+ KafkaOffsetResetStrategy.LATEST
+ )),
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'CommittedOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.LATEST,
+ offset=-3
+ )
+ self._check_timestamp_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.timestamp(100)), 100
+ )
+ specified_offsets = {
+ KafkaTopicPartition('test_topic1', 1): 1000,
+ KafkaTopicPartition('test_topic2', 2): 2000
+ }
+ self._check_specified_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)), specified_offsets,
+ KafkaOffsetResetStrategy.EARLIEST
+ )
+ self._check_specified_offsets_initializer(
+ _build_source(KafkaOffsetsInitializer.offsets(
+ specified_offsets,
+ KafkaOffsetResetStrategy.LATEST
+ )),
+ specified_offsets,
+ KafkaOffsetResetStrategy.LATEST
+ )
+
+ def test_bounded(self):
+ def _build_source(initializer: KafkaOffsetsInitializer):
+ return DynamicKafkaSource.builder() \
+ .set_stream_ids({'stream-a'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .set_bounded(initializer) \
+ .build()
+
+ def _check_bounded(source: DynamicKafkaSource):
+ self.assertEqual(
+ get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED'
+ )
+
+ source = _build_source(KafkaOffsetsInitializer.latest())
+ _check_bounded(source)
+ self._check_offsets_initializer(
+ source,
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'LatestOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.LATEST,
+ offset=-1,
+ is_start=False
+ )
+ source = _build_source(KafkaOffsetsInitializer.earliest())
+ _check_bounded(source)
+ self._check_offsets_initializer(
+ source,
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'EarliestOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.EARLIEST,
+ offset=-2,
+ is_start=False
+ )
+ source = _build_source(KafkaOffsetsInitializer.committed_offsets())
+ _check_bounded(source)
+ self._check_offsets_initializer(
+ source,
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'CommittedOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.NONE,
+ offset=-3,
+ is_start=False
+ )
+ source = _build_source(KafkaOffsetsInitializer.committed_offsets(
+ KafkaOffsetResetStrategy.LATEST
+ ))
+ _check_bounded(source)
+ self._check_offsets_initializer(
+ source,
+ {
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'CommittedOffsetsInitializer',
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer.'
+ 'ReaderHandledOffsetsInitializer',
+ },
+ reset_strategy=KafkaOffsetResetStrategy.LATEST,
+ offset=-3,
+ is_start=False
+ )
+ source = _build_source(KafkaOffsetsInitializer.timestamp(100))
+ _check_bounded(source)
+ self._check_timestamp_offsets_initializer(source, 100, False)
+ specified_offsets = {
+ KafkaTopicPartition('test_topic1', 1): 1000,
+ KafkaTopicPartition('test_topic2', 2): 2000
+ }
+ source = _build_source(KafkaOffsetsInitializer.offsets(specified_offsets))
+ _check_bounded(source)
+ self._check_specified_offsets_initializer(
+ source, specified_offsets, KafkaOffsetResetStrategy.EARLIEST, False
+ )
+ source = _build_source(KafkaOffsetsInitializer.offsets(
+ specified_offsets,
+ KafkaOffsetResetStrategy.LATEST)
+ )
+ _check_bounded(source)
+ self._check_specified_offsets_initializer(
+ source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False
+ )
+
+ def test_set_value_only_deserializer(self):
+ source = DynamicKafkaSource.builder() \
+ .set_stream_ids({'stream-a'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .build()
+ deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+ 'deserializationSchema')
+ self.assertEqual(
+ deserialization_schema_wrapper.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.source.reader.deserializer'
+ '.KafkaValueOnlyDeserializationSchemaWrapper'
+ )
+ deserialization_schema = get_field_value(deserialization_schema_wrapper,
+ 'deserializationSchema')
+ self.assertEqual(deserialization_schema.getClass().getCanonicalName(),
+ 'org.apache.flink.api.common.serialization.SimpleStringSchema')
+
+ def test_set_deserializer(self):
+ record_deserializer = KafkaRecordDeserializationSchema.value_only(SimpleStringSchema())
+ source = DynamicKafkaSource.builder() \
+ .set_stream_ids({'stream-a'}) \
+ .set_kafka_metadata_service(self._build_metadata_service()) \
+ .set_deserializer(record_deserializer) \
+ .build()
+ deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+ 'deserializationSchema')
+ self.assertEqual(
+ deserialization_schema_wrapper.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.source.reader.deserializer'
+ '.KafkaValueOnlyDeserializationSchemaWrapper'
+ )
+
+ @staticmethod
+ def _build_metadata_service() -> SingleClusterTopicMetadataService:
+ return SingleClusterTopicMetadataService(
+ 'test-cluster', {'bootstrap.servers': 'localhost:9092'})
+
+ def _check_offsets_initializer(self,
+ source: DynamicKafkaSource,
+ expected_class_names,
+ reset_strategy: KafkaOffsetResetStrategy = None,
+ offset: int = None,
+ is_start: bool = True):
+ if is_start:
+ field_name = 'startingOffsetsInitializer'
+ else:
+ field_name = 'stoppingOffsetsInitializer'
+ offsets_initializer = get_field_value(source.get_java_function(), field_name)
+ class_name = offsets_initializer.getClass().getCanonicalName()
+ self.assertIn(class_name, expected_class_names)
+
+ if offset is not None:
+ starting_offset_field = get_field(offsets_initializer.getClass(), 'startingOffset')
+ if starting_offset_field is not None:
+ starting_offset = starting_offset_field.get(offsets_initializer)
+ self.assertEqual(starting_offset, offset)
+
+ if reset_strategy is not None:
+ offset_reset_strategy_field = get_field(offsets_initializer.getClass(),
+ 'offsetResetStrategy')
+ if offset_reset_strategy_field is not None:
+ offset_reset_strategy = offset_reset_strategy_field.get(offsets_initializer)
+ self.assertTrue(
+ offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+ )
+
+ def _check_timestamp_offsets_initializer(self,
+ source: DynamicKafkaSource,
+ timestamp: int,
+ is_start: bool = True):
+ if is_start:
+ field_name = 'startingOffsetsInitializer'
+ else:
+ field_name = 'stoppingOffsetsInitializer'
+ offsets_initializer = get_field_value(source.get_java_function(), field_name)
+ self.assertEqual(
+ offsets_initializer.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer'
+ '.TimestampOffsetsInitializer'
+ )
+
+ starting_timestamp = get_field_value(offsets_initializer, 'startingTimestamp')
+ self.assertEqual(starting_timestamp, timestamp)
+
+ def _check_specified_offsets_initializer(self,
+ source: DynamicKafkaSource,
+ offsets: Dict[KafkaTopicPartition, int],
+ reset_strategy: KafkaOffsetResetStrategy,
+ is_start: bool = True):
+ if is_start:
+ field_name = 'startingOffsetsInitializer'
+ else:
+ field_name = 'stoppingOffsetsInitializer'
+ offsets_initializer = get_field_value(source.get_java_function(), field_name)
+ self.assertEqual(
+ offsets_initializer.getClass().getCanonicalName(),
+ 'org.apache.flink.connector.kafka.source.enumerator.initializer'
+ '.SpecifiedOffsetsInitializer'
+ )
+
+ initial_offsets = get_field_value(offsets_initializer, 'initialOffsets')
+ self.assertTrue(is_instance_of(initial_offsets, get_gateway().jvm.java.util.Map))
+ self.assertEqual(initial_offsets.size(), len(offsets))
+ for j_topic_partition in initial_offsets:
+ topic_partition = KafkaTopicPartition(j_topic_partition.topic(),
+ j_topic_partition.partition())
+ self.assertIsNotNone(offsets.get(topic_partition))
+ self.assertEqual(initial_offsets[j_topic_partition], offsets[topic_partition])
+
+ offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
+ self.assertTrue(
+ offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+ )