sijie closed pull request #3262: [Pulsar-Flink] Refactor Flink Batch Sink
Examples
URL: https://github.com/apache/pulsar/pull/3262
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/examples/flink-consumer-source/pom.xml
b/examples/flink-consumer-source/pom.xml
index 35f5924966..088444b7eb 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -54,12 +54,6 @@
<version>${log4j2.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
@@ -77,6 +71,7 @@
<artifactId>pulsar-flink</artifactId>
<version>${project.version}</version>
</dependency>
+
</dependencies>
<build>
@@ -107,7 +102,7 @@
<mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
</transformer>
</transformers>
- <finalName>pulsar-flink-streaming-wordcount</finalName>
+ <finalName>pulsar-flink-examples</finalName>
<filters>
<filter>
<artifact>*</artifact>
@@ -115,6 +110,15 @@
<include>org/apache/flink/streaming/examples/kafka/**</include>
<include>org/apache/flink/streaming/**</include>
<include>org/apache/pulsar/**</include>
+ <include>org/apache/flink/batch/**</include>
+ <include>net/jpountz/**</include>
+ <include>com/scurrilous/circe/**</include>
+ <include>org/apache/commons/csv/**</include>
+ <include>org/apache/flink/avro/generated/**</include>
+ <include>org/apache/avro/**</include>
+ <include>org/codehaus/jackson/**</include>
+ <include>avro/shaded/com/google/common/**</include>
+ <include>org/apache/flink/formats/avro/**</include>
</includes>
</filter>
</filters>
@@ -122,6 +126,20 @@
</execution>
</executions>
</plugin>
+ <!-- Scala Plugin to compile Scala Files -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.4.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<!-- Generate Test class from avro schema -->
<plugin>
<groupId>org.apache.avro</groupId>
diff --git
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index ef0048cbac..584d59fec1 100644
---
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -18,10 +18,10 @@
*/
package org.apache.flink.batch.connectors.pulsar.example;
-
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
@@ -40,16 +40,30 @@
NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test
Project").setStartYear(1975).setEndYear(1975).build());
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url
<pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarAvroOutputFormat instance
- final OutputFormat<NasaMission> pulsarAvroOutputFormat = new
PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+ final OutputFormat<NasaMission> pulsarAvroOutputFormat = new
PulsarAvroOutputFormat<>(serviceUrl, topic);
// create DataSet
DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 6b0f0ca6cd..3e658dc66a 100644
---
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
import java.util.Arrays;
@@ -40,17 +41,31 @@
new Tuple4(4, "Skylab", 1973, 1974),
new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url
<pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarCsvOutputFormat instance
final OutputFormat<Tuple4<Integer, String, Integer, Integer>>
pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+ new PulsarCsvOutputFormat<>(serviceUrl, topic);
// create DataSet
DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS =
env.fromCollection(nasaMissions);
diff --git
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
index e037616c9f..3937ae9908 100644
---
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
import java.util.Arrays;
@@ -38,16 +39,30 @@
new NasaMission(4, "Skylab", 1973, 1974),
new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url
<pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarJsonOutputFormat instance
- final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+ final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(serviceUrl, topic);
// create DataSet
DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index 6724c62a9d..c90d016eaa 100644
---
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -24,6 +24,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
import org.apache.flink.util.Collector;
@@ -35,17 +36,31 @@
private static final String EINSTEIN_QUOTE = "Imagination is more
important than knowledge. " +
"Knowledge is limited. Imagination encircles the world.";
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 2) {
+ System.out.println("Missing parameters!");
+ System.out.println("Usage: pulsar --service-url
<pulsar-service-url> --topic <topic>");
+ return;
+ }
+
// set up the execution environment
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setGlobalJobParameters(parameterTool);
+
+ String serviceUrl = parameterTool.getRequired("service-url");
+ String topic = parameterTool.getRequired("topic");
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + topic);
// create PulsarOutputFormat instance
final OutputFormat pulsarOutputFormat =
- new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount
-> wordWithCount.toString().getBytes());
+ new PulsarOutputFormat(serviceUrl, topic, wordWithCount ->
wordWithCount.toString().getBytes());
// create DataSet
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
diff --git
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 3116b3bc10..b93f5a3f69 100644
---
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -53,71 +53,56 @@ dependencies {
}
```
-# PulsarOutputFormat
-### Usage
+# Example
-Please find a sample usage as follows:
+### PulsarOutputFormat
-```java
- private static final String EINSTEIN_QUOTE = "Imagination is more
important than knowledge. " +
- "Knowledge is limited. Imagination encircles the world.";
+In this example, Flink DataSet is processed as word-count and written to
Pulsar. Please find a complete example for PulsarOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala)
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
+The steps to run the example:
- public static void main(String[] args) throws Exception {
+1. Start Pulsar Standalone.
- // set up the execution environment
- final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ You can follow the
[instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar
standalone locally.
- // create PulsarOutputFormat instance
- final OutputFormat<String> pulsarOutputFormat =
- new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME,
wordWithCount -> wordWithCount.toString().getBytes());
+ ```shell
+ $ bin/pulsar standalone
+ ```
- // create DataSet
- DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+2. Start Flink locally.
- textDS.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String value, Collector<String> out)
throws Exception {
- String[] words = value.toLowerCase().split(" ");
- for(String word: words) {
- out.collect(word.replace(".", ""));
- }
- }
- })
- // filter words which length is bigger than 4
- .filter(word -> word.length() > 4)
+ You can follow the
[instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html)
to download and start Flink.
- // write batch data to Pulsar
- .output(pulsarOutputFormat);
+ ```shell
+ $ ./bin/start-cluster.sh
+ ```
- // execute program
- env.execute("Flink - Pulsar Batch WordCount");
- }
-```
+3. Build the examples.
-### Sample Output
+ ```shell
+ $ cd ${PULSAR_HOME}
+ $ mvn clean install -DskipTests
+ ```
-Please find sample output for above application as follows:
-```
-imagination
-important
-knowledge
-knowledge
-limited
-imagination
-encircles
-world
-```
+4. Run the word count example to write results to the Pulsar topic.
+
+ ```shell
+ # java
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
-### Complete Example
+ # scala
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchSinkScalaExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to
Pulsar.
+5. Once the flink word count example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
-### Complete Example Output
-Please find sample output for above linked application as follows:
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for above linked application as follows:
```
WordWithCount { word = important, count = 1 }
WordWithCount { word = encircles, count = 1 }
@@ -127,230 +112,105 @@ WordWithCount { word = limited, count = 1 }
WordWithCount { word = world, count = 1 }
```
-# PulsarCsvOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
- private static final List<Tuple4<Integer, String, Integer, Integer>>
nasaMissions = Arrays.asList(
- new Tuple4(1, "Mercury program", 1959, 1963),
- new Tuple4(2, "Apollo program", 1961, 1972),
- new Tuple4(3, "Gemini program", 1963, 1966),
- new Tuple4(4, "Skylab", 1973, 1974),
- new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
-
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
-
- // create PulsarCsvOutputFormat instance
- final OutputFormat<Tuple4<Integer, String, Integer, Integer>>
pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-
- // create DataSet
- DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS =
env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(
- new MapFunction<Tuple4<Integer, String, Integer, Integer>,
Tuple4<Integer, String, Integer, Integer>>() {
- @Override
- public Tuple4<Integer, String, Integer,
Integer> map(
- Tuple4<Integer, String, Integer,
Integer> nasaMission) throws Exception {
- return new Tuple4(
- nasaMission.f0,
- nasaMission.f1.toUpperCase(),
- nasaMission.f2,
- nasaMission.f3);
- }
- }
- )
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.f2 > 1970)
- // write batch data to Pulsar
- .output(pulsarCsvOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Csv");
-
- }
-```
-
-### Sample Output
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
+### PulsarCsvOutputFormat
-### Complete Example
+In this example, Flink DataSet is processed and written to Pulsar in Csv
format. Please find a complete example for PulsarCsvOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala)
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Csv
format.
+The steps to run the example:
+Step 1, 2 and 3 are same as above.
-# PulsarJsonOutputFormat
-### Usage
+4. Run the Csv sink example to write results to the Pulsar topic.
-Please find a sample usage as follows:
+ ```shell
+ # java
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
-```java
- private static final List<NasaMission> nasaMissions = Arrays.asList(
- new NasaMission(1, "Mercury program", 1959, 1963),
- new NasaMission(2, "Apollo program", 1961, 1972),
- new NasaMission(3, "Gemini program", 1963, 1966),
- new NasaMission(4, "Skylab", 1973, 1974),
- new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+ # scala
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchCsvSinkScalaExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
- private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
+5. Once the flink word count example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
- // create PulsarJsonOutputFormat instance
- final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+6. Please find sample output for above linked application as follows:
+```
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
+```
- // create DataSet
- DataSet<NasaMission> nasaMissionDS =
env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission -> new NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase(),
- nasaMission.startYear,
- nasaMission.endYear))
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.startYear > 1970)
- // write batch data to Pulsar
- .output(pulsarJsonOutputFormat);
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
+### PulsarJsonOutputFormat
- // execute program
- env.execute("Flink - Pulsar Batch Json");
- }
+In this example, Flink DataSet is processed and written to Pulsar in Json
format. Please find a complete example for PulsarJsonOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala)
- /**
- * NasaMission data model
- *
- * Note: Property definitions of the model should be public or have
getter functions to be visible
- */
- private static class NasaMission {
+**Note:** Property definitions of the model should be public or have getter
functions to be visible.
- private int id;
- private String missionName;
- private int startYear;
- private int endYear;
+The steps to run the example:
- public NasaMission(int id, String missionName, int startYear, int
endYear) {
- this.id = id;
- this.missionName = missionName;
- this.startYear = startYear;
- this.endYear = endYear;
- }
+Step 1, 2 and 3 are same as above.
- public int getId() {
- return id;
- }
+4. Run the Json sink example to write results to the Pulsar topic.
- public String getMissionName() {
- return missionName;
- }
+ ```shell
+ # java
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
- public int getStartYear() {
- return startYear;
- }
+ # scala
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchJsonSinkScalaExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
- public int getEndYear() {
- return endYear;
- }
- }
+5. Once the flink word count example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
```
-**Note:** Property definitions of the model should be public or have getter
functions to be visible
-
-### Sample Output
-
-Please find sample output for above application as follows:
+6. Please find sample output for above linked application as follows:
```
{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
{"id":5,"missionName":"APOLLO–SOYUZ TEST
PROJECT","startYear":1975,"endYear":1975}
```
-### Complete Example
-
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Json
format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find a sample usage as follows:
-
-```java
- private static final List<NasaMission> nasaMissions = Arrays.asList(
- NasaMission.newBuilder().setId(1).setName("Mercury
program").setStartYear(1959).setEndYear(1963).build(),
- NasaMission.newBuilder().setId(2).setName("Apollo
program").setStartYear(1961).setEndYear(1972).build(),
- NasaMission.newBuilder().setId(3).setName("Gemini
program").setStartYear(1963).setEndYear(1966).build(),
-
NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
- NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz
Test Project").setStartYear(1975).setEndYear(1975).build());
-
- private static final String SERVICE_URL =
"pulsar://127.0.0.1:6650";
- private static final String TOPIC_NAME = "my-flink-topic";
-
- public static void main(String[] args) throws Exception {
-
- // set up the execution environment
- final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
-
- // create PulsarAvroOutputFormat instance
- final OutputFormat<NasaMission> pulsarAvroOutputFormat = new
PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);
-
- // create DataSet
- DataSet<NasaMission> nasaMissionDS =
env.fromCollection(nasaMissions);
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission -> new NasaMission(
- nasaMission.getId(),
- nasaMission.getName(),
- nasaMission.getStartYear(),
- nasaMission.getEndYear()))
- // filter missions which started after 1970
- .filter(nasaMission -> nasaMission.getStartYear() >
1970)
- // write batch data to Pulsar
- .output(pulsarAvroOutputFormat);
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2);
-
- // execute program
- env.execute("Flink - Pulsar Batch Avro");
- }
-```
+### PulsarAvroOutputFormat
+
+In this example, Flink DataSet is processed and written to Pulsar in Avro
format. Please find a complete example for PulsarAvroOutputFormat as follows:
+[java](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java)
+[scala](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala)
+
+**Note:** The NasaMission class is automatically generated by Avro.
+
+The steps to run the example:
-**Note:** NasaMission class are automatically generated by Avro
+Step 1, 2 and 3 are same as above.
-### Sample Output
+4. Run the Avro sink example to write results to the Pulsar topic.
-Please find sample output for above application as follows:
+ ```shell
+ # java
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
+
+ # scala
+ $ ./bin/flink run -c
org.apache.flink.batch.connectors.pulsar.example.FlinkPulsarBatchAvroSinkScalaExample
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --topic test_flink_topic
+ ```
+
+5. Once the flink word count example is running, you can use
`bin/pulsar-client` to tail the results produced into topic `test_flink_topic`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_flink_topic
+```
+
+6. Please find sample output for above linked application as follows:
```
"4,SKYLAB,1973,1974"
"5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
```
-
-### Complete Example
-
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java).
-In this example, Flink DataSet is processed and written to Pulsar in Avro
format.
diff --git
a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 16c88f1c43..463e805853 100644
---
a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -30,7 +30,7 @@ See the [Pulsar
Concepts](https://pulsar.apache.org/docs/en/concepts-overview/)
### PulsarConsumerSourceWordCount
-This Flink streaming job is consuming from a Pulsar topic and couting the
wordcount in a streaming fashion. The job can write the word count results
+This Flink streaming job is consuming from a Pulsar topic and counting the
wordcount in a streaming fashion. The job can write the word count results
to stdout or another Pulsar topic.
The steps to run the example:
@@ -61,7 +61,7 @@ The steps to run the example:
4. Run the word count example to print results to stdout.
```shell
- $ ./bin/flink run
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar
--service-url pulsar://localhost:6650 --input-topic test_src --subscription
test_sub
+ $ ./bin/flink run
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --input-topic test_src --subscription
test_sub
```
5. Produce messages to topic `test_src`.
@@ -85,7 +85,7 @@ The steps to run the example:
Alternatively, when you run the flink word count example at step 4, you can
choose dump the result to another pulsar topic.
```shell
-$ ./bin/flink run
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar
--service-url pulsar://localhost:6650 --input-topic test_src --subscription
test_sub --output-topic test_dest
+$ ./bin/flink run
${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-examples.jar
--service-url pulsar://localhost:6650 --input-topic test_src --subscription
test_sub --output-topic test_dest
```
Once the flink word count example is running, you can use `bin/pulsar-client`
to tail the results produced into topic `test_dest`.
diff --git
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 0d255f2a1a..f10d6c1274 100644
---
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar.example
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.avro.generated.NasaMission
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
@@ -27,10 +28,7 @@ import
org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
*/
object FlinkPulsarBatchAvroSinkScalaExample {
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- val nasaMissions = List(
+ private val nasaMissions = List(
NasaMission.newBuilder.setId(1).setName("Mercury
program").setStartYear(1959).setEndYear(1963).build,
NasaMission.newBuilder.setId(2).setName("Apollo
program").setStartYear(1961).setEndYear(1972).build,
NasaMission.newBuilder.setId(3).setName("Gemini
program").setStartYear(1963).setEndYear(1966).build,
@@ -39,12 +37,29 @@ object FlinkPulsarBatchAvroSinkScalaExample {
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic
<topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarAvroOutputFormat instance
val pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+ new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
// create DataSet
val textDS = env.fromCollection(nasaMissions)
diff --git
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
index 7db844b3c2..3233616857 100644
---
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -19,6 +19,7 @@
package org.apache.flink.batch.connectors.pulsar.example
import org.apache.flink.api.java.tuple.Tuple4
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
@@ -33,9 +34,6 @@ object FlinkPulsarBatchCsvSinkScalaExample {
private case class NasaMission(id: Int, missionName: String, startYear: Int,
endYear: Int)
extends Tuple4(id, missionName, startYear, endYear)
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
private val nasaMissions = List(
NasaMission(1, "Mercury program", 1959, 1963),
NasaMission(2, "Apollo program", 1961, 1972),
@@ -45,12 +43,29 @@ object FlinkPulsarBatchCsvSinkScalaExample {
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic
<topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarCsvOutputFormat instance
val pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+ new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
// create DataSet
val textDS = env.fromCollection(nasaMissions)
diff --git
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
index 1f7fc19b0f..60d02e5629 100644
---
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -18,9 +18,9 @@
*/
package org.apache.flink.batch.connectors.pulsar.example
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
-
import scala.beans.BeanProperty
/**
@@ -43,16 +43,30 @@ object FlinkPulsarBatchJsonSinkScalaExample {
NasaMission(4, "Skylab", 1973, 1974),
NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic
<topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarJsonOutputFormat instance
- val pulsarJsonOutputFormat = new
PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+ val pulsarJsonOutputFormat = new
PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
// create DataSet
val nasaMissionDS = env.fromCollection(nasaMissions)
diff --git
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
index 5e536cfa3c..4de0dcb321 100644
---
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -19,6 +19,7 @@
package org.apache.flink.batch.connectors.pulsar.example
import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
import org.apache.flink.util.Collector
@@ -37,17 +38,32 @@ object FlinkPulsarBatchSinkScalaExample {
private val EINSTEIN_QUOTE = "Imagination is more important than knowledge.
" +
"Knowledge is limited. Imagination encircles the world."
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
def main(args: Array[String]): Unit = {
+ // parse input arguments
+ val parameterTool = ParameterTool.fromArgs(args)
+
+ if (parameterTool.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: pulsar --service-url <pulsar-service-url> --topic
<topic>")
+ return
+ }
+
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.setGlobalJobParameters(parameterTool)
+
+ val serviceUrl = parameterTool.getRequired("service-url")
+ val topic = parameterTool.getRequired("topic")
+
+ println("Parameters:")
+ println("\tServiceUrl:\t" + serviceUrl)
+ println("\tTopic:\t" + topic)
// create PulsarOutputFormat instance
val pulsarOutputFormat =
- new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new
SerializationSchema[WordWithCount] {
+ new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new
SerializationSchema[WordWithCount] {
override def serialize(wordWithCount: WordWithCount): Array[Byte] =
wordWithCount.toString.getBytes
})
diff --git
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
deleted file mode 100644
index e206392b46..0000000000
---
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ /dev/null
@@ -1,345 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-The Flink Batch Sink for Pulsar is a custom sink that enables Apache
[Flink](https://flink.apache.org/) to write
[DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html)
to Pulsar.
-This document explains how to develop Scala Applications by using Flink Batch
Sink.
-# Prerequisites
-
-To use this sink, include a dependency for the `pulsar-flink` library in your
Java configuration.
-
-# Maven
-
-If you're using Maven, add this to your `pom.xml`:
-
-```xml
-<!-- in your <properties> block -->
-<pulsar.version>{{pulsar:version}}</pulsar.version>
-
-<!-- in your <dependencies> block -->
-<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-flink</artifactId>
- <version>${pulsar.version}</version>
-</dependency>
-```
-
-# Gradle
-
-If you're using Gradle, add this to your `build.gradle` file:
-
-```groovy
-def pulsarVersion = "{{pulsar:version}}"
-
-dependencies {
- compile group: 'org.apache.pulsar', name: 'pulsar-flink', version:
pulsarVersion
-}
-```
-
-# PulsarOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarOutputFormat` as follows:
-
-```scala
- /**
- * Data type for words with count.
- */
- case class WordWithCount(word: String, count: Long) {
- override def toString: String = "WordWithCount { word = " + word + ",
count = " + count + " }"
- }
-
- /**
- * Implementation
- */
- private val EINSTEIN_QUOTE = "Imagination is more important than
knowledge. " +
- "Knowledge is limited. Imagination encircles the world."
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarOutputFormat instance
- val pulsarOutputFormat =
- new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new
SerializationSchema[WordWithCount] {
- override def serialize(wordWithCount: WordWithCount): Array[Byte]
= wordWithCount.toString.getBytes
- })
-
- // create DataSet
- val textDS = env.fromElements[String](EINSTEIN_QUOTE)
-
- // convert sentence to words
- textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
- val words = value.toLowerCase.split(" ")
- for (word <- words) {
- out.collect(new WordWithCount(word.replace(".", ""), 1))
- }
- })
-
- // filter words which length is bigger than 4
- .filter((wordWithCount: WordWithCount) => wordWithCount.word.length >
4)
-
- // group the words
- .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
-
- // sum the word counts
- .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount)
=>
- new WordWithCount(wordWithCount1.word, wordWithCount1.count +
wordWithCount2.count))
-
- // write batch data to Pulsar
- .output(pulsarOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch WordCount")
- }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-WordWithCount { word = encircles, count = 1 }
-WordWithCount { word = important, count = 1 }
-WordWithCount { word = imagination, count = 2 }
-WordWithCount { word = limited, count = 1 }
-WordWithCount { word = knowledge, count = 2 }
-WordWithCount { word = world, count = 1 }
-```
-
-### Complete Example
-
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
-In this example, Flink DataSet is processed as word-count and being written to
Pulsar.
-
-
-# PulsarCsvOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarCsvOutputFormat` as follows:
-
-```scala
- /**
- * NasaMission Model
- */
- private case class NasaMission(id: Int, missionName: String, startYear:
Int, endYear: Int)
- extends Tuple4(id, missionName, startYear, endYear)
-
- /**
- * Implementation
- */
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- private val nasaMissions = List(
- NasaMission(1, "Mercury program", 1959, 1963),
- NasaMission(2, "Apollo program", 1961, 1972),
- NasaMission(3, "Gemini program", 1963, 1966),
- NasaMission(4, "Skylab", 1973, 1974),
- NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarCsvOutputFormat instance
- val pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
- // create DataSet
- val textDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- textDS.map(nasaMission => NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase,
- nasaMission.startYear,
- nasaMission.endYear))
-
- // filter missions which started after 1970
- .filter(_.startYear > 1970)
-
- // write batch data to Pulsar as Csv
- .output(pulsarCsvOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Csv")
- }
-```
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-4,SKYLAB,1973,1974
-5,APOLLO–SOYUZ TEST PROJECT,1975,1975
-```
-
-### Complete Example
-
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Csv
format.
-
-
-# PulsarJsonOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarJsonOutputFormat` as follows:
-
-```scala
- /**
- * NasaMission Model
- */
- private case class NasaMission(@BeanProperty id: Int,
- @BeanProperty missionName: String,
- @BeanProperty startYear: Int,
- @BeanProperty endYear: Int)
-
- /**
- * Implementation
- */
- private val nasaMissions = List(
- NasaMission(1, "Mercury program", 1959, 1963),
- NasaMission(2, "Apollo program", 1961, 1972),
- NasaMission(3, "Gemini program", 1963, 1966),
- NasaMission(4, "Skylab", 1973, 1974),
- NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
-
- private val SERVICE_URL = "pulsar://127.0.0.1:6650"
- private val TOPIC_NAME = "my-flink-topic"
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarJsonOutputFormat instance
- val pulsarJsonOutputFormat = new
PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
- // create DataSet
- val nasaMissionDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- nasaMissionDS.map(nasaMission =>
- NasaMission(
- nasaMission.id,
- nasaMission.missionName.toUpperCase,
- nasaMission.startYear,
- nasaMission.endYear))
-
- // filter missions which started after 1970
- .filter(_.startYear > 1970)
-
- // write batch data to Pulsar
- .output(pulsarJsonOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Json")
- }
-```
-
-**Note:** Property definitions of the model should include `@BeanProperty` to be
visible.
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
-{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
-{"id":5,"missionName":"APOLLO–SOYUZ TEST
PROJECT","startYear":1975,"endYear":1975}
-```
-
-### Complete Example
-
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Json
format.
-
-
-# PulsarAvroOutputFormat
-### Usage
-
-Please find Scala sample usage of `PulsarAvroOutputFormat` as follows:
-
-```scala
- val nasaMissions = List(
- NasaMission.newBuilder.setId(1).setName("Mercury
program").setStartYear(1959).setEndYear(1963).build,
- NasaMission.newBuilder.setId(2).setName("Apollo
program").setStartYear(1961).setEndYear(1972).build,
- NasaMission.newBuilder.setId(3).setName("Gemini
program").setStartYear(1963).setEndYear(1966).build,
-
NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
- NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test
Project").setStartYear(1975).setEndYear(1975).build)
-
- def main(args: Array[String]): Unit = {
-
- // set up the execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // create PulsarAvroOutputFormat instance
- val pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
-
- // create DataSet
- val textDS = env.fromCollection(nasaMissions)
-
- // map nasa mission names to upper-case
- textDS.map(nasaMission => new NasaMission(
- nasaMission.getId,
- nasaMission.getName,
- nasaMission.getStartYear,
- nasaMission.getEndYear))
-
- // filter missions which started after 1970
- .filter(_.getStartYear > 1970)
-
- // write batch data to Pulsar as Avro
- .output(pulsarAvroOutputFormat)
-
- // set parallelism to write Pulsar in parallel (optional)
- env.setParallelism(2)
-
- // execute program
- env.execute("Flink - Pulsar Batch Avro")
- }
-```
-
-**Note:** The NasaMission class is automatically generated by Avro
-
-### Sample Output
-
-Please find sample output for above application as follows:
-```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
-
-### Complete Example
-
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala).
-In this example, Flink DataSet is processed and written to Pulsar in Avro
format.
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index d5f4af5b3f..ca34327873 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -44,18 +44,18 @@
private transient Function<Throwable, MessageId> failureCallback;
private static volatile Producer<byte[]> producer;
- protected static String serviceUrl;
- protected static String topicName;
+ protected final String serviceUrl;
+ protected final String topicName;
protected SerializationSchema<T> serializationSchema;
- protected BasePulsarOutputFormat(String serviceUrl, String topicName) {
+ protected BasePulsarOutputFormat(final String serviceUrl, final String
topicName) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl),
"serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName),
"topicName cannot be blank.");
this.serviceUrl = serviceUrl;
this.topicName = topicName;
- LOG.info("PulsarOutputFormat is being started to write batches to
Pulsar topic {}", this.topicName);
+ LOG.info("PulsarOutputFormat is being started to write batches to
Pulsar topic: {}", this.topicName);
}
@Override
@@ -65,10 +65,10 @@ public void configure(Configuration configuration) {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- this.producer = getProducerInstance();
+ this.producer = getProducerInstance(serviceUrl, topicName);
this.failureCallback = cause -> {
- LOG.error("Error while sending record to Pulsar : " +
cause.getMessage(), cause);
+ LOG.error("Error while sending record to Pulsar: " +
cause.getMessage(), cause);
return null;
};
}
@@ -85,11 +85,11 @@ public void close() throws IOException {
}
- private static Producer<byte[]> getProducerInstance() throws
PulsarClientException {
+ private static Producer<byte[]> getProducerInstance(String serviceUrl,
String topicName) throws PulsarClientException {
if(producer == null){
synchronized (PulsarOutputFormat.class) {
if(producer == null){
- producer =
Preconditions.checkNotNull(createPulsarProducer(),
+ producer =
Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
"Pulsar producer cannot be null.");
}
}
@@ -97,7 +97,7 @@ public void close() throws IOException {
return producer;
}
- private static Producer<byte[]> createPulsarProducer() throws
PulsarClientException {
+ private static Producer<byte[]> createPulsarProducer(String serviceUrl,
String topicName) throws PulsarClientException {
try {
PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
return client.newProducer().topic(topicName).create();
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index e532bfd180..889970f5fb 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -28,9 +28,9 @@
private static final long serialVersionUID = 2997027580167793000L;
- public PulsarOutputFormat(String serviceUrl, String topicName,
SerializationSchema<T> serializationSchema) {
+ public PulsarOutputFormat(String serviceUrl, String topicName, final
SerializationSchema<T> serializationSchema) {
super(serviceUrl, topicName);
- Preconditions.checkNotNull(serializationSchema, "serializationSchema
cannot be null.");
+ Preconditions.checkNotNull(serializationSchema, "serializationSchema
cannot be null.");
this.serializationSchema = serializationSchema;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services