This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new fa8fbf5bc29 [examples][python] Add examples on how to use
json/csv/avro formats in Python DataStream API
fa8fbf5bc29 is described below
commit fa8fbf5bc29b1432e1baaefe3049b50ffeca706a
Author: Dian Fu <[email protected]>
AuthorDate: Fri Apr 22 22:26:44 2022 +0800
[examples][python] Add examples on how to use json/csv/avro formats in
Python DataStream API
---
.../examples/datastream/formats/__init__.py | 17 ++++
.../examples/datastream/formats/avro_format.py | 91 ++++++++++++++++++++++
.../examples/datastream/formats/csv_format.py | 72 +++++++++++++++++
.../examples/datastream/formats/json_format.py | 73 +++++++++++++++++
4 files changed, 253 insertions(+)
diff --git a/flink-python/pyflink/examples/datastream/formats/__init__.py
b/flink-python/pyflink/examples/datastream/formats/__init__.py
new file mode 100644
index 00000000000..65b48d4d79b
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-python/pyflink/examples/datastream/formats/avro_format.py
b/flink-python/pyflink/examples/datastream/formats/avro_format.py
new file mode 100644
index 00000000000..9e3b2658d89
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/avro_format.py
@@ -0,0 +1,91 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+import sys
+
+from pyflink.common import AvroRowSerializationSchema, Types,
AvroRowDeserializationSchema
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer,
FlinkKafkaConsumer
+
+
+# Make sure that the Kafka cluster is started and the topic 'test_avro_topic'
is
+# created before executing this job.
def write_to_kafka(env):
    """Write a handful of (id, name) rows to the 'test_avro_topic' Kafka topic,
    serializing each row with Avro.

    :param env: the StreamExecutionEnvironment to build and run the job on.
    """
    row_type_info = Types.ROW([Types.INT(), Types.STRING()])
    records = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
               (5, 'hi'), (6, 'hello'), (6, 'hello')]
    ds = env.from_collection(records, type_info=row_type_info)

    # The Avro schema mirrors the row layout above: an int 'id' and a string 'name'.
    serialization_schema = AvroRowSerializationSchema(
        avro_schema_string="""
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    )

    producer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    kafka_producer = FlinkKafkaProducer(
        topic='test_avro_topic',
        serialization_schema=serialization_schema,
        producer_config=producer_props
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()
+
+
def read_from_kafka(env):
    """Consume Avro-encoded rows from the 'test_avro_topic' Kafka topic,
    starting from the earliest offset, and print them to stdout.

    :param env: the StreamExecutionEnvironment to build and run the job on.
    """
    # The reader schema must match the writer schema used in write_to_kafka.
    deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string="""
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    )

    consumer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_avro_topic',
        deserialization_schema=deserialization_schema,
        properties=consumer_props
    )
    # Replay the topic from the beginning so rows written earlier are also read.
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()
+
+
if __name__ == '__main__':
    # Log to stdout so example output and Flink client logs appear together.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # Both the Avro format jar and the Kafka connector jar are required on the
    # classpath; adjust the paths to the actual jar locations before running.
    env.add_jars("file:///path/to/flink-sql-avro-1.15.0.jar",
                 "file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
diff --git a/flink-python/pyflink/examples/datastream/formats/csv_format.py
b/flink-python/pyflink/examples/datastream/formats/csv_format.py
new file mode 100644
index 00000000000..21aa3ab17a6
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/csv_format.py
@@ -0,0 +1,72 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
import logging
import sys

from pyflink.common import Types, JsonRowDeserializationSchema, CsvRowSerializationSchema, \
    CsvRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+
+
+# Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
+# created before executing this job.
def write_to_kafka(env):
    """Write a handful of (id, name) rows to the 'test_csv_topic' Kafka topic,
    serializing each row as CSV.

    :param env: the StreamExecutionEnvironment to build and run the job on.
    """
    row_type_info = Types.ROW([Types.INT(), Types.STRING()])
    records = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
               (5, 'hi'), (6, 'hello'), (6, 'hello')]
    ds = env.from_collection(records, type_info=row_type_info)

    serialization_schema = CsvRowSerializationSchema.Builder(row_type_info).build()
    producer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    kafka_producer = FlinkKafkaProducer(
        topic='test_csv_topic',
        serialization_schema=serialization_schema,
        producer_config=producer_props
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()
+
+
def read_from_kafka(env):
    """Read the CSV-encoded rows back from the 'test_csv_topic' Kafka topic,
    starting from the earliest offset, and print them to stdout.

    :param env: the StreamExecutionEnvironment to build and run the job on.
    """
    # BUG FIX: the original used JsonRowDeserializationSchema here, which cannot
    # decode the CSV records produced by write_to_kafka (they were serialized
    # with CsvRowSerializationSchema). Use the matching CSV deserializer so the
    # read side agrees with the write side.
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_csv_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    # Replay the topic from the beginning so rows written earlier are also read.
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()
+
+
if __name__ == '__main__':
    # Log to stdout so example output and Flink client logs appear together.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Kafka connector jar must be provided explicitly; adjust the path to
    # the actual jar location before running.
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)
diff --git a/flink-python/pyflink/examples/datastream/formats/json_format.py
b/flink-python/pyflink/examples/datastream/formats/json_format.py
new file mode 100644
index 00000000000..10bf85394a2
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/json_format.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+import sys
+
+from pyflink.common import Types, JsonRowDeserializationSchema,
JsonRowSerializationSchema
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer,
FlinkKafkaConsumer
+
+
+# Make sure that the Kafka cluster is started and the topic 'test_json_topic'
is
+# created before executing this job.
def write_to_kafka(env):
    """Write a handful of (id, name) rows to the 'test_json_topic' Kafka topic,
    serializing each row as JSON.

    :param env: the StreamExecutionEnvironment to build and run the job on.
    """
    row_type_info = Types.ROW([Types.INT(), Types.STRING()])
    records = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
               (5, 'hi'), (6, 'hello'), (6, 'hello')]
    ds = env.from_collection(records, type_info=row_type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(row_type_info) \
        .build()
    producer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
    kafka_producer = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=serialization_schema,
        producer_config=producer_props
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()
+
+
def read_from_kafka(env):
    """Read the JSON-encoded rows back from the 'test_json_topic' Kafka topic,
    starting from the earliest offset, and print them to stdout.

    :param env: the StreamExecutionEnvironment to build and run the job on.
    """
    # The row type must mirror what write_to_kafka serialized: (int, string).
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()

    consumer_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserialization_schema,
        properties=consumer_props
    )
    # Replay the topic from the beginning so rows written earlier are also read.
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()
+
+
if __name__ == '__main__':
    # Log to stdout so example output and Flink client logs appear together.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Kafka connector jar must be provided explicitly; adjust the path to
    # the actual jar location before running.
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    print("start writing data to kafka")
    write_to_kafka(env)

    print("start reading data from kafka")
    read_from_kafka(env)