This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new fa8fbf5bc29 [examples][python] Add examples on how to use 
json/csv/avro formats in Python DataStream API
fa8fbf5bc29 is described below

commit fa8fbf5bc29b1432e1baaefe3049b50ffeca706a
Author: Dian Fu <[email protected]>
AuthorDate: Fri Apr 22 22:26:44 2022 +0800

    [examples][python] Add examples on how to use json/csv/avro formats in 
Python DataStream API
---
 .../examples/datastream/formats/__init__.py        | 17 ++++
 .../examples/datastream/formats/avro_format.py     | 91 ++++++++++++++++++++++
 .../examples/datastream/formats/csv_format.py      | 72 +++++++++++++++++
 .../examples/datastream/formats/json_format.py     | 73 +++++++++++++++++
 4 files changed, 253 insertions(+)

diff --git a/flink-python/pyflink/examples/datastream/formats/__init__.py 
b/flink-python/pyflink/examples/datastream/formats/__init__.py
new file mode 100644
index 00000000000..65b48d4d79b
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
diff --git a/flink-python/pyflink/examples/datastream/formats/avro_format.py 
b/flink-python/pyflink/examples/datastream/formats/avro_format.py
new file mode 100644
index 00000000000..9e3b2658d89
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/avro_format.py
@@ -0,0 +1,91 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import logging
+import sys
+
+from pyflink.common import AvroRowSerializationSchema, Types, 
AvroRowDeserializationSchema
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer, 
FlinkKafkaConsumer
+
+
def write_to_kafka(env):
    """Publish a small collection of rows to Kafka, Avro-encoded.

    The Kafka cluster must be running and the topic 'test_avro_topic' must
    already exist before this job is executed.
    """
    row_type = Types.ROW([Types.INT(), Types.STRING()])
    records = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
               (5, 'hi'), (6, 'hello'), (6, 'hello')]
    ds = env.from_collection(records, type_info=row_type)

    schema_string = """
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    serializer = AvroRowSerializationSchema(avro_schema_string=schema_string)

    producer = FlinkKafkaProducer(
        topic='test_avro_topic',
        serialization_schema=serializer,
        producer_config={'bootstrap.servers': 'localhost:9092',
                         'group.id': 'test_group'})

    # The stream handed to the sink must carry RowTypeInfo.
    ds.add_sink(producer)
    env.execute()
+
+
def read_from_kafka(env):
    """Consume the Avro-encoded rows from 'test_avro_topic' and print them."""
    schema_string = """
            {
                "type": "record",
                "name": "TestRecord",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"}
                ]
            }"""
    deserializer = AvroRowDeserializationSchema(avro_schema_string=schema_string)

    consumer = FlinkKafkaConsumer(
        topics='test_avro_topic',
        deserialization_schema=deserializer,
        properties={'bootstrap.servers': 'localhost:9092',
                    'group.id': 'test_group_1'})
    # Re-read the topic from the beginning so previously written records show up.
    consumer.set_start_from_earliest()

    env.add_source(consumer).print()
    env.execute()
+
+
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # Both the Avro format jar and the Kafka connector jar are required.
    jar_urls = ("file:///path/to/flink-sql-avro-1.15.0.jar",
                "file:///path/to/flink-sql-connector-kafka-1.15.0.jar")
    env.add_jars(*jar_urls)

    for message, job in (("start writing data to kafka", write_to_kafka),
                         ("start reading data from kafka", read_from_kafka)):
        print(message)
        job(env)
diff --git a/flink-python/pyflink/examples/datastream/formats/csv_format.py 
b/flink-python/pyflink/examples/datastream/formats/csv_format.py
new file mode 100644
index 00000000000..21aa3ab17a6
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/csv_format.py
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import logging
+import sys
+
from pyflink.common import Types, JsonRowDeserializationSchema, \
    CsvRowSerializationSchema, CsvRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+
+
def write_to_kafka(env):
    """Publish a small collection of rows to Kafka, CSV-encoded.

    The Kafka cluster must be running and the topic 'test_csv_topic' must
    already exist before this job is executed.
    """
    row_type = Types.ROW([Types.INT(), Types.STRING()])
    records = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
               (5, 'hi'), (6, 'hello'), (6, 'hello')]
    ds = env.from_collection(records, type_info=row_type)

    producer = FlinkKafkaProducer(
        topic='test_csv_topic',
        serialization_schema=CsvRowSerializationSchema.Builder(row_type).build(),
        producer_config={'bootstrap.servers': 'localhost:9092',
                         'group.id': 'test_group'})

    # The stream handed to the sink must carry RowTypeInfo.
    ds.add_sink(producer)
    env.execute()
+
+
def read_from_kafka(env):
    """Consume the CSV-encoded rows from 'test_csv_topic' and print them.

    Fix: the original example built a ``JsonRowDeserializationSchema`` here,
    but ``write_to_kafka`` produces comma-separated CSV records, which are
    not valid JSON — the consumer could never decode them. The CSV
    deserializer matching the writer's row type is used instead.
    """
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_csv_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_1'}
    )
    # Re-read the topic from the beginning so previously written records show up.
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()
+
+
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Kafka connector jar is required on the classpath.
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    for message, job in (("start writing data to kafka", write_to_kafka),
                         ("start reading data from kafka", read_from_kafka)):
        print(message)
        job(env)
diff --git a/flink-python/pyflink/examples/datastream/formats/json_format.py 
b/flink-python/pyflink/examples/datastream/formats/json_format.py
new file mode 100644
index 00000000000..10bf85394a2
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/formats/json_format.py
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import logging
+import sys
+
+from pyflink.common import Types, JsonRowDeserializationSchema, 
JsonRowSerializationSchema
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import FlinkKafkaProducer, 
FlinkKafkaConsumer
+
+
def write_to_kafka(env):
    """Publish a small collection of rows to Kafka, JSON-encoded.

    The Kafka cluster must be running and the topic 'test_json_topic' must
    already exist before this job is executed.
    """
    row_type = Types.ROW([Types.INT(), Types.STRING()])
    records = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'),
               (5, 'hi'), (6, 'hello'), (6, 'hello')]
    ds = env.from_collection(records, type_info=row_type)

    serializer = (JsonRowSerializationSchema.Builder()
                  .with_type_info(row_type)
                  .build())
    producer = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=serializer,
        producer_config={'bootstrap.servers': 'localhost:9092',
                         'group.id': 'test_group'})

    # The stream handed to the sink must carry RowTypeInfo.
    ds.add_sink(producer)
    env.execute()
+
+
def read_from_kafka(env):
    """Consume the JSON-encoded rows from 'test_json_topic' and print them."""
    deserializer = (JsonRowDeserializationSchema.Builder()
                    .type_info(Types.ROW([Types.INT(), Types.STRING()]))
                    .build())
    consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserializer,
        properties={'bootstrap.servers': 'localhost:9092',
                    'group.id': 'test_group_1'})
    # Re-read the topic from the beginning so previously written records show up.
    consumer.set_start_from_earliest()

    env.add_source(consumer).print()
    env.execute()
+
+
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Kafka connector jar is required on the classpath.
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    for message, job in (("start writing data to kafka", write_to_kafka),
                         ("start reading data from kafka", read_from_kafka)):
        print(message)
        job(env)

Reply via email to