This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new f9ad1b140e8 [hotfix][examples] Add Python examples on how to read binary data from Kafka
f9ad1b140e8 is described below

commit f9ad1b140e8bac2c7023ed61d19d4ae4c3685b56
Author: Dian Fu <[email protected]>
AuthorDate: Wed Nov 12 22:07:46 2025 +0800

    [hotfix][examples] Add Python examples on how to read binary data from Kafka
---
 .../datastream/connectors/kafka_avro_format.py     |  41 ++++++---
 .../connectors/kafka_bytearray_format.py           | 102 +++++++++++++++++++++
 .../datastream/connectors/kafka_csv_format.py      |  41 ++++++---
 .../datastream/connectors/kafka_json_format.py     |  41 ++++++---
 4 files changed, 186 insertions(+), 39 deletions(-)
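
For readers skimming the diff: the new kafka_bytearray_format.py example round-trips raw bytes through Kafka using ByteArraySchema together with the KafkaSource/KafkaSink API. A condensed, self-contained sketch of the read path (assuming a local broker at localhost:9092 and that the topic 'test_bytearray_topic' already exists; this mirrors the example below rather than adding anything new):

    from pyflink.common import ByteArraySchema, WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

    env = StreamExecutionEnvironment.get_execution_environment()
    # the Kafka connector jar must be on the classpath, e.g. via env.add_jars(...)

    source = (
        KafkaSource.builder()
        .set_topics('test_bytearray_topic')
        # hand each record value to Python as a raw byte array
        .set_value_only_deserializer(ByteArraySchema())
        .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .build()
    )

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka source")
    # the values arrive as bytes; decode before printing
    ds.map(lambda payload: payload.decode("utf-8")).print()
    env.execute()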

diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
index 2a66bc4e8b1..e6256d85493 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
@@ -18,9 +18,10 @@
 import logging
 import sys
 
-from pyflink.common import Types
+from pyflink.common import Types, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
+                                                 KafkaSource, KafkaOffsetsInitializer)
 from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema
 
 
@@ -43,14 +44,20 @@ def write_to_kafka(env):
             }"""
     )
 
-    kafka_producer = FlinkKafkaProducer(
-        topic='test_avro_topic',
-        serialization_schema=serialization_schema,
-        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+    record_serializer = KafkaRecordSerializationSchema.builder() \
+        .set_topic('test_avro_topic') \
+        .set_value_serialization_schema(serialization_schema) \
+        .build()
+    kafka_sink = (
+        KafkaSink.builder()
+        .set_record_serializer(record_serializer)
+        .set_bootstrap_servers('localhost:9092')
+        .set_property("group.id", "test_group")
+        .build()
     )
 
     # note that the output type of ds must be RowTypeInfo
-    ds.add_sink(kafka_producer)
+    ds.sink_to(kafka_sink)
     env.execute()
 
 
@@ -67,14 +74,22 @@ def read_from_kafka(env):
             }"""
     )
 
-    kafka_consumer = FlinkKafkaConsumer(
-        topics='test_avro_topic',
-        deserialization_schema=deserialization_schema,
-        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
+    kafka_source = (
+        KafkaSource.builder()
+        .set_topics('test_avro_topic')
+        .set_value_only_deserializer(deserialization_schema)
+        .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+        .build()
     )
-    kafka_consumer.set_start_from_earliest()
 
-    env.add_source(kafka_consumer).print()
+    ds = env.from_source(
+        kafka_source,
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="kafka source"
+    )
+
+    ds.print()
     env.execute()
 
 
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_bytearray_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_bytearray_format.py
new file mode 100644
index 00000000000..d2e311cd2e6
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_bytearray_format.py
@@ -0,0 +1,102 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import json
+import logging
+import sys
+
+from pyflink.common import Types, ByteArraySchema, WatermarkStrategy
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.kafka import KafkaSource, \
+    KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
+
+
+# This example requires Flink 2.0 or later, as ByteArraySchema was introduced in Flink 2.0
+
+# Make sure that the Kafka cluster is started and the topic 'test_bytearray_topic' is
+# created before executing this job.
+def write_to_kafka(env):
+    data = [
+        (json.dumps({
+            "id": 1,
+            "country": "USA"
+        }).encode("utf-8"),),
+        (json.dumps({
+            "id": 2,
+            "country": "Canada"
+        }).encode("utf-8"),),
+        (json.dumps({
+            "id": 3,
+            "country": "Germany"
+        }).encode("utf-8"),)
+    ]
+    type_info = Types.ROW([Types.PRIMITIVE_ARRAY(Types.BYTE())])
+    ds = env.from_collection(data, type_info=type_info)
+
+    # declare the output type as Types.PRIMITIVE_ARRAY(Types.BYTE()),
+    # otherwise Types.PICKLED_BYTE_ARRAY() will be used by default, which
+    # pickles the resulting byte array unnecessarily
+    ds = ds.map(lambda x: x[0], output_type=Types.PRIMITIVE_ARRAY(Types.BYTE()))
+
+    record_serializer = KafkaRecordSerializationSchema.builder() \
+        .set_topic('test_bytearray_topic') \
+        .set_value_serialization_schema(ByteArraySchema()) \
+        .build()
+    kafka_sink = (
+        KafkaSink.builder()
+        .set_record_serializer(record_serializer)
+        .set_bootstrap_servers('localhost:9092')
+        .set_property("group.id", "test_group")
+        .build()
+    )
+
+    ds.sink_to(kafka_sink)
+    env.execute()
+
+
+def read_from_kafka(env):
+    kafka_source = (
+        KafkaSource.builder()
+        .set_topics('test_bytearray_topic')
+        .set_value_only_deserializer(ByteArraySchema())
+        .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+        .build()
+    )
+
+    ds = env.from_source(
+        kafka_source,
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="kafka source"
+    )
+
+    # the data read from the source is a byte array; decode it into a string
+    ds.map(lambda data: data.decode("utf-8")).print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
+
+    print("start writing data to kafka")
+    write_to_kafka(env)
+
+    print("start reading data from kafka")
+    read_from_kafka(env)
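
A note not in the commit itself: after reading with ByteArraySchema, the raw bytes usually need to be parsed downstream rather than just printed. A minimal sketch under the assumptions of the example above (JSON payloads with 'id' and 'country' fields; ds is the stream returned by env.from_source):

    import json

    from pyflink.common import Types

    # parse each raw-bytes record into a typed field; declaring an explicit
    # output_type avoids the default pickled serialization for the result
    countries = ds.map(
        lambda payload: json.loads(payload.decode("utf-8"))["country"],
        output_type=Types.STRING()
    )
    countries.print()
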
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 39c134a8ed3..3fca6240e66 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -18,9 +18,10 @@
 import logging
 import sys
 
-from pyflink.common import Types
+from pyflink.common import Types, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
+                                                 KafkaSource, KafkaOffsetsInitializer)
 from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema
 
 
@@ -33,14 +34,20 @@ def write_to_kafka(env):
         type_info=type_info)
 
     serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
-    kafka_producer = FlinkKafkaProducer(
-        topic='test_csv_topic',
-        serialization_schema=serialization_schema,
-        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+    record_serializer = KafkaRecordSerializationSchema.builder() \
+        .set_topic('test_csv_topic') \
+        .set_value_serialization_schema(serialization_schema) \
+        .build()
+    kafka_sink = (
+        KafkaSink.builder()
+        .set_record_serializer(record_serializer)
+        .set_bootstrap_servers('localhost:9092')
+        .set_property("group.id", "test_group")
+        .build()
     )
 
     # note that the output type of ds must be RowTypeInfo
-    ds.add_sink(kafka_producer)
+    ds.sink_to(kafka_sink)
     env.execute()
 
 
@@ -48,14 +55,22 @@ def read_from_kafka(env):
     type_info = Types.ROW([Types.INT(), Types.STRING()])
     deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()
 
-    kafka_consumer = FlinkKafkaConsumer(
-        topics='test_csv_topic',
-        deserialization_schema=deserialization_schema,
-        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
+    kafka_source = (
+        KafkaSource.builder()
+        .set_topics('test_csv_topic')
+        .set_value_only_deserializer(deserialization_schema)
+        .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+        .build()
     )
-    kafka_consumer.set_start_from_earliest()
 
-    env.add_source(kafka_consumer).print()
+    ds = env.from_source(
+        kafka_source,
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="kafka source"
+    )
+
+    ds.print()
     env.execute()
 
 
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
index 3cae241ba43..25480b95cef 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
@@ -18,9 +18,10 @@
 import logging
 import sys
 
-from pyflink.common import Types
+from pyflink.common import Types, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
+                                                 KafkaSource, KafkaOffsetsInitializer)
 from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
 
 
@@ -35,14 +36,20 @@ def write_to_kafka(env):
     serialization_schema = JsonRowSerializationSchema.Builder() \
         .with_type_info(type_info) \
         .build()
-    kafka_producer = FlinkKafkaProducer(
-        topic='test_json_topic',
-        serialization_schema=serialization_schema,
-        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+    record_serializer = KafkaRecordSerializationSchema.builder() \
+        .set_topic('test_json_topic') \
+        .set_value_serialization_schema(serialization_schema) \
+        .build()
+    kafka_sink = (
+        KafkaSink.builder()
+        .set_record_serializer(record_serializer)
+        .set_bootstrap_servers('localhost:9092')
+        .set_property("group.id", "test_group")
+        .build()
     )
 
     # note that the output type of ds must be RowTypeInfo
-    ds.add_sink(kafka_producer)
+    ds.sink_to(kafka_sink)
     env.execute()
 
 
@@ -50,14 +57,22 @@ def read_from_kafka(env):
     deserialization_schema = JsonRowDeserializationSchema.Builder() \
         .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
         .build()
-    kafka_consumer = FlinkKafkaConsumer(
-        topics='test_json_topic',
-        deserialization_schema=deserialization_schema,
-        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
+    kafka_source = (
+        KafkaSource.builder()
+        .set_topics('test_json_topic')
+        .set_value_only_deserializer(deserialization_schema)
+        .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+        .build()
+    )
+
+    ds = env.from_source(
+        kafka_source,
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="kafka source"
     )
-    kafka_consumer.set_start_from_earliest()
 
-    env.add_source(kafka_consumer).print()
+    ds.print()
     env.execute()
 
 
