morsapaes commented on a change in pull request #16:
URL: https://github.com/apache/flink-playgrounds/pull/16#discussion_r485975110



##########
File path: pyflink-walkthrough/docker-compose.yml
##########
@@ -0,0 +1,96 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2.1'
+services:
+  jobmanager:
+    build: .
+    image: pyflink/pyflink:1.11.0-scala_2.11
+    volumes:
+      - .:/opt/pyflink-walkthrough
+    hostname: "jobmanager"
+    expose:
+      - "6123"
+    ports:
+      - "8081:8081"
+    command: jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+  taskmanager:
+    image: pyflink/pyflink:1.11.0-scala_2.11
+    volumes:
+    - .:/opt/pyflink-walkthrough
+    expose:
+      - "6121"
+      - "6122"
+    depends_on:
+      - jobmanager
+    command: taskmanager
+    links:
+      - jobmanager:jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS=jobmanager
+  zookeeper:
+    image: wurstmeister/zookeeper:3.4.6
+    ports:
+      - "2181:2181"
+  kafka:
+    image: wurstmeister/kafka:2.12-2.2.1
+    ports:
+      - "9092"
+    depends_on:
+      - zookeeper
+    environment:
+      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CREATE_TOPICS: "payment_msg:1:1"
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+  generator:
+    build: generator
+    image: generator:1.0
+    depends_on:
+      - kafka
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
+    environment:
+      - cluster.name=docker-cluster
+      - bootstrap.memory_lock=true
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+      - discovery.type=single-node
+    ports:
+      - "9200:9200"
+      - "9300:9300"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536
+        hard: 65536
+  kibana:
+    image: docker.elastic.co/kibana/kibana:7.8.0
+    ports:
+      - "5601:5601"
+    depends_on:
+      - elasticsearch
+  load-kibaba-dashboad:

Review comment:
       ```suggestion
     load-kibana-dashboard:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 

Review comment:
       ```suggestion
   The transaction data will be processed with PyFlink using the Python script 
`[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py)`.
 This script will first map the `provinceId` in the input records to its 
corresponding province name using a Python UDF, and then sum the transaction 
amount for each province using a group aggregate. 
   ```

##########
File path: pyflink-walkthrough/payment_msg_proccessing.py
##########
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings
+from pyflink.table.udf import udf
+
+
+provinces = ("Beijing", "Shanghai", "Hangzhou", "Shenzhen", "Jiangxi", 
"Chongqing", "Xizang")
+
+
+@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
+def province_id_to_name(id):
+    return provinces[id]
+
+
+def log_processing():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env, 
environment_settings=env_settings)
+    
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
 True)
+
+    source_ddl = """
+            CREATE TABLE payment_msg(
+                createTime VARCHAR,
+                orderId BIGINT,
+                payAmount DOUBLE,
+                payPlatform INT,
+                provinceId INT
+            ) WITH (
+              'connector.type' = 'kafka',
+              'connector.version' = 'universal',
+              'connector.topic' = 'payment_msg',
+              'connector.properties.bootstrap.servers' = 'kafka:9092',
+              'connector.properties.group.id' = 'test_3',
+              'connector.startup-mode' = 'latest-offset',
+              'format.type' = 'json'
+            )
+            """
+
+    es_sink_ddl = """
+            CREATE TABLE es_sink (
+                province VARCHAR PRIMARY KEY,
+                pay_amount DOUBLE
+            ) with (
+                'connector.type' = 'elasticsearch',
+                'connector.version' = '7',
+                'connector.hosts' = 'http://elasticsearch:9200',
+                'connector.index' = 'platform_pay_amount_1',
+                'connector.document-type' = 'payment',
+                'update-mode' = 'upsert',
+                'connector.flush-on-checkpoint' = 'true',
+                'connector.key-delimiter' = '$',
+                'connector.key-null-literal' = 'n/a',
+                'connector.bulk-flush.max-size' = '42mb',
+                'connector.bulk-flush.max-actions' = '32',
+                'connector.bulk-flush.interval' = '1000',
+                'connector.bulk-flush.backoff.delay' = '1000',
+                'format.type' = 'json'
+            )
+    """
+
+    t_env.sql_update(source_ddl)
+    t_env.sql_update(es_sink_ddl)
+    t_env.register_function('province_id_to_name', province_id_to_name)
+
+    t_env.from_path("payment_msg") \
+        .select("province_id_to_name(provinceId) as province, payAmount") \
+        .group_by("province") \
+        .select("province, sum(payAmount) as pay_amount") \
+        .insert_into("es_sink")
+
+    t_env.execute("payment_demo")

Review comment:
       ```suggestion
       t_env.execute_sql(source_ddl)
       t_env.execute_sql(es_sink_ddl)
       t_env.register_function('province_id_to_name', province_id_to_name)
   
       t_env.from_path("payment_msg") \
           .select("province_id_to_name(provinceId) as province, payAmount") \
           .group_by("province") \
           .select("province, sum(payAmount) as pay_amount") \
           .execute_insert("es_sink")
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.

Review comment:
       ```suggestion
   In this playground, you will learn how to build and run an end-to-end 
PyFlink pipeline for data analytics, covering the following steps:
   
    * Reading data from a Kafka source;
    * Creating and using a 
[UDF](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html);
    * Performing a simple aggregation over the source data;
    * Writing the results to Elasticsearch and visualizing them in Kibana. 
    
    The environment is based on Docker Compose, so the only requirement is that 
you have [Docker](https://docs.docker.com/get-docker/) installed on your machine.
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running

Review comment:
       ```suggestion
   First, build the Docker image by running:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image

Review comment:
       ```suggestion
   ### Building the Docker image
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground

Review comment:
       ```suggestion
   Once the Docker image build is complete, run the following command to start 
the playground:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):

Review comment:
       ```suggestion
   One way of checking if the playground was started successfully is to access some of the exposed services:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.

Review comment:
       ```suggestion
   As mentioned, the environment for this walkthrough is based on Docker Compose and uses a custom image to spin up the Flink (JobManager+TaskManager), Kafka+ZooKeeper, data generator, and Elasticsearch+Kibana containers.
   
   You can find the 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file in the `pyflink-walkthrough` root directory.
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):
+
+1. visiting Flink Web UI [http://localhost:8081](http://localhost:8081).
+2. visiting Elasticsearch [http://localhost:9200](http://localhost:9200).
+3. visiting Kibana [http://localhost:5601](http://localhost:5601).
+
+
+### Stopping the Playground
+
+To stop the playground, run the following command
+
+```bash
+docker-compose down
+```
+
+
+## Run jobs

Review comment:
       ```suggestion
   ## Running the PyFlink Job
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.

Review comment:
       ```suggestion
   You will be using Kafka to store sample input data about payment 
transactions. A simple data generator 
`[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)`
 is provided to continuously write new records to the `payment_msg` Kafka 
topic. Each record is structured as follows: 
   
   `{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}`
   
   * `createTime`: The creation time of the transaction. 
   * `orderId`: The ID of the current transaction.
   * `payAmount`: The pay amount of the current transaction.
   * `payPlatform`: The platform used to pay the order (pc or mobile).
   * `provinceId`: The ID of the province the user is located in. 
   
   You can use the following command to read data from the Kafka topic and 
check whether it's generated correctly:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.

Review comment:
       ```suggestion
   Kibana is an open source data visualization dashboard for ElasticSearch. You 
will use it to visualize the results of your PyFlink pipeline.
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):
+
+1. visiting Flink Web UI [http://localhost:8081](http://localhost:8081).
+2. visiting Elasticsearch [http://localhost:9200](http://localhost:9200).
+3. visiting Kibana [http://localhost:5601](http://localhost:5601).
+
+
+### Stopping the Playground
+
+To stop the playground, run the following command

Review comment:
       ```suggestion
   To stop the playground, run the following command:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):
+
+1. visiting Flink Web UI [http://localhost:8081](http://localhost:8081).
+2. visiting Elasticsearch [http://localhost:9200](http://localhost:9200).
+3. visiting Kibana [http://localhost:5601](http://localhost:5601).
+
+

Review comment:
       ```suggestion
   **Note:** you may need to wait around 1 minute before all the services come 
up.
   
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):
+
+1. visiting Flink Web UI [http://localhost:8081](http://localhost:8081).
+2. visiting Elasticsearch [http://localhost:9200](http://localhost:9200).
+3. visiting Kibana [http://localhost:5601](http://localhost:5601).
+
+
+### Stopping the Playground
+
+To stop the playground, run the following command
+
+```bash
+docker-compose down
+```
+
+
+## Run jobs
+
+1. Submit the PyFlink job.
+```shell script
+$ docker-compose exec jobmanager ./bin/flink run -py 
/opt/pyflink-walkthrough/payment_msg_proccessing.py -d
+```
+
+2. Open [kibana ui](http://localhost:5601) and choose the dashboard: 
payment_dashboard
+
+![image](pic/dash_board.png)
+
+![image](pic/final.png)
+
+3. Stop PyFlink job:
+
+Visit [http://localhost:8081/#/overview](http://localhost:8081/#/overview) , 
select the job and click `Cancle`.

Review comment:
       ```suggestion
   Visit the Flink Web UI at 
[http://localhost:8081/#/overview](http://localhost:8081/#/overview), select 
the job and click `Cancel` on the upper right side.
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):
+
+1. visiting Flink Web UI [http://localhost:8081](http://localhost:8081).
+2. visiting Elasticsearch [http://localhost:9200](http://localhost:9200).
+3. visiting Kibana [http://localhost:5601](http://localhost:5601).
+
+
+### Stopping the Playground
+
+To stop the playground, run the following command
+
+```bash
+docker-compose down
+```
+
+
+## Run jobs
+
+1. Submit the PyFlink job.
+```shell script
+$ docker-compose exec jobmanager ./bin/flink run -py 
/opt/pyflink-walkthrough/payment_msg_proccessing.py -d
+```
+
+2. Open [kibana ui](http://localhost:5601) and choose the dashboard: 
payment_dashboard
+
+![image](pic/dash_board.png)
+
+![image](pic/final.png)
+
+3. Stop PyFlink job:

Review comment:
       ```suggestion
   3. Stop the PyFlink job:
   ```

##########
File path: pyflink-walkthrough/README.md
##########
@@ -0,0 +1,102 @@
+# pyflink-walkthrough
+
+## Background
+
+In this playground, you will learn how to manage and run PyFlink Jobs. The 
pipeline of this walkthrough reads data from Kafka, performs aggregations and 
writes results to Elasticsearch visualized via Kibana. The environment is 
managed by Docker so that all you need is a docker on your computer.
+
+- Kafka
+
+Kafka is used to store input data in this walkthrough. The script 
[generate_source_data.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/generate_source_data.py)
 is used to generate transaction data and writes into the payment_msg kafka 
topic. Each record includes 5 fields: 
+```text
+{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 
28306.44976403719, "payPlatform": 0, "provinceId": 4}
+```
+```text
+createTime: The creation time of the transaction. 
+orderId: The id of the current transaction.
+payAmount: The pay amount of the current transaction.
+payPlatform: The platform used to pay the order, pc or mobile.
+provinceId: The id of the province for the user. 
+```
+
+- Generator 
+
+A simple data generator is provided that continuously writes new records into 
Kafka. 
+You can use the following command to read data in kafka and check whether the 
data is generated correctly.
+
+```shell script
+$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server 
kafka:9092 --topic payment_msg
+{"createTime":"2020-07-27 
09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
+{"createTime":"2020-07-27 
09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
+{"createTime":"2020-07-27 
09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
+{"createTime":"2020-07-27 
09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
+```
+
+- PyFlink
+
+The transaction data is processed by a PyFlink job, 
[payment_msg_proccessing.py](https://github.com/hequn8128/pyflink-walkthrough/blob/master/payment_msg_proccessing.py).
 The job maps the province id to province name for better demonstration using a 
Python UDF and then sums the payment for each province using a group aggregate. 
+
+- ElasticSearch
+
+ElasticSearch is used to store upstream processing results and provide 
efficient query service.
+
+- Kibana
+
+Kibana is an open source data visualization dashboard for ElasticSearch. We 
use it to visualize our processing results.
+
+## Setup
+
+The pyflink-walkthrough requires a custom Docker image, as well as public 
images for Flink, Elasticsearch, Kafka, and ZooKeeper. 
+
+The 
[docker-compose.yaml](https://github.com/hequn8128/pyflink-walkthrough/blob/master/docker-compose.yml)
 file of the pyflink-walkthrough is located in the `pyflink-walkthrough` root 
directory.
+
+### Building the custom Docker image
+
+Build the Docker image by running
+
+```bash
+docker-compose build
+```
+
+### Starting the Playground
+
+Once you built the Docker image, run the following command to start the 
playground
+
+```bash
+docker-compose up -d
+```
+
+You can check if the playground was successfully started by accessing the 
WebUI of(You may need to wait about 1 min before all services come up.):
+
+1. visiting Flink Web UI [http://localhost:8081](http://localhost:8081).
+2. visiting Elasticsearch [http://localhost:9200](http://localhost:9200).
+3. visiting Kibana [http://localhost:5601](http://localhost:5601).
+
+
+### Stopping the Playground
+
+To stop the playground, run the following command
+
+```bash
+docker-compose down
+```
+
+
+## Run jobs
+
+1. Submit the PyFlink job.
+```shell script
+$ docker-compose exec jobmanager ./bin/flink run -py 
/opt/pyflink-walkthrough/payment_msg_proccessing.py -d
+```
+
+2. Open [kibana ui](http://localhost:5601) and choose the dashboard: 
payment_dashboard

Review comment:
       ```suggestion
   2. Navigate to the [Kibana UI](http://localhost:5601) and choose the 
pre-created dashboard `payment_dashboard`.
   ```

##########
File path: pyflink-walkthrough/payment_msg_proccessing.py
##########
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings
+from pyflink.table.udf import udf
+
+
+provinces = ("Beijing", "Shanghai", "Hangzhou", "Shenzhen", "Jiangxi", 
"Chongqing", "Xizang")
+
+
+@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
+def province_id_to_name(id):
+    return provinces[id]
+
+
+def log_processing():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env, 
environment_settings=env_settings)
+    
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
 True)
+
+    source_ddl = """
+            CREATE TABLE payment_msg(
+                createTime VARCHAR,
+                orderId BIGINT,
+                payAmount DOUBLE,
+                payPlatform INT,
+                provinceId INT
+            ) WITH (
+              'connector.type' = 'kafka',
+              'connector.version' = 'universal',
+              'connector.topic' = 'payment_msg',
+              'connector.properties.bootstrap.servers' = 'kafka:9092',
+              'connector.properties.group.id' = 'test_3',
+              'connector.startup-mode' = 'latest-offset',
+              'format.type' = 'json'
+            )
+            """
+
+    es_sink_ddl = """
+            CREATE TABLE es_sink (
+                province VARCHAR PRIMARY KEY,
+                pay_amount DOUBLE
+            ) with (
+                'connector.type' = 'elasticsearch',
+                'connector.version' = '7',
+                'connector.hosts' = 'http://elasticsearch:9200',
+                'connector.index' = 'platform_pay_amount_1',
+                'connector.document-type' = 'payment',
+                'update-mode' = 'upsert',
+                'connector.flush-on-checkpoint' = 'true',
+                'connector.key-delimiter' = '$',
+                'connector.key-null-literal' = 'n/a',
+                'connector.bulk-flush.max-size' = '42mb',
+                'connector.bulk-flush.max-actions' = '32',
+                'connector.bulk-flush.interval' = '1000',
+                'connector.bulk-flush.backoff.delay' = '1000',
+                'format.type' = 'json'
+            )
+    """
+
+    t_env.sql_update(source_ddl)
+    t_env.sql_update(es_sink_ddl)
+    t_env.register_function('province_id_to_name', province_id_to_name)
+
+    t_env.from_path("payment_msg") \
+        .select("province_id_to_name(provinceId) as province, payAmount") \
+        .group_by("province") \
+        .select("province, sum(payAmount) as pay_amount") \
+        .insert_into("es_sink")
+
+    t_env.execute("payment_demo")

Review comment:
       Use 1.11 methods instead of the deprecated ones.
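
   For reference, a minimal sketch of the non-deprecated calls (mirroring the suggestion above): it reuses the `source_ddl`, `es_sink_ddl` and UDF already defined in this script, and since `execute_insert` submits the job itself, the trailing `t_env.execute("payment_demo")` can be dropped:

```python
# Sketch only: Flink 1.11-style Table API calls, assuming the existing
# t_env, source_ddl, es_sink_ddl and province_id_to_name UDF from this script.
t_env.execute_sql(source_ddl)      # replaces t_env.sql_update(source_ddl)
t_env.execute_sql(es_sink_ddl)     # replaces t_env.sql_update(es_sink_ddl)
t_env.register_function('province_id_to_name', province_id_to_name)

t_env.from_path("payment_msg") \
    .select("province_id_to_name(provinceId) as province, payAmount") \
    .group_by("province") \
    .select("province, sum(payAmount) as pay_amount") \
    .execute_insert("es_sink")     # replaces .insert_into("es_sink") + t_env.execute(...)
```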




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

