sijie closed pull request #3262: [Pulsar-Flink] Refactor Flink Batch Sink 
Examples
URL: https://github.com/apache/pulsar/pull/3262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/examples/flink-consumer-source/pom.xml 
b/examples/flink-consumer-source/pom.xml
index 35f5924966..088444b7eb 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -54,12 +54,6 @@
       <version>${log4j2.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
@@ -77,6 +71,7 @@
       <artifactId>pulsar-flink</artifactId>
       <version>${project.version}</version>
     </dependency>
+
   </dependencies>
 
   <build>
@@ -107,7 +102,7 @@
                   
<mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
                 </transformer>
               </transformers>
-              <finalName>pulsar-flink-streaming-wordcount</finalName>
+              <finalName>pulsar-flink-examples</finalName>
               <filters>
                 <filter>
                   <artifact>*</artifact>
@@ -115,6 +110,15 @@
                     
<include>org/apache/flink/streaming/examples/kafka/**</include>
                     <include>org/apache/flink/streaming/**</include>
                     <include>org/apache/pulsar/**</include>
+                    <include>org/apache/flink/batch/**</include>
+                    <include>net/jpountz/**</include>
+                    <include>com/scurrilous/circe/**</include>
+                    <include>org/apache/commons/csv/**</include>
+                    <include>org/apache/flink/avro/generated/**</include>
+                    <include>org/apache/avro/**</include>
+                    <include>org/codehaus/jackson/**</include>
+                    <include>avro/shaded/com/google/common/**</include>
+                    <include>org/apache/flink/formats/avro/**</include>
                   </includes>
                 </filter>
               </filters>
@@ -122,6 +126,20 @@
           </execution>
         </executions>
       </plugin>
+      <!-- Scala Plugin to compile Scala Files -->
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.4.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <!-- Generate Test class from avro schema -->
       <plugin>
         <groupId>org.apache.avro</groupId>
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index ef0048cbac..584d59fec1 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -18,10 +18,10 @@
  */
 package org.apache.flink.batch.connectors.pulsar.example;
 
-
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
 
@@ -40,16 +40,30 @@
             
NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
             NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test 
Project").setStartYear(1975).setEndYear(1975).build());
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url 
<pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarAvroOutputFormat instance
-        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new 
PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new 
PulsarAvroOutputFormat<>(serviceUrl, topic);
 
         // create DataSet
         DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 6b0f0ca6cd..3e658dc66a 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
 
 import java.util.Arrays;
@@ -40,17 +41,31 @@
             new Tuple4(4, "Skylab", 1973, 1974),
             new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url 
<pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarCsvOutputFormat instance
         final OutputFormat<Tuple4<Integer, String, Integer, Integer>> 
pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+                new PulsarCsvOutputFormat<>(serviceUrl, topic);
 
         // create DataSet
         DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = 
env.fromCollection(nasaMissions);
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
index e037616c9f..3937ae9908 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
 
 import java.util.Arrays;
@@ -38,16 +39,30 @@
             new NasaMission(4, "Skylab", 1973, 1974),
             new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url 
<pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarJsonOutputFormat instance
-        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(serviceUrl, topic);
 
         // create DataSet
         DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index 6724c62a9d..c90d016eaa 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
 import org.apache.flink.util.Collector;
 
@@ -35,17 +36,31 @@
     private static final String EINSTEIN_QUOTE = "Imagination is more 
important than knowledge. " +
             "Knowledge is limited. Imagination encircles the world.";
 
-    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-    private static final String TOPIC_NAME = "my-flink-topic";
-
     public static void main(String[] args) throws Exception {
 
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url 
<pulsar-service-url> --topic <topic>");
+            return;
+        }
+
         // set up the execution environment
         final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setGlobalJobParameters(parameterTool);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String topic = parameterTool.getRequired("topic");
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tTopic:\t" + topic);
 
         // create PulsarOutputFormat instance
         final OutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount 
-> wordWithCount.toString().getBytes());
+                new PulsarOutputFormat(serviceUrl, topic, wordWithCount -> 
wordWithCount.toString().getBytes());
 
         // create DataSet
         DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 3116b3bc10..b93f5a3f69 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -53,71 +53,56 @@ dependencies {
 }
 ```
 
-# PulsarOutputFormat
-### Usage
+# Example
 
-Please find a sample usage as follows:
+### PulsarOutputFormat
 
-```java
-        private static final String EINSTEIN_QUOTE = "Imagination is more 
important than knowledge. " +
-                "Knowledge is limited. Imagination encircles the world.";
+In this example, a Flink DataSet is processed as word-count and written to
Pulsar. Please find a complete example for PulsarOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala)
 
-        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-        private static final String TOPIC_NAME = "my-flink-topic";
+The steps to run the example:
 
-        public static void main(String[] args) throws Exception {
+1. Start Pulsar Standalone.
 
-            // set up the execution environment
-            final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    You can follow the 
[instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar 
standalone locally.
 
-            // create PulsarOutputFormat instance
-            final OutputFormat<String> pulsarOutputFormat =
-                    new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, 
wordWithCount -> wordWithCount.toString().getBytes());
+    ```shell
+    $ bin/pulsar standalone
+    ```
 
-            // create DataSet
-            DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+2. Start Flink locally.
 
-            textDS.flatMap(new FlatMapFunction<String, String>() {
-                @Override
-                public void flatMap(String value, Collector<String> out) 
throws Exception {
-                    String[] words = value.toLowerCase().split(" ");
-                    for(String word: words) {
-                        out.collect(word.replace(".", ""));
-                    }
-                }
-            })
-            // filter words which length is bigger than 4
-            .filter(word -> word.length() > 4)
+    You can follow the 
[instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html)
 to download and start Flink.
 
-            // write batch data to Pulsar
-            .output(pulsarOutputFormat);
+    ```shell
+    $ ./bin/start-cluster.sh
+    ```
 
-            // execute program
-            env.execute("Flink - Pulsar Batch WordCount");
-        }
-```
+3. Build the examples.
 
-### Sample Output
+    ```shell
+    $ cd ${PULSAR_HOME}
+    $ mvn clean install -DskipTests
+    ```
 
-Please find sample output for above application as follows:
-```
-imagination
-important
-knowledge
-knowledge
-limited
-imagination
-encircles
-world
-```
+4. Run the word count example to write results to the Pulsar topic.
+
+    ```shell
+    # java
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkExample 
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
 
-### Complete Example
+    # scala
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample
 ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
 
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to 
Pulsar.
+5. Once the flink word count example is running, you can use 
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
 
-### Complete Example Output
-Please find sample output for above linked application as follows:
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for the above linked application as follows:
 ```
 WordWithCount { word = important, count = 1 }
 WordWithCount { word = encircles, count = 1 }
@@ -127,230 +112,105 @@ WordWithCount { word = limited, count = 1 }
 WordWithCount { word = world, count = 1 }
 ```
 
-# PulsarCsvOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
-        private static final List<Tuple4<Integer, String, Integer, Integer>> 
nasaMissions = Arrays.asList(
-                new Tuple4(1, "Mercury program", 1959, 1963),
-                new Tuple4(2, "Apollo program", 1961, 1972),
-                new Tuple4(3, "Gemini program", 1963, 1966),
-                new Tuple4(4, "Skylab", 1973, 1974),
-                new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
-        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-        private static final String TOPIC_NAME = "my-flink-topic";
-
-        public static void main(String[] args) throws Exception {
-
-            // set up the execution environment
-            final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-            // create PulsarCsvOutputFormat instance
-            final OutputFormat<Tuple4<Integer, String, Integer, Integer>> 
pulsarCsvOutputFormat =
-                    new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-
-            // create DataSet
-            DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = 
env.fromCollection(nasaMissions);
-            // map nasa mission names to upper-case
-            nasaMissionDS.map(
-                new MapFunction<Tuple4<Integer, String, Integer, Integer>, 
Tuple4<Integer, String, Integer, Integer>>() {
-                               @Override
-                               public Tuple4<Integer, String, Integer, 
Integer> map(
-                                       Tuple4<Integer, String, Integer, 
Integer> nasaMission) throws Exception {
-                                   return new Tuple4(
-                                           nasaMission.f0,
-                                           nasaMission.f1.toUpperCase(),
-                                           nasaMission.f2,
-                                           nasaMission.f3);
-                               }
-                           }
-            )
-            // filter missions which started after 1970
-            .filter(nasaMission -> nasaMission.f2 > 1970)
-            // write batch data to Pulsar
-            .output(pulsarCsvOutputFormat);
-
-            // set parallelism to write Pulsar in parallel (optional)
-            env.setParallelism(2);
-
-            // execute program
-            env.execute("Flink - Pulsar Batch Csv");
-
-        }
-```
-
-### Sample Output
 
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
+### PulsarCsvOutputFormat
 
-### Complete Example
+In this example, Flink DataSet is processed and written to Pulsar in Csv 
format. Please find a complete example for PulsarCsvOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala)
 
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Csv 
format.
+The steps to run the example:
 
Steps 1, 2 and 3 are the same as above.
 
-# PulsarJsonOutputFormat
-### Usage
+4. Run the Csv sink example to write results to the Pulsar topic.
 
-Please find a sample usage as follows:
+    ```shell
+    # java
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkExample 
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
 
-```java
-        private static final List<NasaMission> nasaMissions = Arrays.asList(
-                new NasaMission(1, "Mercury program", 1959, 1963),
-                new NasaMission(2, "Apollo program", 1961, 1972),
-                new NasaMission(3, "Gemini program", 1963, 1966),
-                new NasaMission(4, "Skylab", 1973, 1974),
-                new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+    # scala
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkScalaExample
 ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
 
-        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
-        private static final String TOPIC_NAME = "my-flink-topic";
+5. Once the flink example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
 
-        public static void main(String[] args) throws Exception {
-
-            // set up the execution environment
-            final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
 
-            // create PulsarJsonOutputFormat instance
-            final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+6. Please find sample output for the above linked application as follows:
+```
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
+```
 
-            // create DataSet
-            DataSet<NasaMission> nasaMissionDS = 
env.fromCollection(nasaMissions);
-            // map nasa mission names to upper-case
-            nasaMissionDS.map(nasaMission -> new NasaMission(
-                    nasaMission.id,
-                    nasaMission.missionName.toUpperCase(),
-                    nasaMission.startYear,
-                    nasaMission.endYear))
-            // filter missions which started after 1970
-            .filter(nasaMission -> nasaMission.startYear > 1970)
-            // write batch data to Pulsar
-            .output(pulsarJsonOutputFormat);
 
-            // set parallelism to write Pulsar in parallel (optional)
-            env.setParallelism(2);
+### PulsarJsonOutputFormat
 
-            // execute program
-            env.execute("Flink - Pulsar Batch Json");
-        }
+In this example, Flink DataSet is processed and written to Pulsar in Json 
format. Please find a complete example for PulsarJsonOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala)
 
-        /**
-         * NasaMission data model
-         *
-         * Note: Property definitions of the model should be public or have 
getter functions to be visible
-         */
-        private static class NasaMission {
+**Note:** Property definitions of the model should be public or have getter 
functions to be visible.
 
-            private int id;
-            private String missionName;
-            private int startYear;
-            private int endYear;
+The steps to run the example:
 
-            public NasaMission(int id, String missionName, int startYear, int 
endYear) {
-                this.id = id;
-                this.missionName = missionName;
-                this.startYear = startYear;
-                this.endYear = endYear;
-            }
+Steps 1, 2 and 3 are the same as above.
 
-            public int getId() {
-                return id;
-            }
+4. Run the Json sink example to write results to the Pulsar topic.
 
-            public String getMissionName() {
-                return missionName;
-            }
+    ```shell
+    # java
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkExample
 ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
 
-            public int getStartYear() {
-                return startYear;
-            }
+    # scala
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkScalaExample
 ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
 
-            public int getEndYear() {
-                return endYear;
-            }
-        }
+5. Once the flink example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
 
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
 ```
 
-**Note:** Property definitions of the model should be public or have getter 
functions to be visible
-
-### Sample Output
-
-Please find sample output for above application as follows:
+6. Please find sample output for the above linked application as follows:
 ```
 {"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
 {"id":5,"missionName":"APOLLO–SOYUZ TEST 
PROJECT","startYear":1975,"endYear":1975}
 ```
 
-### Complete Example
-
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Json 
format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
-        private static final List<NasaMission> nasaMissions = Arrays.asList(
-                    NasaMission.newBuilder().setId(1).setName("Mercury 
program").setStartYear(1959).setEndYear(1963).build(),
-                    NasaMission.newBuilder().setId(2).setName("Apollo 
program").setStartYear(1961).setEndYear(1972).build(),
-                    NasaMission.newBuilder().setId(3).setName("Gemini 
program").setStartYear(1963).setEndYear(1966).build(),
-                    
NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
-                    NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz 
Test Project").setStartYear(1975).setEndYear(1975).build());
-        
-            private static final String SERVICE_URL = 
"pulsar://127.0.0.1:6650";
-            private static final String TOPIC_NAME = "my-flink-topic";
-        
-            public static void main(String[] args) throws Exception {
-        
-                // set up the execution environment
-                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-        
-                // create PulsarAvroOutputFormat instance
-                final OutputFormat<NasaMission> pulsarAvroOutputFormat = new 
PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-        
-                // create DataSet
-                DataSet<NasaMission> nasaMissionDS = 
env.fromCollection(nasaMissions);
-                // map nasa mission names to upper-case
-                nasaMissionDS.map(nasaMission -> new NasaMission(
-                        nasaMission.getId(),
-                        nasaMission.getName(),
-                        nasaMission.getStartYear(),
-                        nasaMission.getEndYear()))
-                        // filter missions which started after 1970
-                        .filter(nasaMission -> nasaMission.getStartYear() > 
1970)
-                        // write batch data to Pulsar
-                        .output(pulsarAvroOutputFormat);
-        
-                // set parallelism to write Pulsar in parallel (optional)
-                env.setParallelism(2);
-        
-                // execute program
-                env.execute("Flink - Pulsar Batch Avro");
-            }
 
-```
+### PulsarAvroOutputFormat
+
+In this example, Flink DataSet is processed and written to Pulsar in Avro
format. Please find a complete example for PulsarAvroOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala)
+
+**Note:** The NasaMission class is automatically generated by Avro.
+
+The steps to run the example:
 
-**Note:** NasaMission class are automatically generated by Avro
+Steps 1, 2 and 3 are the same as above.
 
-### Sample Output
+4. Run the Avro sink example to write results to the Pulsar topic.
 
-Please find sample output for above application as follows:
+    ```shell
+    # java
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkExample
 ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
+
+    # scala
+    $ ./bin/flink run -c 
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkScalaExample
 ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --topic test_flink_topic
+    ```
+
+5. Once the flink example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for the above linked application as follows:
 ```
  "4,SKYLAB,1973,1974"
  "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
 ```
-
-### Complete Example
-
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Avro 
format.
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 16c88f1c43..463e805853 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -30,7 +30,7 @@ See the [Pulsar 
Concepts](https://pulsar.apache.org/docs/en/concepts-overview/)
 
 ### PulsarConsumerSourceWordCount
 
-This Flink streaming job is consuming from a Pulsar topic and couting the 
wordcount in a streaming fashion. The job can write the word count results
+This Flink streaming job is consuming from a Pulsar topic and counting the 
wordcount in a streaming fashion. The job can write the word count results
 to stdout or another Pulsar topic.
 
 The steps to run the example:
@@ -61,7 +61,7 @@ The steps to run the example:
 4. Run the word count example to print results to stdout.
 
     ```shell
-    $ ./bin/flink run  
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar
 --service-url pulsar://localhost:6650 --input-topic test_src --subscription 
test_sub
+    $ ./bin/flink run  
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --input-topic test_src --subscription 
test_sub
     ```
 
 5. Produce messages to topic `test_src`.
@@ -85,7 +85,7 @@ The steps to run the example:
 Alternatively, when you run the flink word count example at step 4, you can 
choose dump the result to another pulsar topic.
 
 ```shell
-$ ./bin/flink run  
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar
 --service-url pulsar://localhost:6650 --input-topic test_src --subscription 
test_sub --output-topic test_dest
+$ ./bin/flink run  
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar 
--service-url pulsar://localhost:6650 --input-topic test_src --subscription 
test_sub --output-topic test_dest
 ```
 
 Once the flink word count example is running, you can use `bin/pulsar-client` 
to tail the results produced into topic `test_dest`.
diff --git 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 0d255f2a1a..f10d6c1274 100644
--- 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar.example
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.avro.generated.NasaMission
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
@@ -27,10 +28,7 @@ import 
org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
   */
 object FlinkPulsarBatchAvroSinkScalaExample {
 
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
-
-  val nasaMissions = List(
+  private val nasaMissions = List(
     NasaMission.newBuilder.setId(1).setName("Mercury 
program").setStartYear(1959).setEndYear(1963).build,
     NasaMission.newBuilder.setId(2).setName("Apollo 
program").setStartYear(1961).setEndYear(1972).build,
     NasaMission.newBuilder.setId(3).setName("Gemini 
program").setStartYear(1963).setEndYear(1966).build,
@@ -39,12 +37,29 @@ object FlinkPulsarBatchAvroSinkScalaExample {
 
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic 
<topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarCsvOutputFormat instance
     val pulsarAvroOutputFormat =
-      new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+      new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
 
     // create DataSet
     val textDS = env.fromCollection(nasaMissions)
diff --git 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
index 7db844b3c2..3233616857 100644
--- 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++ 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar.example
 
 import org.apache.flink.api.java.tuple.Tuple4
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
 
@@ -33,9 +34,6 @@ object FlinkPulsarBatchCsvSinkScalaExample {
   private case class NasaMission(id: Int, missionName: String, startYear: Int, 
endYear: Int)
     extends Tuple4(id, missionName, startYear, endYear)
 
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
-
   private val nasaMissions = List(
     NasaMission(1, "Mercury program", 1959, 1963),
     NasaMission(2, "Apollo program", 1961, 1972),
@@ -45,12 +43,29 @@ object FlinkPulsarBatchCsvSinkScalaExample {
 
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic 
<topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarCsvOutputFormat instance
     val pulsarCsvOutputFormat =
-      new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+      new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
 
     // create DataSet
     val textDS = env.fromCollection(nasaMissions)
diff --git 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
index 1f7fc19b0f..60d02e5629 100644
--- 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++ 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -18,9 +18,9 @@
  */
 package org.apache.flink.batch.connectors.pulsar.example
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
-
 import scala.beans.BeanProperty
 
 /**
@@ -43,16 +43,30 @@ object FlinkPulsarBatchJsonSinkScalaExample {
     NasaMission(4, "Skylab", 1973, 1974),
     NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
 
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
-
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic 
<topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarJsonOutputFormat instance
-    val pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+    val pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
 
     // create DataSet
     val nasaMissionDS = env.fromCollection(nasaMissions)
diff --git 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
index 5e536cfa3c..4de0dcb321 100644
--- 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++ 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar.example
 
 import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
 import org.apache.flink.util.Collector
@@ -37,17 +38,32 @@ object FlinkPulsarBatchSinkScalaExample {
 
   private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. 
" +
     "Knowledge is limited. Imagination encircles the world."
-  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-  private val TOPIC_NAME = "my-flink-topic"
 
   def main(args: Array[String]): Unit = {
 
+    // parse input arguments
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: pulsar --service-url <pulsar-service-url> --topic 
<topic>")
+      return
+    }
+
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(parameterTool)
+
+    val serviceUrl = parameterTool.getRequired("service-url")
+    val topic = parameterTool.getRequired("topic")
+
+    println("Parameters:")
+    println("\tServiceUrl:\t" + serviceUrl)
+    println("\tTopic:\t" + topic)
 
     // create PulsarOutputFormat instance
     val pulsarOutputFormat =
-      new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new 
SerializationSchema[WordWithCount] {
+      new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new 
SerializationSchema[WordWithCount] {
         override def serialize(wordWithCount: WordWithCount): Array[Byte] = 
wordWithCount.toString.getBytes
       })
 
diff --git 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
deleted file mode 100644
index e206392b46..0000000000
--- 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,345 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-The Flink Batch Sink for Pulsar is a custom sink that enables Apache 
[Flink](https://flink.apache.org/) to write 
[DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html)
 to Pulsar.
-This document explains how to develop Scala Applications by using Flink Batch 
Sink.
-# Prerequisites
-
-To use this sink, include a dependency for the `pulsar-flink` library in your 
Java configuration.
-
-# Maven
-
-If you're using Maven, add this to your `pom.xml`:
-
-```xml
-<!-- in your <properties> block -->
-<pulsar.version>{{pulsar:version}}</pulsar.version>
-
-<!-- in your <dependencies> block -->
-<dependency>
-  <groupId>org.apache.pulsar</groupId>
-  <artifactId>pulsar-flink</artifactId>
-  <version>${pulsar.version}</version>
-</dependency>
-```
-
-# Gradle
-
-If you're using Gradle, add this to your `build.gradle` file:
-
-```groovy
-def pulsarVersion = "{{pulsar:version}}"
-
-dependencies {
-    compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: 
pulsarVersion
-}
-```
-
-# PulsarOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarOutputFormat` as follows:
-
-```scala
-      /**
-        * Data type for words with count.
-        */
-      case class WordWithCount(word: String, count: Long) {
-        override def toString: String = "WordWithCount { word = " + word + ", 
count = " + count + " }"
-      }
-
-      /**
-        * Implementation
-        */
-      private val EINSTEIN_QUOTE = "Imagination is more important than 
knowledge. " +
-        "Knowledge is limited. Imagination encircles the world."
-      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-      private val TOPIC_NAME = "my-flink-topic"
-
-      def main(args: Array[String]): Unit = {
-
-        // set up the execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // create PulsarOutputFormat instance
-        val pulsarOutputFormat =
-          new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new 
SerializationSchema[WordWithCount] {
-            override def serialize(wordWithCount: WordWithCount): Array[Byte] 
= wordWithCount.toString.getBytes
-          })
-
-        // create DataSet
-        val textDS = env.fromElements[String](EINSTEIN_QUOTE)
-
-        // convert sentence to words
-        textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
-          val words = value.toLowerCase.split(" ")
-          for (word <- words) {
-            out.collect(new WordWithCount(word.replace(".", ""), 1))
-          }
-        })
-
-        // filter words which length is bigger than 4
-        .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 
4)
-
-        // group the words
-        .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
-
-        // sum the word counts
-        .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) 
=>
-          new WordWithCount(wordWithCount1.word, wordWithCount1.count + 
wordWithCount2.count))
-
-        // write batch data to Pulsar
-        .output(pulsarOutputFormat)
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2)
-
-        // execute program
-        env.execute("Flink - Pulsar Batch WordCount")
-      }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-WordWithCount { word = encircles, count = 1 }
-WordWithCount { word = important, count = 1 }
-WordWithCount { word = imagination, count = 2 }
-WordWithCount { word = limited, count = 1 }
-WordWithCount { word = knowledge, count = 2 }
-WordWithCount { word = world, count = 1 }
-```
-
-### Complete Example
-
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
-In this example, Flink DataSet is processed as word-count and being written to 
Pulsar.
-
-
-# PulsarCsvOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarCsvOutputFormat` as follows:
-
-```scala
-      /**
-        * NasaMission Model
-        */
-      private case class NasaMission(id: Int, missionName: String, startYear: 
Int, endYear: Int)
-        extends Tuple4(id, missionName, startYear, endYear)
-
-      /**
-        * Implementation
-        */
-      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-      private val TOPIC_NAME = "my-flink-topic"
-
-      private val nasaMissions = List(
-        NasaMission(1, "Mercury program", 1959, 1963),
-        NasaMission(2, "Apollo program", 1961, 1972),
-        NasaMission(3, "Gemini program", 1963, 1966),
-        NasaMission(4, "Skylab", 1973, 1974),
-        NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
-      def main(args: Array[String]): Unit = {
-
-        // set up the execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // create PulsarCsvOutputFormat instance
-        val pulsarCsvOutputFormat =
-          new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
-        // create DataSet
-        val textDS = env.fromCollection(nasaMissions)
-
-        // map nasa mission names to upper-case
-        textDS.map(nasaMission => NasaMission(
-          nasaMission.id,
-          nasaMission.missionName.toUpperCase,
-          nasaMission.startYear,
-          nasaMission.endYear))
-
-        // filter missions which started after 1970
-        .filter(_.startYear > 1970)
-
-        // write batch data to Pulsar as Csv
-        .output(pulsarCsvOutputFormat)
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2)
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Csv")
-      }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
-
-### Complete Example
-
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Csv 
format.
-
-
-# PulsarJsonOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarJsonOutputFormat` as follows:
-
-```scala
-      /**
-        * NasaMission Model
-        */
-      private case class NasaMission(@BeanProperty id: Int,
-                             @BeanProperty missionName: String,
-                             @BeanProperty startYear: Int,
-                             @BeanProperty endYear: Int)
-
-      /**
-        * Implementation
-        */
-      private val nasaMissions = List(
-        NasaMission(1, "Mercury program", 1959, 1963),
-        NasaMission(2, "Apollo program", 1961, 1972),
-        NasaMission(3, "Gemini program", 1963, 1966),
-        NasaMission(4, "Skylab", 1973, 1974),
-        NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
-      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
-      private val TOPIC_NAME = "my-flink-topic"
-
-      def main(args: Array[String]): Unit = {
-
-        // set up the execution environment
-        val env = ExecutionEnvironment.getExecutionEnvironment
-
-        // create PulsarJsonOutputFormat instance
-        val pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
-        // create DataSet
-        val nasaMissionDS = env.fromCollection(nasaMissions)
-
-        // map nasa mission names to upper-case
-        nasaMissionDS.map(nasaMission =>
-          NasaMission(
-            nasaMission.id,
-            nasaMission.missionName.toUpperCase,
-            nasaMission.startYear,
-            nasaMission.endYear))
-
-        // filter missions which started after 1970
-        .filter(_.startYear > 1970)
-
-        // write batch data to Pulsar
-        .output(pulsarJsonOutputFormat)
-
-        // set parallelism to write Pulsar in parallel (optional)
-        env.setParallelism(2)
-
-        // execute program
-        env.execute("Flink - Pulsar Batch Json")
-      }
-```
-
-**Note:** Property definitions of the model should cover `@BeanProperty` to be 
visible.
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
-{"id":5,"missionName":"APOLLO–SOYUZ TEST 
PROJECT","startYear":1975,"endYear":1975}
-```
-
-### Complete Example
-
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Json 
format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarAvroOutputFormat` as follows:
-
-```scala
-      val nasaMissions = List(
-          NasaMission.newBuilder.setId(1).setName("Mercury 
program").setStartYear(1959).setEndYear(1963).build,
-          NasaMission.newBuilder.setId(2).setName("Apollo 
program").setStartYear(1961).setEndYear(1972).build,
-          NasaMission.newBuilder.setId(3).setName("Gemini 
program").setStartYear(1963).setEndYear(1966).build,
-          
NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
-          NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test 
Project").setStartYear(1975).setEndYear(1975).build)
-      
-        def main(args: Array[String]): Unit = {
-      
-          // set up the execution environment
-          val env = ExecutionEnvironment.getExecutionEnvironment
-      
-          // create PulsarCsvOutputFormat instance
-          val pulsarAvroOutputFormat =
-            new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-      
-          // create DataSet
-          val textDS = env.fromCollection(nasaMissions)
-      
-          // map nasa mission names to upper-case
-          textDS.map(nasaMission => new NasaMission(
-            nasaMission.getId,
-            nasaMission.getName,
-            nasaMission.getStartYear,
-            nasaMission.getEndYear))
-      
-            // filter missions which started after 1970
-            .filter(_.getStartYear > 1970)
-      
-            // write batch data to Pulsar as Avro
-            .output(pulsarAvroOutputFormat)
-      
-          // set parallelism to write Pulsar in parallel (optional)
-          env.setParallelism(2)
-      
-          // execute program
-          env.execute("Flink - Pulsar Batch Avro")
-        }
-```
-
-**Note:** NasaMission class are automatically generated by Avro
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
-
-### Complete Example
-
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Avro 
format.
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index d5f4af5b3f..ca34327873 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -44,18 +44,18 @@
     private transient Function<Throwable, MessageId> failureCallback;
     private static volatile Producer<byte[]> producer;
 
-    protected static String serviceUrl;
-    protected static String topicName;
+    protected final String serviceUrl;
+    protected final String topicName;
     protected SerializationSchema<T> serializationSchema;
 
-    protected BasePulsarOutputFormat(String serviceUrl, String topicName) {
+    protected BasePulsarOutputFormat(final String serviceUrl, final String 
topicName) {
         Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), 
"serviceUrl cannot be blank.");
         Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  
"topicName cannot be blank.");
 
         this.serviceUrl = serviceUrl;
         this.topicName = topicName;
 
-        LOG.info("PulsarOutputFormat is being started to write batches to 
Pulsar topic {}", this.topicName);
+        LOG.info("PulsarOutputFormat is being started to write batches to 
Pulsar topic: {}", this.topicName);
     }
 
     @Override
@@ -65,10 +65,10 @@ public void configure(Configuration configuration) {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        this.producer = getProducerInstance();
+        this.producer = getProducerInstance(serviceUrl, topicName);
 
         this.failureCallback = cause -> {
-            LOG.error("Error while sending record to Pulsar : " + 
cause.getMessage(), cause);
+            LOG.error("Error while sending record to Pulsar: " + 
cause.getMessage(), cause);
             return null;
         };
     }
@@ -85,11 +85,11 @@ public void close() throws IOException {
 
     }
 
-    private static Producer<byte[]> getProducerInstance() throws 
PulsarClientException {
+    private static Producer<byte[]> getProducerInstance(String serviceUrl, 
String topicName) throws PulsarClientException {
         if(producer == null){
             synchronized (PulsarOutputFormat.class) {
                 if(producer == null){
-                    producer = 
Preconditions.checkNotNull(createPulsarProducer(),
+                    producer = 
Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
                             "Pulsar producer cannot be null.");
                 }
             }
@@ -97,7 +97,7 @@ public void close() throws IOException {
         return producer;
     }
 
-    private static Producer<byte[]> createPulsarProducer() throws 
PulsarClientException {
+    private static Producer<byte[]> createPulsarProducer(String serviceUrl, 
String topicName) throws PulsarClientException {
         try {
             PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
             return client.newProducer().topic(topicName).create();
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index e532bfd180..889970f5fb 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -28,9 +28,9 @@
 
     private static final long serialVersionUID = 2997027580167793000L;
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, 
SerializationSchema<T> serializationSchema) {
+    public PulsarOutputFormat(String serviceUrl, String topicName, final 
SerializationSchema<T> serializationSchema) {
         super(serviceUrl, topicName);
-        Preconditions.checkNotNull(serializationSchema,  "serializationSchema 
cannot be null.");
+        Preconditions.checkNotNull(serializationSchema, "serializationSchema 
cannot be null.");
         this.serializationSchema = serializationSchema;
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to