This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fe2c8ee [Pulsar-Flink] Add Batch Json Sink Support (#3046)
fe2c8ee is described below
commit fe2c8ee4d37e2a45dfb528592915746827416e18
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Nov 25 19:42:58 2018 +0000
[Pulsar-Flink] Add Batch Json Sink Support (#3046)
### Motivation
This PR aims to add Flink-Pulsar Batch Json Sink Support. If a user works
with the Flink DataSet API and would like to write DataSets to Pulsar in Json
format, this sink can help.
Ref: [Flink Batch Sink
API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#data-sinks)
### Modifications
Please find the change-set as follows:
**1-** Defines `PulsarJsonOutputFormat` to write Flink Batch `DataSets`
into Pulsar by providing a ready-made `JsonSerializationSchema`.
**2-** Unit test coverage
**3-** `FlinkPulsarBatchJsonSinkExample` to show users how to use the new sink.
**4-** `README.md` documentation
---
.../connectors/pulsar/PulsarCsvOutputFormat.java | 1 -
...tputFormat.java => PulsarJsonOutputFormat.java} | 15 +-
.../connectors/pulsar/PulsarOutputFormat.java | 1 -
.../serialization/CsvSerializationSchema.java | 8 +-
.../JsonSerializationSchema.java} | 25 ++--
.../pulsar/PulsarJsonOutputFormatTest.java | 56 ++++++++
.../example/FlinkPulsarBatchCsvSinkExample.java | 48 ++++---
.../example/FlinkPulsarBatchJsonSinkExample.java | 108 +++++++++++++++
.../batch/connectors/pulsar/example/README.md | 152 +++++++++++++++++----
.../serialization/JsonSerializationSchemaTest.java | 94 +++++++++++++
10 files changed, 435 insertions(+), 73 deletions(-)
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index d6aecda..adae9f7 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -30,7 +30,6 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends
BasePulsarOutputForm
public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
super(serviceUrl, topicName);
-
this.serializationSchema = new CsvSerializationSchema<>();
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
similarity index 63%
copy from
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
copy to
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index d6aecda..3fe5baa 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -18,20 +18,17 @@
*/
package org.apache.flink.batch.connectors.pulsar;
-import org.apache.flink.api.java.tuple.Tuple;
-import
org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import
org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
/**
- * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv
format.
+ * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in
Json format.
*/
-public class PulsarCsvOutputFormat<T extends Tuple> extends
BasePulsarOutputFormat<T> {
+public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
- private static final long serialVersionUID = -4461671510903404196L;
+ private static final long serialVersionUID = 8499620770848461958L;
- public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
+ public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
super(serviceUrl, topicName);
-
- this.serializationSchema = new CsvSerializationSchema<>();
+ this.serializationSchema = new JsonSerializationSchema();
}
-
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 8b46977..e532bfd 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -31,7 +31,6 @@ public class PulsarOutputFormat<T> extends
BasePulsarOutputFormat<T> {
public PulsarOutputFormat(String serviceUrl, String topicName,
SerializationSchema<T> serializationSchema) {
super(serviceUrl, topicName);
Preconditions.checkNotNull(serializationSchema, "serializationSchema
cannot be null.");
-
this.serializationSchema = serializationSchema;
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
index c01cba3..c7b7131 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
@@ -21,8 +21,6 @@ package
org.apache.flink.batch.connectors.pulsar.serialization;
import org.apache.commons.csv.CSVFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
@@ -32,14 +30,12 @@ import java.io.StringWriter;
*/
public class CsvSerializationSchema<T extends Tuple> implements
SerializationSchema<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(CsvSerializationSchema.class);
private static final long serialVersionUID = -3379119592495232636L;
-
private static final int STRING_WRITER_INITIAL_BUFFER_SIZE = 256;
@Override
public byte[] serialize(T t) {
- StringWriter stringWriter = null;
+ StringWriter stringWriter;
try {
Object[] fieldsValues = new Object[t.getArity()];
for(int index = 0; index < t.getArity(); index++) {
@@ -49,7 +45,7 @@ public class CsvSerializationSchema<T extends Tuple>
implements SerializationSch
stringWriter = new StringWriter(STRING_WRITER_INITIAL_BUFFER_SIZE);
CSVFormat.DEFAULT.withRecordSeparator("").printRecord(stringWriter,
fieldsValues);
} catch (IOException e) {
- LOG.error("Error while serializing the record to Csv", e);
+ throw new RuntimeException("Error while serializing the record to
Csv", e);
}
return stringWriter.toString().getBytes();
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
similarity index 53%
copy from
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
copy to
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
index 8b46977..b7a56c5 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
@@ -16,23 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.flink.batch.connectors.pulsar;
+package org.apache.flink.batch.connectors.pulsar.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
/**
- * Pulsar Output Format to write Flink DataSets into a Pulsar topic in
user-defined format.
+ * Json Serialization Schema to serialize Dataset records to Json.
*/
-public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
- private static final long serialVersionUID = 2997027580167793000L;
+ private static final long serialVersionUID = -6938065355389311385L;
+ private ObjectMapper mapper = new ObjectMapper();
- public PulsarOutputFormat(String serviceUrl, String topicName,
SerializationSchema<T> serializationSchema) {
- super(serviceUrl, topicName);
- Preconditions.checkNotNull(serializationSchema, "serializationSchema
cannot be null.");
-
- this.serializationSchema = serializationSchema;
+ @Override
+ public byte[] serialize(T t) {
+ try {
+ return mapper.writeValueAsBytes(t);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Error while serializing the record to
Json", e);
+ }
}
-
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
new file mode 100644
index 0000000..4a10273
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Pulsar Json Output Format
+ */
+public class PulsarJsonOutputFormatTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
+ new PulsarJsonOutputFormat(null, "testTopic");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
+ new PulsarJsonOutputFormat("testServiceUrl", null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
+ new PulsarJsonOutputFormat("testServiceUrl", " ");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
+ new PulsarJsonOutputFormat(" ", "testTopic");
+ }
+
+ @Test
+ public void testPulsarJsonOutputFormatConstructor() {
+ PulsarJsonOutputFormat pulsarJsonOutputFormat =
+ new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+ assertNotNull(pulsarJsonOutputFormat);
+ }
+}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 0d7281a..6b0f0ca 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -33,11 +33,12 @@ import java.util.List;
*/
public class FlinkPulsarBatchCsvSinkExample {
- private static final List<Tuple4<Integer, String, String, String>>
employeeTuples = Arrays.asList(
- new Tuple4(1, "John", "Tyson", "Engineering"),
- new Tuple4(2, "Pamela", "Moon", "HR"),
- new Tuple4(3, "Jim", "Sun", "Finance"),
- new Tuple4(4, "Michael", "Star", "Engineering"));
+ private static final List<Tuple4<Integer, String, Integer, Integer>>
nasaMissions = Arrays.asList(
+ new Tuple4(1, "Mercury program", 1959, 1963),
+ new Tuple4(2, "Apollo program", 1961, 1972),
+ new Tuple4(3, "Gemini program", 1963, 1966),
+ new Tuple4(4, "Skylab", 1973, 1974),
+ new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
private static final String TOPIC_NAME = "my-flink-topic";
@@ -48,28 +49,33 @@ public class FlinkPulsarBatchCsvSinkExample {
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
// create PulsarCsvOutputFormat instance
- final OutputFormat<Tuple4<Integer, String, String, String>>
pulsarCsvOutputFormat =
+ final OutputFormat<Tuple4<Integer, String, Integer, Integer>>
pulsarCsvOutputFormat =
new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
// create DataSet
- DataSet<Tuple4<Integer, String, String, String>> employeeDS =
env.fromCollection(employeeTuples);
- // map employees' name, surname and department as upper-case
- employeeDS.map(
- new MapFunction<Tuple4<Integer, String, String, String>,
Tuple4<Integer, String, String, String>>() {
- @Override
- public Tuple4<Integer, String, String, String> map(
- Tuple4<Integer, String, String, String> employeeTuple)
throws Exception {
- return new Tuple4(employeeTuple.f0,
- employeeTuple.f1.toUpperCase(),
- employeeTuple.f2.toUpperCase(),
- employeeTuple.f3.toUpperCase());
- }
- })
- // filter employees which is member of Engineering
- .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+ DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS =
env.fromCollection(nasaMissions);
+ // map nasa mission names to upper-case
+ nasaMissionDS.map(
+ new MapFunction<Tuple4<Integer, String, Integer, Integer>,
Tuple4<Integer, String, Integer, Integer>>() {
+ @Override
+ public Tuple4<Integer, String, Integer, Integer>
map(
+ Tuple4<Integer, String, Integer, Integer>
nasaMission) throws Exception {
+ return new Tuple4(
+ nasaMission.f0,
+ nasaMission.f1.toUpperCase(),
+ nasaMission.f2,
+ nasaMission.f3);
+ }
+ }
+ )
+ // filter missions which started after 1970
+ .filter(nasaMission -> nasaMission.f2 > 1970)
// write batch data to Pulsar
.output(pulsarCsvOutputFormat);
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2);
+
// execute program
env.execute("Flink - Pulsar Batch Csv");
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
new file mode 100644
index 0000000..e037616
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Implements a batch program on Pulsar topic by writing Flink DataSet as Json.
+ */
+public class FlinkPulsarBatchJsonSinkExample {
+
+ private static final List<NasaMission> nasaMissions = Arrays.asList(
+ new NasaMission(1, "Mercury program", 1959, 1963),
+ new NasaMission(2, "Apollo program", 1961, 1972),
+ new NasaMission(3, "Gemini program", 1963, 1966),
+ new NasaMission(4, "Skylab", 1973, 1974),
+ new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+
+ private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+ private static final String TOPIC_NAME = "my-flink-topic";
+
+ public static void main(String[] args) throws Exception {
+
+ // set up the execution environment
+ final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ // create PulsarJsonOutputFormat instance
+ final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+ // create DataSet
+ DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
+ // map nasa mission names to upper-case
+ nasaMissionDS.map(nasaMission -> new NasaMission(
+ nasaMission.id,
+ nasaMission.missionName.toUpperCase(),
+ nasaMission.startYear,
+ nasaMission.endYear))
+ // filter missions which started after 1970
+ .filter(nasaMission -> nasaMission.startYear > 1970)
+ // write batch data to Pulsar
+ .output(pulsarJsonOutputFormat);
+
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2);
+
+ // execute program
+ env.execute("Flink - Pulsar Batch Json");
+ }
+
+ /**
+ * NasaMission data model
+ *
+ * Note: Properties should be public or have getter functions to be visible
+ */
+ private static class NasaMission {
+
+ private int id;
+ private String missionName;
+ private int startYear;
+ private int endYear;
+
+ public NasaMission(int id, String missionName, int startYear, int
endYear) {
+ this.id = id;
+ this.missionName = missionName;
+ this.startYear = startYear;
+ this.endYear = endYear;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getMissionName() {
+ return missionName;
+ }
+
+ public int getStartYear() {
+ return startYear;
+ }
+
+ public int getEndYear() {
+ return endYear;
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 9942ba4..2ab6ec0 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -112,11 +112,12 @@ WordWithCount { word = world, count = 1 }
Please find a sample usage as follows:
```java
- private static final List<Tuple4<Integer, String, String, String>>
employeeTuples = Arrays.asList(
- new Tuple4(1, "John", "Tyson", "Engineering"),
- new Tuple4(2, "Pamela", "Moon", "HR"),
- new Tuple4(3, "Jim", "Sun", "Finance"),
- new Tuple4(4, "Michael", "Star", "Engineering"));
+ private static final List<Tuple4<Integer, String, Integer, Integer>>
nasaMissions = Arrays.asList(
+ new Tuple4(1, "Mercury program", 1959, 1963),
+ new Tuple4(2, "Apollo program", 1961, 1972),
+ new Tuple4(3, "Gemini program", 1963, 1966),
+ new Tuple4(4, "Skylab", 1973, 1974),
+ new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
private static final String TOPIC_NAME = "my-flink-topic";
@@ -127,28 +128,33 @@ Please find a sample usage as follows:
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
// create PulsarCsvOutputFormat instance
- final OutputFormat<Tuple4<Integer, String, String, String>>
pulsarCsvOutputFormat =
+ final OutputFormat<Tuple4<Integer, String, Integer, Integer>>
pulsarCsvOutputFormat =
new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
// create DataSet
- DataSet<Tuple4<Integer, String, String, String>> employeeDS =
env.fromCollection(employeeTuples);
- // map employees' name, surname and department as upper-case
- employeeDS.map(
- new MapFunction<Tuple4<Integer, String, String, String>,
Tuple4<Integer, String, String, String>>() {
- @Override
- public Tuple4<Integer, String, String, String> map(
- Tuple4<Integer, String, String, String> employeeTuple)
throws Exception {
- return new Tuple4(employeeTuple.f0,
- employeeTuple.f1.toUpperCase(),
- employeeTuple.f2.toUpperCase(),
- employeeTuple.f3.toUpperCase());
- }
- })
- // filter employees who are member of Engineering
- .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+ DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS =
env.fromCollection(nasaMissions);
+ // map nasa mission names to upper-case
+ nasaMissionDS.map(
+ new MapFunction<Tuple4<Integer, String, Integer, Integer>,
Tuple4<Integer, String, Integer, Integer>>() {
+ @Override
+ public Tuple4<Integer, String, Integer,
Integer> map(
+ Tuple4<Integer, String, Integer,
Integer> nasaMission) throws Exception {
+ return new Tuple4(
+ nasaMission.f0,
+ nasaMission.f1.toUpperCase(),
+ nasaMission.f2,
+ nasaMission.f3);
+ }
+ }
+ )
+ // filter missions which started after 1970
+ .filter(nasaMission -> nasaMission.f2 > 1970)
// write batch data to Pulsar
.output(pulsarCsvOutputFormat);
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2);
+
// execute program
env.execute("Flink - Pulsar Batch Csv");
@@ -159,11 +165,109 @@ Please find a sample usage as follows:
Please find sample output for above application as follows:
```
-1,JOHN,TYSON,ENGINEERING
-4,MICHAEL,STAR,ENGINEERING
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
```
### Complete Example
You can find a complete example
[here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to
Pulsar.
+In this example, Flink DataSet is processed and written to Pulsar in Csv
format.
+
+
+# PulsarJsonOutputFormat
+### Usage
+
+Please find a sample usage as follows:
+
+```java
+ private static final List<NasaMission> nasaMissions = Arrays.asList(
+ new NasaMission(1, "Mercury program", 1959, 1963),
+ new NasaMission(2, "Apollo program", 1961, 1972),
+ new NasaMission(3, "Gemini program", 1963, 1966),
+ new NasaMission(4, "Skylab", 1973, 1974),
+ new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+
+ private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+ private static final String TOPIC_NAME = "my-flink-topic";
+
+ public static void main(String[] args) throws Exception {
+
+ // set up the execution environment
+ final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ // create PulsarJsonOutputFormat instance
+ final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+ // create DataSet
+ DataSet<NasaMission> nasaMissionDS =
env.fromCollection(nasaMissions);
+ // map nasa mission names to upper-case
+ nasaMissionDS.map(nasaMission -> new NasaMission(
+ nasaMission.id,
+ nasaMission.missionName.toUpperCase(),
+ nasaMission.startYear,
+ nasaMission.endYear))
+ // filter missions which started after 1970
+ .filter(nasaMission -> nasaMission.startYear > 1970)
+ // write batch data to Pulsar
+ .output(pulsarJsonOutputFormat);
+
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2);
+
+ // execute program
+ env.execute("Flink - Pulsar Batch Json");
+ }
+
+ /**
+ * NasaMission data model
+ *
+ * Note: Property definitions of the model should be public or have
getter functions to be visible
+ */
+ private static class NasaMission {
+
+ private int id;
+ private String missionName;
+ private int startYear;
+ private int endYear;
+
+ public NasaMission(int id, String missionName, int startYear, int
endYear) {
+ this.id = id;
+ this.missionName = missionName;
+ this.startYear = startYear;
+ this.endYear = endYear;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getMissionName() {
+ return missionName;
+ }
+
+ public int getStartYear() {
+ return startYear;
+ }
+
+ public int getEndYear() {
+ return endYear;
+ }
+ }
+
+```
+
+**Note:** Property definitions of the model should be public or have getter
functions to be visible
+
+### Sample Output
+
+Please find sample output for above application as follows:
+```
+{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
+{"id":5,"missionName":"APOLLO–SOYUZ TEST
PROJECT","startYear":1975,"endYear":1975}
+```
+
+### Complete Example
+
+You can find a complete example
[here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
+In this example, Flink DataSet is processed and written to Pulsar in Json
format.
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
new file mode 100644
index 0000000..fded6c3
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for Json Serialization Schema
+ */
+public class JsonSerializationSchemaTest {
+
+ @Test
+ public void testJsonSerializationSchemaWithSuccessfulCase() throws
IOException {
+ Employee employee = new Employee(1, "Test Name");
+ JsonSerializationSchema schema = new JsonSerializationSchema();
+ byte[] rowBytes = schema.serialize(employee);
+ String jsonContent = IOUtils.toString(rowBytes,
StandardCharsets.UTF_8.toString());
+ assertEquals(jsonContent, "{\"id\":1,\"name\":\"Test Name\"}");
+ }
+
+ @Test
+ public void testJsonSerializationSchemaWithEmptyRecord() throws
IOException {
+ Employee employee = new Employee();
+ JsonSerializationSchema schema = new JsonSerializationSchema();
+ byte[] employeeBytes = schema.serialize(employee);
+ String jsonContent = IOUtils.toString(employeeBytes,
StandardCharsets.UTF_8.toString());
+ assertEquals(jsonContent, "{\"id\":0,\"name\":null}");
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testJsonSerializationSchemaWithNotSerializableObject() {
+ NotSerializableObject notSerializableObject = new
NotSerializableObject();
+ JsonSerializationSchema schema = new JsonSerializationSchema();
+ schema.serialize(notSerializableObject);
+ }
+
+ /**
+ * Employee data model
+ */
+ private static class Employee {
+
+ private long id;
+ private String name;
+
+ public Employee() {
+ }
+
+ public Employee(long id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ }
+
+ /**
+ * Not Serializable Object due to not having any public property
+ */
+ private static class NotSerializableObject {
+
+ private long id;
+ private String name;
+
+ }
+}