This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch dynamick
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c21bb0d6286aa8b484725b5c7b23804ca0e20646
Author: Bowen Li <[email protected]>
AuthorDate: Tue Jan 13 13:15:50 2026 -0800

    [FLINK-38530] create Dynamic Kafka Source for pyflink
---
 .../pyflink/datastream/connectors/__init__.py      |  10 +
 .../pyflink/datastream/connectors/dynamic_kafka.py | 237 ++++++++++++++++
 .../connectors/tests/test_dynamic_kafka.py         | 306 +++++++++++++++++++++
 3 files changed, 553 insertions(+)

diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index f8a29f60978..f9bd1d90258 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -44,6 +44,16 @@ def _install():
     setattr(connectors, 'FlinkKafkaConsumer', kafka.FlinkKafkaConsumer)
     setattr(connectors, 'FlinkKafkaProducer', kafka.FlinkKafkaProducer)
     setattr(connectors, 'Semantic', kafka.Semantic)
+    # dynamic kafka
+    from pyflink.datastream.connectors import dynamic_kafka
+    setattr(connectors, 'DynamicKafkaSource', dynamic_kafka.DynamicKafkaSource)
+    setattr(connectors, 'DynamicKafkaSourceBuilder', dynamic_kafka.DynamicKafkaSourceBuilder)
+    setattr(connectors, 'KafkaMetadataService', dynamic_kafka.KafkaMetadataService)
+    setattr(connectors, 'KafkaRecordDeserializationSchema',
+            dynamic_kafka.KafkaRecordDeserializationSchema)
+    setattr(connectors, 'KafkaStreamSubscriber', dynamic_kafka.KafkaStreamSubscriber)
+    setattr(connectors, 'SingleClusterTopicMetadataService',
+            dynamic_kafka.SingleClusterTopicMetadataService)
 
     # pulsar
     from pyflink.datastream.connectors import pulsar
diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
new file mode 100644
index 00000000000..3b75811f686
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
@@ -0,0 +1,237 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Set, Union
+
+from py4j.java_gateway import JavaObject
+
+from pyflink.common import DeserializationSchema
+from pyflink.datastream.connectors import Source
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'DynamicKafkaSource',
+    'DynamicKafkaSourceBuilder',
+    'KafkaMetadataService',
+    'KafkaRecordDeserializationSchema',
+    'KafkaStreamSubscriber',
+    'SingleClusterTopicMetadataService'
+]
+
+
+class KafkaMetadataService(object):
+    """
+    Base class for Kafka metadata service wrappers.
+    """
+
+    def __init__(self, j_metadata_service: JavaObject):
+        self._j_metadata_service = j_metadata_service
+
+
+class SingleClusterTopicMetadataService(KafkaMetadataService):
+    """
+    A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics.
+    """
+
+    def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in properties.items():
+            j_properties.setProperty(key, value)
+        j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
+            .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)
+        super().__init__(j_service)
+
+
+class KafkaStreamSubscriber(object):
+    """
+    Wrapper for Java KafkaStreamSubscriber implementations.
+    """
+
+    def __init__(self, j_kafka_stream_subscriber: JavaObject):
+        self._j_kafka_stream_subscriber = j_kafka_stream_subscriber
+
+
+class KafkaRecordDeserializationSchema(DeserializationSchema):
+    """
+    Wrapper for KafkaRecordDeserializationSchema.
+    """
+
+    def __init__(self, j_deserialization_schema: JavaObject):
+        super().__init__(j_deserialization_schema)
+
+    @staticmethod
+    def value_only(deserialization_schema: DeserializationSchema) -> \
+            'KafkaRecordDeserializationSchema':
+        jvm = get_gateway().jvm
+        j_deserializer = jvm.org.apache.flink.connector.kafka.source.reader.deserializer \
+            .KafkaRecordDeserializationSchema.valueOnly(
+                deserialization_schema._j_deserialization_schema)
+        return KafkaRecordDeserializationSchema(j_deserializer)
+
+
+class DynamicKafkaSource(Source):
+    """
+    Source implementation for dynamic Kafka streams.
+
+    Example:
+    ::
+
+        >>> metadata_service = SingleClusterTopicMetadataService(
+        ...     'cluster-a', {'bootstrap.servers': 'localhost:9092'})
+        >>> source = DynamicKafkaSource.builder() \\
+        ...     .set_stream_ids({'stream-a'}) \\
+        ...     .set_kafka_metadata_service(metadata_service) \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .build()
+    """
+
+    def __init__(self, j_dynamic_kafka_source: JavaObject):
+        super().__init__(j_dynamic_kafka_source)
+
+    @staticmethod
+    def builder() -> 'DynamicKafkaSourceBuilder':
+        return DynamicKafkaSourceBuilder()
+
+
+class DynamicKafkaSourceBuilder(object):
+    """
+    Builder for DynamicKafkaSource.
+    """
+
+    def __init__(self):
+        self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.dynamic.source \
+            .DynamicKafkaSource.builder()
+
+    def build(self) -> DynamicKafkaSource:
+        return DynamicKafkaSource(self._j_builder.build())
+
+    def set_stream_ids(self, stream_ids: Set[str]) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the stream ids to consume.
+        """
+        j_set = get_gateway().jvm.java.util.HashSet()
+        for stream_id in stream_ids:
+            j_set.add(stream_id)
+        self._j_builder.setStreamIds(j_set)
+        return self
+
+    def set_stream_pattern(self, stream_pattern: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a regex pattern to match stream ids.
+        """
+        self._j_builder.setStreamPattern(get_gateway().jvm.java.util.regex
+                                         .Pattern.compile(stream_pattern))
+        return self
+
+    def set_kafka_stream_subscriber(
+            self,
+            kafka_stream_subscriber: Union[KafkaStreamSubscriber, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a custom KafkaStreamSubscriber.
+        """
+        if isinstance(kafka_stream_subscriber, KafkaStreamSubscriber):
+            j_subscriber = kafka_stream_subscriber._j_kafka_stream_subscriber
+        else:
+            j_subscriber = kafka_stream_subscriber
+        self._j_builder.setKafkaStreamSubscriber(j_subscriber)
+        return self
+
+    def set_kafka_metadata_service(
+            self,
+            kafka_metadata_service: Union[KafkaMetadataService, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the KafkaMetadataService.
+        """
+        if isinstance(kafka_metadata_service, KafkaMetadataService):
+            j_metadata_service = kafka_metadata_service._j_metadata_service
+        else:
+            j_metadata_service = kafka_metadata_service
+        self._j_builder.setKafkaMetadataService(j_metadata_service)
+        return self
+
+    def set_deserializer(self,
+                         deserializer: Union[KafkaRecordDeserializationSchema, JavaObject]) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the KafkaRecordDeserializationSchema.
+        """
+        if isinstance(deserializer, KafkaRecordDeserializationSchema):
+            j_deserializer = deserializer._j_deserialization_schema
+        else:
+            j_deserializer = deserializer
+        self._j_builder.setDeserializer(j_deserializer)
+        return self
+
+    def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a value-only DeserializationSchema.
+        """
+        return self.set_deserializer(
+            KafkaRecordDeserializationSchema.value_only(deserialization_schema))
+
+    def set_starting_offsets(self, starting_offsets_initializer: KafkaOffsetsInitializer) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the starting offsets for all streams.
+        """
+        self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
+        return self
+
+    def set_bounded(self, stopping_offsets_initializer: KafkaOffsetsInitializer) \
+            -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the source to bounded mode with stopping offsets.
+        """
+        self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
+        return self
+
+    def set_properties(self, props: Dict[str, str]) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set consumer properties for all clusters.
+        """
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in props.items():
+            j_properties.setProperty(key, value)
+        self._j_builder.setProperties(j_properties)
+        return self
+
+    def set_property(self, key: str, value: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set a consumer property for all clusters.
+        """
+        self._j_builder.setProperty(key, value)
+        return self
+
+    def set_group_id(self, group_id: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the consumer group id for all clusters.
+        """
+        self._j_builder.setGroupId(group_id)
+        return self
+
+    def set_client_id_prefix(self, prefix: str) -> 'DynamicKafkaSourceBuilder':
+        """
+        Set the client id prefix for all clusters.
+        """
+        self._j_builder.setClientIdPrefix(prefix)
+        return self
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
new file mode 100644
index 00000000000..e3812558487
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
@@ -0,0 +1,306 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from typing import Dict
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
+    KafkaRecordDeserializationSchema, SingleClusterTopicMetadataService
+from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \
+    KafkaTopicPartition
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.util.java_utils import get_field_value, is_instance_of
+
+
+class DynamicKafkaSourceTests(PyFlinkStreamingTestCase):
+
+    def test_compiling(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'test-stream'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+
+        ds = self.env.from_source(source=source,
+                                  watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                                  source_name='dynamic kafka source')
+        ds.print()
+        plan = json.loads(self.env.get_execution_plan())
+        self.assertEqual('Source: dynamic kafka source', plan['nodes'][0]['type'])
+
+    def test_set_stream_ids(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a', 'stream-b'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.KafkaStreamSetSubscriber'
+        )
+        stream_ids = get_field_value(subscriber, 'streamIds')
+        self.assertTrue(is_instance_of(stream_ids, get_gateway().jvm.java.util.Set))
+        self.assertEqual(stream_ids.size(), 2)
+        self.assertTrue('stream-a' in stream_ids)
+        self.assertTrue('stream-b' in stream_ids)
+
+    def test_set_stream_pattern(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_pattern('stream-*') \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
+        self.assertEqual(
+            subscriber.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber'
+            '.StreamPatternSubscriber'
+        )
+        stream_pattern = get_field_value(subscriber, 'streamPattern')
+        self.assertTrue(is_instance_of(stream_pattern, get_gateway().jvm.java.util.regex.Pattern))
+        self.assertEqual(stream_pattern.toString(), 'stream-*')
+
+    def test_set_properties(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .set_group_id('test-group') \
+            .set_client_id_prefix('test-client-id') \
+            .set_property('test-property', 'test-value') \
+            .build()
+        properties = get_field_value(source.get_java_function(), 'properties')
+        self.assertEqual(properties.getProperty('group.id'), 'test-group')
+        self.assertEqual(properties.getProperty('client.id.prefix'), 'test-client-id')
+        self.assertEqual(properties.getProperty('test-property'), 'test-value')
+
+    def test_set_starting_offsets(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return DynamicKafkaSource.builder() \
+                .set_stream_ids({'stream-a'}) \
+                .set_kafka_metadata_service(self._build_metadata_service()) \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_starting_offsets(initializer) \
+                .build()
+
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.latest()), -1, KafkaOffsetResetStrategy.LATEST
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.earliest()), -2,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets()), -3,
+            KafkaOffsetResetStrategy.NONE
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets(
+                KafkaOffsetResetStrategy.LATEST
+            )), -3, KafkaOffsetResetStrategy.LATEST
+        )
+        self._check_timestamp_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.timestamp(100)), 100
+        )
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)), specified_offsets,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(
+                specified_offsets,
+                KafkaOffsetResetStrategy.LATEST
+            )),
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST
+        )
+
+    def test_bounded(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return DynamicKafkaSource.builder() \
+                .set_stream_ids({'stream-a'}) \
+                .set_kafka_metadata_service(self._build_metadata_service()) \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_bounded(initializer) \
+                .build()
+
+        def _check_bounded(source: DynamicKafkaSource):
+            self.assertEqual(
+                get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED'
+            )
+
+        source = _build_source(KafkaOffsetsInitializer.latest())
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -1, KafkaOffsetResetStrategy.LATEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.earliest())
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -2, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets())
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -3, KafkaOffsetResetStrategy.NONE, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets(
+            KafkaOffsetResetStrategy.LATEST
+        ))
+        _check_bounded(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -3, KafkaOffsetResetStrategy.LATEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.timestamp(100))
+        _check_bounded(source)
+        self._check_timestamp_offsets_initializer(source, 100, False)
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        source = _build_source(KafkaOffsetsInitializer.offsets(specified_offsets))
+        _check_bounded(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.offsets(
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST)
+        )
+        _check_bounded(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False
+        )
+
+    def test_set_value_only_deserializer(self):
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                         'deserializationSchema')
+        self.assertEqual(
+            deserialization_schema_wrapper.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.reader.deserializer'
+            '.KafkaValueOnlyDeserializationSchemaWrapper'
+        )
+        deserialization_schema = get_field_value(deserialization_schema_wrapper,
+                                                 'deserializationSchema')
+        self.assertEqual(deserialization_schema.getClass().getCanonicalName(),
+                         'org.apache.flink.api.common.serialization.SimpleStringSchema')
+
+    def test_set_deserializer(self):
+        record_deserializer = KafkaRecordDeserializationSchema.value_only(SimpleStringSchema())
+        source = DynamicKafkaSource.builder() \
+            .set_stream_ids({'stream-a'}) \
+            .set_kafka_metadata_service(self._build_metadata_service()) \
+            .set_deserializer(record_deserializer) \
+            .build()
+        deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                         'deserializationSchema')
+        self.assertEqual(
+            deserialization_schema_wrapper.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.reader.deserializer'
+            '.KafkaValueOnlyDeserializationSchemaWrapper'
+        )
+
+    @staticmethod
+    def _build_metadata_service() -> SingleClusterTopicMetadataService:
+        return SingleClusterTopicMetadataService(
+            'test-cluster', {'bootstrap.servers': 'localhost:9092'})
+
+    def _check_reader_handled_offsets_initializer(self,
+                                                  source: DynamicKafkaSource,
+                                                  offset: int,
+                                                  reset_strategy: KafkaOffsetResetStrategy,
+                                                  is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.ReaderHandledOffsetsInitializer'
+        )
+
+        starting_offset = get_field_value(offsets_initializer, 'startingOffset')
+        self.assertEqual(starting_offset, offset)
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
+        self.assertTrue(
+            offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )
+
+    def _check_timestamp_offsets_initializer(self,
+                                             source: DynamicKafkaSource,
+                                             timestamp: int,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.TimestampOffsetsInitializer'
+        )
+
+        starting_timestamp = get_field_value(offsets_initializer, 'startingTimestamp')
+        self.assertEqual(starting_timestamp, timestamp)
+
+    def _check_specified_offsets_initializer(self,
+                                             source: DynamicKafkaSource,
+                                             offsets: Dict[KafkaTopicPartition, int],
+                                             reset_strategy: KafkaOffsetResetStrategy,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.SpecifiedOffsetsInitializer'
+        )
+
+        initial_offsets = get_field_value(offsets_initializer, 'initialOffsets')
+        self.assertTrue(is_instance_of(initial_offsets, get_gateway().jvm.java.util.Map))
+        self.assertEqual(initial_offsets.size(), len(offsets))
+        for j_topic_partition in initial_offsets:
+            topic_partition = KafkaTopicPartition(j_topic_partition.topic(),
+                                                  j_topic_partition.partition())
+            self.assertIsNotNone(offsets.get(topic_partition))
+            self.assertEqual(initial_offsets[j_topic_partition], offsets[topic_partition])
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
+        self.assertTrue(
+            offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )
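
For reference, a minimal end-to-end usage sketch of the API added in this commit, assembled from the DynamicKafkaSource docstring example and test_compiling above; the cluster id, stream id, bootstrap server, and group id are illustrative placeholders, and the Kafka connector jar is assumed to be on the job classpath::

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.dynamic_kafka import (
        DynamicKafkaSource, SingleClusterTopicMetadataService)

    env = StreamExecutionEnvironment.get_execution_environment()

    # Metadata service that resolves stream ids to topics on one cluster.
    metadata_service = SingleClusterTopicMetadataService(
        'cluster-a', {'bootstrap.servers': 'localhost:9092'})

    # Build the dynamic source, reading string values from 'stream-a'.
    source = DynamicKafkaSource.builder() \
        .set_stream_ids({'stream-a'}) \
        .set_kafka_metadata_service(metadata_service) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_group_id('example-group') \
        .build()

    ds = env.from_source(
        source=source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='dynamic kafka source')
    ds.print()
    env.execute()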
