This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fe2c8ee  [Pulsar-Flink] Add Batch Json Sink Support (#3046)
fe2c8ee is described below

commit fe2c8ee4d37e2a45dfb528592915746827416e18
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Nov 25 19:42:58 2018 +0000

    [Pulsar-Flink] Add Batch Json Sink Support (#3046)
    
    ### Motivation
    This PR aims to add Flink-Pulsar batch JSON sink support. If a user works with the Flink DataSet API and would like to write DataSets to Pulsar in JSON format, this sink can help.
    
    Ref: [Flink Batch Sink API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#data-sinks)
    
    ### Modifications
    Please find the change-set as follows:
    
    **1-** Defines `PulsarJsonOutputFormat` to write Flink batch `DataSets` into Pulsar using the built-in `JsonSerializationSchema`.
    **2-** Unit test coverage
    **3-** `FlinkPulsarBatchJsonSinkExample` to show users how to use the new sink.
    **4-** `README.md` documentation
---
 .../connectors/pulsar/PulsarCsvOutputFormat.java   |   1 -
 ...tputFormat.java => PulsarJsonOutputFormat.java} |  15 +-
 .../connectors/pulsar/PulsarOutputFormat.java      |   1 -
 .../serialization/CsvSerializationSchema.java      |   8 +-
 .../JsonSerializationSchema.java}                  |  25 ++--
 .../pulsar/PulsarJsonOutputFormatTest.java         |  56 ++++++++
 .../example/FlinkPulsarBatchCsvSinkExample.java    |  48 ++++---
 .../example/FlinkPulsarBatchJsonSinkExample.java   | 108 +++++++++++++++
 .../batch/connectors/pulsar/example/README.md      | 152 +++++++++++++++++----
 .../serialization/JsonSerializationSchemaTest.java |  94 +++++++++++++
 10 files changed, 435 insertions(+), 73 deletions(-)

diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index d6aecda..adae9f7 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -30,7 +30,6 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends 
BasePulsarOutputForm
 
     public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
         super(serviceUrl, topicName);
-
         this.serializationSchema = new CsvSerializationSchema<>();
     }
 
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
similarity index 63%
copy from 
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
copy to 
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index d6aecda..3fe5baa 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -18,20 +18,17 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import 
org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import 
org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
 
 /**
- * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv 
format.
+ * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in 
Json format.
  */
-public class PulsarCsvOutputFormat<T extends Tuple> extends 
BasePulsarOutputFormat<T> {
+public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
 
-    private static final long serialVersionUID = -4461671510903404196L;
+    private static final long serialVersionUID = 8499620770848461958L;
 
-    public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
+    public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
         super(serviceUrl, topicName);
-
-        this.serializationSchema = new CsvSerializationSchema<>();
+        this.serializationSchema = new JsonSerializationSchema();
     }
-
 }
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 8b46977..e532bfd 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -31,7 +31,6 @@ public class PulsarOutputFormat<T> extends 
BasePulsarOutputFormat<T> {
     public PulsarOutputFormat(String serviceUrl, String topicName, 
SerializationSchema<T> serializationSchema) {
         super(serviceUrl, topicName);
         Preconditions.checkNotNull(serializationSchema,  "serializationSchema 
cannot be null.");
-
         this.serializationSchema = serializationSchema;
     }
 
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
index c01cba3..c7b7131 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
@@ -21,8 +21,6 @@ package 
org.apache.flink.batch.connectors.pulsar.serialization;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -32,14 +30,12 @@ import java.io.StringWriter;
  */
 public class CsvSerializationSchema<T extends Tuple> implements 
SerializationSchema<T> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(CsvSerializationSchema.class);
     private static final long serialVersionUID = -3379119592495232636L;
-
     private static final int STRING_WRITER_INITIAL_BUFFER_SIZE = 256;
 
     @Override
     public byte[] serialize(T t) {
-        StringWriter stringWriter = null;
+        StringWriter stringWriter;
         try {
             Object[] fieldsValues = new Object[t.getArity()];
             for(int index = 0; index < t.getArity(); index++) {
@@ -49,7 +45,7 @@ public class CsvSerializationSchema<T extends Tuple> 
implements SerializationSch
             stringWriter = new StringWriter(STRING_WRITER_INITIAL_BUFFER_SIZE);
             
CSVFormat.DEFAULT.withRecordSeparator("").printRecord(stringWriter, 
fieldsValues);
         } catch (IOException e) {
-            LOG.error("Error while serializing the record to Csv", e);
+            throw new RuntimeException("Error while serializing the record to 
Csv", e);
         }
 
         return stringWriter.toString().getBytes();
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
similarity index 53%
copy from 
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
copy to 
pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
index 8b46977..b7a56c5 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchema.java
@@ -16,23 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.flink.batch.connectors.pulsar;
+package org.apache.flink.batch.connectors.pulsar.serialization;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
- * Pulsar Output Format to write Flink DataSets into a Pulsar topic in 
user-defined format.
+ * Json Serialization Schema to serialize Dataset records to Json.
  */
-public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
 
-    private static final long serialVersionUID = 2997027580167793000L;
+    private static final long serialVersionUID = -6938065355389311385L;
+    private ObjectMapper mapper = new ObjectMapper();
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, 
SerializationSchema<T> serializationSchema) {
-        super(serviceUrl, topicName);
-        Preconditions.checkNotNull(serializationSchema,  "serializationSchema 
cannot be null.");
-
-        this.serializationSchema = serializationSchema;
+    @Override
+    public byte[] serialize(T t) {
+        try {
+            return mapper.writeValueAsBytes(t);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Error while serializing the record to 
Json", e);
+        }
     }
-
 }
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
new file mode 100644
index 0000000..4a10273
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Pulsar Json Output Format
+ */
+public class PulsarJsonOutputFormatTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
+        new PulsarJsonOutputFormat(null, "testTopic");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
+        new PulsarJsonOutputFormat("testServiceUrl", null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
+        new PulsarJsonOutputFormat("testServiceUrl", " ");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
+        new PulsarJsonOutputFormat(" ", "testTopic");
+    }
+
+    @Test
+    public void testPulsarJsonOutputFormatConstructor() {
+        PulsarJsonOutputFormat pulsarJsonOutputFormat =
+                new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+        assertNotNull(pulsarJsonOutputFormat);
+    }
+}
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 0d7281a..6b0f0ca 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -33,11 +33,12 @@ import java.util.List;
  */
 public class FlinkPulsarBatchCsvSinkExample {
 
-    private static final List<Tuple4<Integer, String, String, String>> 
employeeTuples = Arrays.asList(
-            new Tuple4(1, "John", "Tyson", "Engineering"),
-            new Tuple4(2, "Pamela", "Moon", "HR"),
-            new Tuple4(3, "Jim", "Sun", "Finance"),
-            new Tuple4(4, "Michael", "Star", "Engineering"));
+    private static final List<Tuple4<Integer, String, Integer, Integer>> 
nasaMissions = Arrays.asList(
+            new Tuple4(1, "Mercury program", 1959, 1963),
+            new Tuple4(2, "Apollo program", 1961, 1972),
+            new Tuple4(3, "Gemini program", 1963, 1966),
+            new Tuple4(4, "Skylab", 1973, 1974),
+            new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
     private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
     private static final String TOPIC_NAME = "my-flink-topic";
@@ -48,28 +49,33 @@ public class FlinkPulsarBatchCsvSinkExample {
         final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
         // create PulsarCsvOutputFormat instance
-        final OutputFormat<Tuple4<Integer, String, String, String>> 
pulsarCsvOutputFormat =
+        final OutputFormat<Tuple4<Integer, String, Integer, Integer>> 
pulsarCsvOutputFormat =
                 new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
 
         // create DataSet
-        DataSet<Tuple4<Integer, String, String, String>> employeeDS = 
env.fromCollection(employeeTuples);
-        // map employees' name, surname and department as upper-case
-        employeeDS.map(
-                new MapFunction<Tuple4<Integer, String, String, String>, 
Tuple4<Integer, String, String, String>>() {
-            @Override
-            public Tuple4<Integer, String, String, String> map(
-                    Tuple4<Integer, String, String, String> employeeTuple) 
throws Exception {
-                return new Tuple4(employeeTuple.f0,
-                        employeeTuple.f1.toUpperCase(),
-                        employeeTuple.f2.toUpperCase(),
-                        employeeTuple.f3.toUpperCase());
-            }
-        })
-        // filter employees which is member of Engineering
-        .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+        DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = 
env.fromCollection(nasaMissions);
+        // map nasa mission names to upper-case
+        nasaMissionDS.map(
+            new MapFunction<Tuple4<Integer, String, Integer, Integer>, 
Tuple4<Integer, String, Integer, Integer>>() {
+                           @Override
+                           public Tuple4<Integer, String, Integer, Integer> 
map(
+                                   Tuple4<Integer, String, Integer, Integer> 
nasaMission) throws Exception {
+                               return new Tuple4(
+                                       nasaMission.f0,
+                                       nasaMission.f1.toUpperCase(),
+                                       nasaMission.f2,
+                                       nasaMission.f3);
+                           }
+                       }
+        )
+        // filter missions which started after 1970
+        .filter(nasaMission -> nasaMission.f2 > 1970)
         // write batch data to Pulsar
         .output(pulsarCsvOutputFormat);
 
+        // set parallelism to write Pulsar in parallel (optional)
+        env.setParallelism(2);
+
         // execute program
         env.execute("Flink - Pulsar Batch Csv");
 
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
new file mode 100644
index 0000000..e037616
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Implements a batch program on Pulsar topic by writing Flink DataSet as Json.
+ */
+public class FlinkPulsarBatchJsonSinkExample {
+
+    private static final List<NasaMission> nasaMissions = Arrays.asList(
+            new NasaMission(1, "Mercury program", 1959, 1963),
+            new NasaMission(2, "Apollo program", 1961, 1972),
+            new NasaMission(3, "Gemini program", 1963, 1966),
+            new NasaMission(4, "Skylab", 1973, 1974),
+            new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+
+    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+    private static final String TOPIC_NAME = "my-flink-topic";
+
+    public static void main(String[] args) throws Exception {
+
+        // set up the execution environment
+        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+        // create PulsarJsonOutputFormat instance
+        final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+        // create DataSet
+        DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
+        // map nasa mission names to upper-case
+        nasaMissionDS.map(nasaMission -> new NasaMission(
+                nasaMission.id,
+                nasaMission.missionName.toUpperCase(),
+                nasaMission.startYear,
+                nasaMission.endYear))
+        // filter missions which started after 1970
+        .filter(nasaMission -> nasaMission.startYear > 1970)
+        // write batch data to Pulsar
+        .output(pulsarJsonOutputFormat);
+
+        // set parallelism to write Pulsar in parallel (optional)
+        env.setParallelism(2);
+
+        // execute program
+        env.execute("Flink - Pulsar Batch Json");
+    }
+
+    /**
+     * NasaMission data model
+     *
+     * Note: Properties should be public or have getter functions to be visible
+     */
+    private static class NasaMission {
+
+        private int id;
+        private String missionName;
+        private int startYear;
+        private int endYear;
+
+        public NasaMission(int id, String missionName, int startYear, int 
endYear) {
+            this.id = id;
+            this.missionName = missionName;
+            this.startYear = startYear;
+            this.endYear = endYear;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public String getMissionName() {
+            return missionName;
+        }
+
+        public int getStartYear() {
+            return startYear;
+        }
+
+        public int getEndYear() {
+            return endYear;
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 9942ba4..2ab6ec0 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -112,11 +112,12 @@ WordWithCount { word = world, count = 1 }
 Please find a sample usage as follows:
 
 ```java
-        private static final List<Tuple4<Integer, String, String, String>> 
employeeTuples = Arrays.asList(
-            new Tuple4(1, "John", "Tyson", "Engineering"),
-            new Tuple4(2, "Pamela", "Moon", "HR"),
-            new Tuple4(3, "Jim", "Sun", "Finance"),
-            new Tuple4(4, "Michael", "Star", "Engineering"));
+        private static final List<Tuple4<Integer, String, Integer, Integer>> 
nasaMissions = Arrays.asList(
+                new Tuple4(1, "Mercury program", 1959, 1963),
+                new Tuple4(2, "Apollo program", 1961, 1972),
+                new Tuple4(3, "Gemini program", 1963, 1966),
+                new Tuple4(4, "Skylab", 1973, 1974),
+                new Tuple4(5, "Apollo–Soyuz Test Project", 1975, 1975));
 
         private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
         private static final String TOPIC_NAME = "my-flink-topic";
@@ -127,28 +128,33 @@ Please find a sample usage as follows:
             final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
             // create PulsarCsvOutputFormat instance
-            final OutputFormat<Tuple4<Integer, String, String, String>> 
pulsarCsvOutputFormat =
+            final OutputFormat<Tuple4<Integer, String, Integer, Integer>> 
pulsarCsvOutputFormat =
                     new PulsarCsvOutputFormat<>(SERVICE_URL, TOPIC_NAME);
 
             // create DataSet
-            DataSet<Tuple4<Integer, String, String, String>> employeeDS = 
env.fromCollection(employeeTuples);
-            // map employees' name, surname and department as upper-case
-            employeeDS.map(
-                    new MapFunction<Tuple4<Integer, String, String, String>, 
Tuple4<Integer, String, String, String>>() {
-                @Override
-                public Tuple4<Integer, String, String, String> map(
-                        Tuple4<Integer, String, String, String> employeeTuple) 
throws Exception {
-                    return new Tuple4(employeeTuple.f0,
-                            employeeTuple.f1.toUpperCase(),
-                            employeeTuple.f2.toUpperCase(),
-                            employeeTuple.f3.toUpperCase());
-                }
-            })
-            // filter employees who are member of Engineering
-            .filter(tuple -> tuple.f3.equals("ENGINEERING"))
+            DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS = 
env.fromCollection(nasaMissions);
+            // map nasa mission names to upper-case
+            nasaMissionDS.map(
+                new MapFunction<Tuple4<Integer, String, Integer, Integer>, 
Tuple4<Integer, String, Integer, Integer>>() {
+                               @Override
+                               public Tuple4<Integer, String, Integer, 
Integer> map(
+                                       Tuple4<Integer, String, Integer, 
Integer> nasaMission) throws Exception {
+                                   return new Tuple4(
+                                           nasaMission.f0,
+                                           nasaMission.f1.toUpperCase(),
+                                           nasaMission.f2,
+                                           nasaMission.f3);
+                               }
+                           }
+            )
+            // filter missions which started after 1970
+            .filter(nasaMission -> nasaMission.f2 > 1970)
             // write batch data to Pulsar
             .output(pulsarCsvOutputFormat);
 
+            // set parallelism to write Pulsar in parallel (optional)
+            env.setParallelism(2);
+
             // execute program
             env.execute("Flink - Pulsar Batch Csv");
 
@@ -159,11 +165,109 @@ Please find a sample usage as follows:
 
 Please find sample output for above application as follows:
 ```
-1,JOHN,TYSON,ENGINEERING
-4,MICHAEL,STAR,ENGINEERING
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
 ```
 
 ### Complete Example
 
 You can find a complete example 
[here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
-In this example, Flink DataSet is processed as word-count and being written to 
Pulsar.
+In this example, a Flink DataSet is processed and written to Pulsar in CSV format.
+
+
+# PulsarJsonOutputFormat
+### Usage
+
+Please find a sample usage as follows:
+
+```java
+        private static final List<NasaMission> nasaMissions = Arrays.asList(
+                new NasaMission(1, "Mercury program", 1959, 1963),
+                new NasaMission(2, "Apollo program", 1961, 1972),
+                new NasaMission(3, "Gemini program", 1963, 1966),
+                new NasaMission(4, "Skylab", 1973, 1974),
+                new NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975));
+
+        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+        private static final String TOPIC_NAME = "my-flink-topic";
+
+        public static void main(String[] args) throws Exception {
+
+            // set up the execution environment
+            final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+            // create PulsarJsonOutputFormat instance
+            final OutputFormat<NasaMission> pulsarJsonOutputFormat = new 
PulsarJsonOutputFormat<>(SERVICE_URL, TOPIC_NAME);
+
+            // create DataSet
+            DataSet<NasaMission> nasaMissionDS = 
env.fromCollection(nasaMissions);
+            // map nasa mission names to upper-case
+            nasaMissionDS.map(nasaMission -> new NasaMission(
+                    nasaMission.id,
+                    nasaMission.missionName.toUpperCase(),
+                    nasaMission.startYear,
+                    nasaMission.endYear))
+            // filter missions which started after 1970
+            .filter(nasaMission -> nasaMission.startYear > 1970)
+            // write batch data to Pulsar
+            .output(pulsarJsonOutputFormat);
+
+            // set parallelism to write Pulsar in parallel (optional)
+            env.setParallelism(2);
+
+            // execute program
+            env.execute("Flink - Pulsar Batch Json");
+        }
+
+        /**
+         * NasaMission data model
+         *
+         * Note: Property definitions of the model should be public or have 
getter functions to be visible
+         */
+        private static class NasaMission {
+
+            private int id;
+            private String missionName;
+            private int startYear;
+            private int endYear;
+
+            public NasaMission(int id, String missionName, int startYear, int 
endYear) {
+                this.id = id;
+                this.missionName = missionName;
+                this.startYear = startYear;
+                this.endYear = endYear;
+            }
+
+            public int getId() {
+                return id;
+            }
+
+            public String getMissionName() {
+                return missionName;
+            }
+
+            public int getStartYear() {
+                return startYear;
+            }
+
+            public int getEndYear() {
+                return endYear;
+            }
+        }
+
+```
+
+**Note:** Property definitions of the model should be public or have getter functions to be visible to the JSON serializer.
+
+### Sample Output
+
+Please find sample output for above application as follows:
+```
+{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
+{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
+```
+
+### Complete Example
+
+You can find a complete example 
[here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
+In this example, a Flink DataSet is processed and written to Pulsar in JSON format.
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
new file mode 100644
index 0000000..fded6c3
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/JsonSerializationSchemaTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.serialization;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for Json Serialization Schema
+ */
+public class JsonSerializationSchemaTest {
+
+    @Test
+    public void testJsonSerializationSchemaWithSuccessfulCase() throws 
IOException {
+        Employee employee = new Employee(1, "Test Name");
+        JsonSerializationSchema schema = new JsonSerializationSchema();
+        byte[] rowBytes = schema.serialize(employee);
+        String jsonContent = IOUtils.toString(rowBytes, 
StandardCharsets.UTF_8.toString());
+        assertEquals(jsonContent, "{\"id\":1,\"name\":\"Test Name\"}");
+    }
+
+    @Test
+    public void testJsonSerializationSchemaWithEmptyRecord() throws 
IOException {
+        Employee employee = new Employee();
+        JsonSerializationSchema schema = new JsonSerializationSchema();
+        byte[] employeeBytes = schema.serialize(employee);
+        String jsonContent = IOUtils.toString(employeeBytes, 
StandardCharsets.UTF_8.toString());
+        assertEquals(jsonContent, "{\"id\":0,\"name\":null}");
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testJsonSerializationSchemaWithNotSerializableObject() {
+        NotSerializableObject notSerializableObject = new 
NotSerializableObject();
+        JsonSerializationSchema schema = new JsonSerializationSchema();
+        schema.serialize(notSerializableObject);
+    }
+
+    /**
+     * Employee data model
+     */
+    private static class Employee {
+
+        private long id;
+        private String name;
+
+        public Employee() {
+        }
+
+        public Employee(long id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+    }
+
+    /**
+     * Not Serializable Object due to not having any public property
+     */
+    private static class NotSerializableObject {
+
+        private long id;
+        private String name;
+
+    }
+}

Reply via email to