sijie closed pull request #3046: [Pulsar-Flink] Add Batch Json Sink Support
URL: https://github.com/apache/pulsar/pull/3046
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index d6aecdadb2..adae9f7781 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -30,7 +30,6 @@
 
     public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
         super(serviceUrl, topicName);
-
         this.serializationSchema = new CsvSerializationSchema<>();
     }
 
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
new file mode 100644
index 0000000000..3fe5baa797
--- /dev/null
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
+
+/**
+ * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
+ */
+public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
+
+    private static final long serialVersionUID = 8499620770848461958L;
+
+    public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
+        super(serviceUrl, topicName);
+        this.serializationSchema = new JsonSerializationSchema();
+    }
+}
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 8b469771a7..e532bfd180 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -31,7 +31,6 @@
     public PulsarOutputFormat(String serviceUrl, String topicName, 
SerializationSchema<T> serializationSchema) {
         super(serviceUrl, topicName);
         Preconditions.checkNotNull(serializationSchema,  "serializationSchema cannot be null.");
-
         this.serializationSchema = serializationSchema;
     }
 
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
index c01cba3a04..c7b7131e1d 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
@@ -21,8 +21,6 @@
 import org.apache.commons.csv.CSVFormat;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -32,14 +30,12 @@
  */
 public class CsvSerializationSchema<T extends Tuple> implements 
SerializationSchema<T> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(CsvSerializationSchema.class);
     private static final long serialVersionUID = -3379119592495232636L;
-
     private static final int STRING_WRITER_INITIAL_BUFFER_SIZE = 256;
 
     @Override
     public byte[] serialize(T t) {
-        StringWriter stringWriter = null;
+        StringWriter stringWriter;
         try {
             Object[] fieldsValues = new Object[t.getArity()];
             for(int index = 0; index < t.getArity(); index++) {
@@ -49,7 +45,7 @@
             stringWriter = new StringWriter(STRING_WRITER_INITIAL_BUFFER_SIZE);
             
CSVFormat.DEFAULT.withRecordSeparator("").printRecord(stringWriter, 
fieldsValues);
         } catch (IOException e) {
-            LOG.error("Error while serializing the record to Csv", e);
+            throw new RuntimeException("Error while serializing the record to Csv", e);
         }
 
         return stringWriter.toString().getBytes();
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
new file mode 100644
index 0000000000..b7a56c53e9
--- /dev/null
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Json Serialization Schema to serialize Dataset records to Json.
+ */
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
+
+    private static final long serialVersionUID = -6938065355389311385L;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public byte[] serialize(T t) {
+        try {
+            return mapper.writeValueAsBytes(t);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Error while serializing the record to Json", e);
+        }
+    }
+}
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
new file mode 100644
index 0000000000..4a10273491
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Pulsar Json Output Format
+ */
+public class PulsarJsonOutputFormatTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
+        new PulsarJsonOutputFormat(null, "testTopic");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
+        new PulsarJsonOutputFormat("testServiceUrl", null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
+        new PulsarJsonOutputFormat("testServiceUrl", " ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
+        new PulsarJsonOutputFormat(" ", "testTopic");
+    }
+
+    @Test
+    public void testPulsarJsonOutputFormatConstructor() {
+        PulsarJsonOutputFormat pulsarJsonOutputFormat =
+                new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+        assertNotNull(pulsarJsonOutputFormat);
+    }
+}
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 0d7281a77b..6b0f0ca6cd 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -33,11 +33,12 @@
  */
 public class FlinkPulsarBatchCsvSinkExample {
 
-    private static final List<Tuple4<Integer, String, String, String>> 
employeeTuples = Arrays.asList(
-            new Tuple4(1, "John", "Tyson", "Engineering"),
-            new Tuple4(2, "Pamela", "Moon", "HR"),
-            new Tuple4(3, "Jim", "Sun", "Finance"),
-            new Tuple4(4, "Michael", "Star", "Engineering"));
+    private static final List<Tuple4<Integer, String, Integer, Integer>> 
nasaMissions = Arrays.asList(
+            new Tuple4(1, "Mercury program", 1959, 1963),
+            new Tuple4(2, "Apollo program", 1961, 1972),
+            new Tuple4(3, "Gemini program", 1963, 1966),
+            new Tuple4(4, "Skylab", 1973, 1974),
+            new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
     private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
     private static final String TOPIC_NAME = "my-flink-topic";
@@ -48,28 +49,33 @@ public static void main(String[] args) throws Exception {
         final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
         // create PulsarCsvOutputFormat instance
-        final OutputFormat<Tuple4<Integer, String, String, String>> 
pulsarCsvOutputFormat =
+        final OutputFormat<Tuple4<Integer, String, Integer, Integer>> 
pulsarCsvOutputFormat =
                 new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
 
         // create DataSet
-        DataSet<Tuple4<Integer, String, String, String>> employeeDS = 
env.fromCollection(employeeTuples);
-        // map employees' name, surname and department as upper-case
-        employeeDS.map(
-                new MapFunction<Tuple4<Integer, String, String, String>, 
Tuple4<Integer, String, String, String>>() {
-            @Override
-            public Tuple4<Integer, String, String, String> map(
-                    Tuple4<Integer, String, String, String> employeeTuple) 
throws Exception {
-                return new Tuple4(employeeTuple.f0,
-                        employeeTuple.f1.toUpperCase(),
-                        employeeTuple.f2.toUpperCase(),
-                        employeeTuple.f3.toUpperCase());
-            }
-        })
-        // filter employees which is member of Engineering
-        .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+        DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = 
env.fromCollection(nasaMissions);
+        // map nasa mission names to upper-case
+        nasaMissionDS.map(
+            new MapFunction<Tuple4<Integer, String, Integer, Integer>, 
Tuple4<Integer, String, Integer, Integer>>() {
+                           @Override
+                           public Tuple4<Integer, String, Integer, Integer> 
map(
+                                   Tuple4<Integer, String, Integer, Integer> 
nasaMission) throws Exception {
+                               return new Tuple4(
+                                       nasaMission.f0,
+                                       nasaMission.f1.toUpperCase(),
+                                       nasaMission.f2,
+                                       nasaMission.f3);
+                           }
+                       }
+        )
+        // filter missions which started after 1970
+        .filter(nasaMission -> nasaMission.f2 > 1970)
         // write batch data to Pulsar
         .output(pulsarCsvOutputFormat);
 
+        // set parallelism to write Pulsar in parallel (optional)
+        env.setParallelism(2);
+
         // execute program
         env.execute("Flink - Pulsar Batch Csv");
 
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
new file mode 100644
index 0000000000..e037616c9f
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Implements a batch program on Pulsar topic by writing Flink DataSet as Json.
+ */
+public class FlinkPulsarBatchJsonSinkExample {
+
+    private static final List<NasaMission> nasaMissions = Arrays.asList(
+            new NasaMission(1, "Mercury program", 1959, 1963),
+            new NasaMission(2, "Apollo program", 1961, 1972),
+            new NasaMission(3, "Gemini program", 1963, 1966),
+            new NasaMission(4, "Skylab", 1973, 1974),
+            new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+
+    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+    private static final String TOPIC_NAME = "my-flink-topic";
+
+    public static void main(String[] args) throws Exception {
+
+        // set up the execution environment
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        // create PulsarJsonOutputFormat instance
+        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+        // create DataSet
+        DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
+        // map nasa mission names to upper-case
+        nasaMissionDS.map(nasaMission -> new NasaMission(
+                nasaMission.id,
+                nasaMission.missionName.toUpperCase(),
+                nasaMission.startYear,
+                nasaMission.endYear))
+        // filter missions which started after 1970
+        .filter(nasaMission -> nasaMission.startYear > 1970)
+        // write batch data to Pulsar
+        .output(pulsarJsonOutputFormat);
+
+        // set parallelism to write Pulsar in parallel (optional)
+        env.setParallelism(2);
+
+        // execute program
+        env.execute("Flink - Pulsar Batch Json");
+    }
+
+    /**
+     * NasaMission data model
+     *
+     * Note: Properties should be public or have getter functions to be visible
+     */
+    private static class NasaMission {
+
+        private int id;
+        private String missionName;
+        private int startYear;
+        private int endYear;
+
+        public NasaMission(int id, String missionName, int startYear, int 
endYear) {
+            this.id = id;
+            this.missionName = missionName;
+            this.startYear = startYear;
+            this.endYear = endYear;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public String getMissionName() {
+            return missionName;
+        }
+
+        public int getStartYear() {
+            return startYear;
+        }
+
+        public int getEndYear() {
+            return endYear;
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 9942ba480b..2ab6ec0b6b 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -112,11 +112,12 @@ WordWithCount { word = world, count = 1 }
 Please find a sample usage as follows:
 
 ```java
-        private static final List<Tuple4<Integer, String, String, String>> 
employeeTuples = Arrays.asList(
-            new Tuple4(1, "John", "Tyson", "Engineering"),
-            new Tuple4(2, "Pamela", "Moon", "HR"),
-            new Tuple4(3, "Jim", "Sun", "Finance"),
-            new Tuple4(4, "Michael", "Star", "Engineering"));
+        private static final List<Tuple4<Integer, String, Integer, Integer>> 
nasaMissions = Arrays.asList(
+                new Tuple4(1, "Mercury program", 1959, 1963),
+                new Tuple4(2, "Apollo program", 1961, 1972),
+                new Tuple4(3, "Gemini program", 1963, 1966),
+                new Tuple4(4, "Skylab", 1973, 1974),
+                new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
         private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
         private static final String TOPIC_NAME = "my-flink-topic";
@@ -127,28 +128,33 @@ Please find a sample usage as follows:
             final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
             // create PulsarCsvOutputFormat instance
-            final OutputFormat<Tuple4<Integer, String, String, String>> 
pulsarCsvOutputFormat =
+            final OutputFormat<Tuple4<Integer, String, Integer, Integer>> 
pulsarCsvOutputFormat =
                     new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
 
             // create DataSet
-            DataSet<Tuple4<Integer, String, String, String>> employeeDS = 
env.fromCollection(employeeTuples);
-            // map employees' name, surname and department as upper-case
-            employeeDS.map(
-                    new MapFunction<Tuple4<Integer, String, String, String>, 
Tuple4<Integer, String, String, String>>() {
-                @Override
-                public Tuple4<Integer, String, String, String> map(
-                        Tuple4<Integer, String, String, String> employeeTuple) 
throws Exception {
-                    return new Tuple4(employeeTuple.f0,
-                            employeeTuple.f1.toUpperCase(),
-                            employeeTuple.f2.toUpperCase(),
-                            employeeTuple.f3.toUpperCase());
-                }
-            })
-            // filter employees who are member of Engineering
-            .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+            DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = 
env.fromCollection(nasaMissions);
+            // map nasa mission names to upper-case
+            nasaMissionDS.map(
+                new MapFunction<Tuple4<Integer, String, Integer, Integer>, 
Tuple4<Integer, String, Integer, Integer>>() {
+                               @Override
+                               public Tuple4<Integer, String, Integer, 
Integer> map(
+                                       Tuple4<Integer, String, Integer, 
Integer> nasaMission) throws Exception {
+                                   return new Tuple4(
+                                           nasaMission.f0,
+                                           nasaMission.f1.toUpperCase(),
+                                           nasaMission.f2,
+                                           nasaMission.f3);
+                               }
+                           }
+            )
+            // filter missions which started after 1970
+            .filter(nasaMission -> nasaMission.f2 > 1970)
             // write batch data to Pulsar
             .output(pulsarCsvOutputFormat);
 
+            // set parallelism to write Pulsar in parallel (optional)
+            env.setParallelism(2);
+
             // execute program
             env.execute("Flink - Pulsar Batch Csv");
 
@@ -159,11 +165,109 @@ Please find a sample usage as follows:
 
 Please find sample output for above application as follows:
 ```
-1,JOHN,TYSON,ENGINEERING
-4,MICHAEL,STAR,ENGINEERING
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
 ```
 
 ### Complete Example
 
 You can find a complete example 
[here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to 
Pulsar.
+In this example, Flink DataSet is processed and written to Pulsar in Csv 
format.
+
+
+# PulsarJsonOutputFormat
+### Usage
+
+Please find a sample usage as follows:
+
+```java
+        private static final List<NasaMission> nasaMissions = Arrays.asList(
+                new NasaMission(1, "Mercury program", 1959, 1963),
+                new NasaMission(2, "Apollo program", 1961, 1972),
+                new NasaMission(3, "Gemini program", 1963, 1966),
+                new NasaMission(4, "Skylab", 1973, 1974),
+                new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+
+        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+        private static final String TOPIC_NAME = "my-flink-topic";
+
+        public static void main(String[] args) throws Exception {
+
+            // set up the execution environment
+            final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+            // create PulsarJsonOutputFormat instance
+            final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+            // create DataSet
+            DataSet<NasaMission> nasaMissionDS = 
env.fromCollection(nasaMissions);
+            // map nasa mission names to upper-case
+            nasaMissionDS.map(nasaMission -> new NasaMission(
+                    nasaMission.id,
+                    nasaMission.missionName.toUpperCase(),
+                    nasaMission.startYear,
+                    nasaMission.endYear))
+            // filter missions which started after 1970
+            .filter(nasaMission -> nasaMission.startYear > 1970)
+            // write batch data to Pulsar
+            .output(pulsarJsonOutputFormat);
+
+            // set parallelism to write Pulsar in parallel (optional)
+            env.setParallelism(2);
+
+            // execute program
+            env.execute("Flink - Pulsar Batch Json");
+        }
+
+        /**
+         * NasaMission data model
+         *
+         * Note: Property definitions of the model should be public or have 
getter functions to be visible
+         */
+        private static class NasaMission {
+
+            private int id;
+            private String missionName;
+            private int startYear;
+            private int endYear;
+
+            public NasaMission(int id, String missionName, int startYear, int 
endYear) {
+                this.id = id;
+                this.missionName = missionName;
+                this.startYear = startYear;
+                this.endYear = endYear;
+            }
+
+            public int getId() {
+                return id;
+            }
+
+            public String getMissionName() {
+                return missionName;
+            }
+
+            public int getStartYear() {
+                return startYear;
+            }
+
+            public int getEndYear() {
+                return endYear;
+            }
+        }
+
+```
+
+**Note:** Property definitions of the model should be public or have getter 
functions to be visible
+
+### Sample Output
+
+Please find sample output for above application as follows:
+```
+{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
+{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
+```
+
+### Complete Example
+
+You can find a complete example 
[here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
+In this example, Flink DataSet is processed and written to Pulsar in Json 
format.
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
new file mode 100644
index 0000000000..fded6c391d
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for Json Serialization Schema
+ */
+public class JsonSerializationSchemaTest {
+
+    @Test
+    public void testJsonSerializationSchemaWithSuccessfulCase() throws 
IOException {
+        Employee employee = new Employee(1, "Test Name");
+        JsonSerializationSchema schema = new JsonSerializationSchema();
+        byte[] rowBytes = schema.serialize(employee);
+        String jsonContent = IOUtils.toString(rowBytes, 
StandardCharsets.UTF_8.toString());
+        assertEquals(jsonContent, "{\"id\":1,\"name\":\"Test Name\"}");
+    }
+
+    @Test
+    public void testJsonSerializationSchemaWithEmptyRecord() throws 
IOException {
+        Employee employee = new Employee();
+        JsonSerializationSchema schema = new JsonSerializationSchema();
+        byte[] employeeBytes = schema.serialize(employee);
+        String jsonContent = IOUtils.toString(employeeBytes, 
StandardCharsets.UTF_8.toString());
+        assertEquals(jsonContent, "{\"id\":0,\"name\":null}");
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testJsonSerializationSchemaWithNotSerializableObject() {
+        NotSerializableObject notSerializableObject = new 
NotSerializableObject();
+        JsonSerializationSchema schema = new JsonSerializationSchema();
+        schema.serialize(notSerializableObject);
+    }
+
+    /**
+     * Employee data model
+     */
+    private static class Employee {
+
+        private long id;
+        private String name;
+
+        public Employee() {
+        }
+
+        public Employee(long id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+    }
+
+    /**
+     * Not Serializable Object due to not having any public property
+     */
+    private static class NotSerializableObject {
+
+        private long id;
+        private String name;
+
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to