This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new dc4efea9 [FLINK-38917][docs] Add documentation about Python
DynamicKafkaSource (#211)
dc4efea9 is described below
commit dc4efea959842463da95359a29c34b7a07211032
Author: bowenli86 <[email protected]>
AuthorDate: Thu Jan 15 18:08:15 2026 -0800
[FLINK-38917][docs] Add documentation about Python DynamicKafkaSource (#211)
Co-authored-by: Bowen Li <[email protected]>
---
.../docs/connectors/datastream/dynamic-kafka.md | 27 ++++++++++++++++++++++
.../docs/connectors/datastream/dynamic-kafka.md | 27 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index 3c9ba7cd..2dc9d8ad 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -70,6 +70,23 @@ DynamicKafkaSource<String> source =
DynamicKafkaSource.<String>builder()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic
Kafka Source");
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+metadata_service = SingleClusterTopicMetadataService(
+ "cluster-a",
+ {"bootstrap.servers": "localhost:9092"})
+
+source = DynamicKafkaSource.builder() \
+ .set_kafka_metadata_service(metadata_service) \
+ .set_stream_ids({"input-stream"}) \
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .set_properties(properties) \
+ .build()
+
+env.from_source(source, WatermarkStrategy.no_watermarks(), "Dynamic Kafka
Source")
+```
+{{< /tab >}}
{{< /tabs >}}
The following properties are **required** for building a DynamicKafkaSource:
@@ -132,6 +149,11 @@ The Dynamic Kafka Source provides 2 ways of subscribing to
Kafka stream(s).
DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
```
{{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ DynamicKafkaSource.builder().set_stream_ids({"stream-a", "stream-b"})
+ ```
+ {{< /tab >}}
{{< /tabs >}}
* A regex pattern that subscribes to all Kafka stream ids that match the
provided regex. For example:
{{< tabs "DynamicKafkaSource#setStreamPattern" >}}
@@ -140,6 +162,11 @@ The Dynamic Kafka Source provides 2 ways of subscribing to
Kafka stream(s).
DynamicKafkaSource.builder().setStreamPattern(Pattern.of("stream.*"));
```
{{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ DynamicKafkaSource.builder().set_stream_pattern("stream.*")
+ ```
+ {{< /tab >}}
{{< /tabs >}}
### Kafka Metadata Service
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md
b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index cad80cdd..2d5c73f2 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -70,6 +70,23 @@ DynamicKafkaSource<String> source =
DynamicKafkaSource.<String>builder()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka
Source");
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+metadata_service = SingleClusterTopicMetadataService(
+ "cluster-a",
+ {"bootstrap.servers": "localhost:9092"})
+
+source = DynamicKafkaSource.builder() \
+ .set_kafka_metadata_service(metadata_service) \
+ .set_stream_ids({"input-stream"}) \
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+ .set_value_only_deserializer(SimpleStringSchema()) \
+ .set_properties(properties) \
+ .build()
+
+env.from_source(source, WatermarkStrategy.no_watermarks(), "Dynamic Kafka
Source")
+```
+{{< /tab >}}
{{< /tabs >}}
The following properties are **required** for building a DynamicKafkaSource:
@@ -135,6 +152,11 @@ The Dynamic Kafka Source provides 2 ways of subscribing to
Kafka stream(s).
DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
```
{{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ DynamicKafkaSource.builder().set_stream_ids({"stream-a", "stream-b"})
+ ```
+ {{< /tab >}}
{{< /tabs >}}
* A regex pattern that subscribes to all Kafka stream ids that match the
provided regex. For example:
{{< tabs "DynamicKafkaSource#setStreamPattern" >}}
@@ -143,6 +165,11 @@ The Dynamic Kafka Source provides 2 ways of subscribing to
Kafka stream(s).
DynamicKafkaSource.builder().setStreamPattern(Pattern.of("stream.*"));
```
{{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ DynamicKafkaSource.builder().set_stream_pattern("stream.*")
+ ```
+ {{< /tab >}}
{{< /tabs >}}
### Kafka Metadata Service