This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new f9ad1b140e8 [hotfix][examples] Add Python examples on how to read binary data from Kafka
f9ad1b140e8 is described below
commit f9ad1b140e8bac2c7023ed61d19d4ae4c3685b56
Author: Dian Fu <[email protected]>
AuthorDate: Wed Nov 12 22:07:46 2025 +0800
[hotfix][examples] Add Python examples on how to read binary data from Kafka
---
.../datastream/connectors/kafka_avro_format.py | 41 ++++++---
.../connectors/kafka_bytearray_format.py | 102 +++++++++++++++++++++
.../datastream/connectors/kafka_csv_format.py | 41 ++++++---
.../datastream/connectors/kafka_json_format.py | 41 ++++++---
4 files changed, 186 insertions(+), 39 deletions(-)
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
index 2a66bc4e8b1..e6256d85493 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
@@ -18,9 +18,10 @@
import logging
import sys
-from pyflink.common import Types
+from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
+                                                 KafkaSource, KafkaOffsetsInitializer)
from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema
@@ -43,14 +44,20 @@ def write_to_kafka(env):
}"""
)
- kafka_producer = FlinkKafkaProducer(
- topic='test_avro_topic',
- serialization_schema=serialization_schema,
- producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+ record_serializer = KafkaRecordSerializationSchema.builder() \
+ .set_topic('test_avro_topic') \
+ .set_value_serialization_schema(serialization_schema) \
+ .build()
+ kafka_sink = (
+ KafkaSink.builder()
+ .set_record_serializer(record_serializer)
+ .set_bootstrap_servers('localhost:9092')
+ .set_property("group.id", "test_group")
+ .build()
)
# note that the output type of ds must be RowTypeInfo
- ds.add_sink(kafka_producer)
+ ds.sink_to(kafka_sink)
env.execute()
@@ -67,14 +74,22 @@ def read_from_kafka(env):
}"""
)
- kafka_consumer = FlinkKafkaConsumer(
- topics='test_avro_topic',
- deserialization_schema=deserialization_schema,
- properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
+ kafka_source = (
+ KafkaSource.builder()
+ .set_topics('test_avro_topic')
+ .set_value_only_deserializer(deserialization_schema)
+ .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+ .build()
)
- kafka_consumer.set_start_from_earliest()
- env.add_source(kafka_consumer).print()
+ ds = env.from_source(
+ kafka_source,
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="kafka source"
+ )
+
+ ds.print()
env.execute()
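
For context, the source/sink pattern that these diffs migrate to looks roughly like the following as standalone code. This is a minimal sketch, not part of the commit: the SimpleStringSchema value format and the topic/group names are stand-ins for the per-format schemas used in the actual examples.

    from pyflink.common import WatermarkStrategy
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
                                                     KafkaSource, KafkaOffsetsInitializer)

    env = StreamExecutionEnvironment.get_execution_environment()

    # KafkaSource replaces FlinkKafkaConsumer; the starting offsets move from
    # set_start_from_earliest() into the builder
    source = KafkaSource.builder() \
        .set_topics('example_topic') \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'example_group'}) \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .build()
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka source")

    # KafkaSink replaces FlinkKafkaProducer; the topic and the value format now
    # live in the record serializer
    record_serializer = KafkaRecordSerializationSchema.builder() \
        .set_topic('example_topic') \
        .set_value_serialization_schema(SimpleStringSchema()) \
        .build()
    sink = KafkaSink.builder() \
        .set_bootstrap_servers('localhost:9092') \
        .set_record_serializer(record_serializer) \
        .build()
    ds.sink_to(sink)
    env.execute()
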
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_bytearray_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_bytearray_format.py
new file mode 100644
index 00000000000..d2e311cd2e6
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_bytearray_format.py
@@ -0,0 +1,102 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+import logging
+import sys
+
+from pyflink.common import Types, ByteArraySchema, WatermarkStrategy
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.kafka import KafkaSource, \
+ KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
+
+
+# This example requires Flink 2.0 or later, as ByteArraySchema was introduced in Flink 2.0
+
+# Make sure that the Kafka cluster is started and the topic 'test_bytearray_topic' is
+# created before executing this job.
+def write_to_kafka(env):
+ data = [
+ (json.dumps({
+ "id": 1,
+ "country": "USA"
+ }).encode("utf-8"),),
+ (json.dumps({
+ "id": 2,
+ "country": "Canada"
+ }).encode("utf-8"),),
+ (json.dumps({
+ "id": 3,
+ "country": "Germany"
+ }).encode("utf-8"),)
+ ]
+ type_info = Types.ROW([Types.PRIMITIVE_ARRAY(Types.BYTE())])
+ ds = env.from_collection(data, type_info=type_info)
+
+ # declare the output type as Types.PRIMITIVE_ARRAY(Types.BYTE());
+ # otherwise Types.PICKLED_BYTE_ARRAY() is used by default, which
+ # needlessly pickles the resulting byte array
+ ds = ds.map(lambda x: x[0], output_type=Types.PRIMITIVE_ARRAY(Types.BYTE()))
+
+ record_serializer = KafkaRecordSerializationSchema.builder() \
+ .set_topic('test_bytearray_topic') \
+ .set_value_serialization_schema(ByteArraySchema()) \
+ .build()
+ kafka_sink = (
+ KafkaSink.builder()
+ .set_record_serializer(record_serializer)
+ .set_bootstrap_servers('localhost:9092')
+ .set_property("group.id", "test_group")
+ .build()
+ )
+
+ ds.sink_to(kafka_sink)
+ env.execute()
+
+
+def read_from_kafka(env):
+ kafka_source = (
+ KafkaSource.builder()
+ .set_topics('test_bytearray_topic')
+ .set_value_only_deserializer(ByteArraySchema())
+ .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+ .build()
+ )
+
+ ds = env.from_source(
+ kafka_source,
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="kafka source"
+ )
+
+ # the data read from the source is a byte array; decode it as a string
+ ds.map(lambda data: data.decode("utf-8")).print()
+ env.execute()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+ env = StreamExecutionEnvironment.get_execution_environment()
+ env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
+
+ print("start writing data to kafka")
+ write_to_kafka(env)
+
+ print("start reading data from kafka")
+ read_from_kafka(env)
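
The subtle part of this new example is the output_type on the map before the sink: ByteArraySchema serializes a plain byte array, so the stream must declare Types.PRIMITIVE_ARRAY(Types.BYTE()) rather than fall back to pickling. The payload round trip itself is ordinary UTF-8 JSON, as this small self-contained sketch (no broker required, values illustrative) shows:

    import json

    # what write_to_kafka produces: UTF-8 encoded JSON bytes
    raw = json.dumps({"id": 1, "country": "USA"}).encode("utf-8")

    # what the map in read_from_kafka does with each record: decode the byte
    # array back to a string, which can then be parsed as JSON if needed
    decoded = raw.decode("utf-8")
    assert json.loads(decoded) == {"id": 1, "country": "USA"}
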
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 39c134a8ed3..3fca6240e66 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -18,9 +18,10 @@
import logging
import sys
-from pyflink.common import Types
+from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
+                                                 KafkaSource, KafkaOffsetsInitializer)
from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema
@@ -33,14 +34,20 @@ def write_to_kafka(env):
type_info=type_info)
serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
- kafka_producer = FlinkKafkaProducer(
- topic='test_csv_topic',
- serialization_schema=serialization_schema,
- producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+ record_serializer = KafkaRecordSerializationSchema.builder() \
+ .set_topic('test_csv_topic') \
+ .set_value_serialization_schema(serialization_schema) \
+ .build()
+ kafka_sink = (
+ KafkaSink.builder()
+ .set_record_serializer(record_serializer)
+ .set_bootstrap_servers('localhost:9092')
+ .set_property("group.id", "test_group")
+ .build()
)
# note that the output type of ds must be RowTypeInfo
- ds.add_sink(kafka_producer)
+ ds.sink_to(kafka_sink)
env.execute()
@@ -48,14 +55,22 @@ def read_from_kafka(env):
type_info = Types.ROW([Types.INT(), Types.STRING()])
deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()
- kafka_consumer = FlinkKafkaConsumer(
- topics='test_csv_topic',
- deserialization_schema=deserialization_schema,
- properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
+ kafka_source = (
+ KafkaSource.builder()
+ .set_topics('test_csv_topic')
+ .set_value_only_deserializer(deserialization_schema)
+ .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+ .build()
)
- kafka_consumer.set_start_from_earliest()
- env.add_source(kafka_consumer).print()
+ ds = env.from_source(
+ kafka_source,
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="kafka source"
+ )
+
+ ds.print()
env.execute()
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
index 3cae241ba43..25480b95cef 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
@@ -18,9 +18,10 @@
import logging
import sys
-from pyflink.common import Types
+from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema, KafkaSink,
+                                                 KafkaSource, KafkaOffsetsInitializer)
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
@@ -35,14 +36,20 @@ def write_to_kafka(env):
serialization_schema = JsonRowSerializationSchema.Builder() \
.with_type_info(type_info) \
.build()
- kafka_producer = FlinkKafkaProducer(
- topic='test_json_topic',
- serialization_schema=serialization_schema,
- producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+ record_serializer = KafkaRecordSerializationSchema.builder() \
+ .set_topic('test_json_topic') \
+ .set_value_serialization_schema(serialization_schema) \
+ .build()
+ kafka_sink = (
+ KafkaSink.builder()
+ .set_record_serializer(record_serializer)
+ .set_bootstrap_servers('localhost:9092')
+ .set_property("group.id", "test_group")
+ .build()
)
# note that the output type of ds must be RowTypeInfo
- ds.add_sink(kafka_producer)
+ ds.sink_to(kafka_sink)
env.execute()
@@ -50,14 +57,22 @@ def read_from_kafka(env):
deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([Types.INT(), Types.STRING()])) \
.build()
- kafka_consumer = FlinkKafkaConsumer(
- topics='test_json_topic',
- deserialization_schema=deserialization_schema,
- properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
+ kafka_source = (
+ KafkaSource.builder()
+ .set_topics('test_json_topic')
+ .set_value_only_deserializer(deserialization_schema)
+ .set_properties({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'})
+ .set_starting_offsets(KafkaOffsetsInitializer.earliest())
+ .build()
+ )
+
+ ds = env.from_source(
+ kafka_source,
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="kafka source"
)
- kafka_consumer.set_start_from_earliest()
- env.add_source(kafka_consumer).print()
+ ds.print()
env.execute()
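
All three row-based examples keep the constraint called out in the comments above: the row serialization schemas operate on Rows, so the stream handed to the sink must carry RowTypeInfo. A minimal sketch of that pairing for the JSON case (the sample row is illustrative):

    from pyflink.common import Row, Types
    from pyflink.datastream.formats.json import JsonRowSerializationSchema

    # the schema is built from the same RowTypeInfo that the sink's input
    # stream must declare
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()

    # elements flowing into the sink must match this (INT, STRING) row shape
    row = Row(1, 'hi')
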