sijie closed pull request #3231: [pulsar-flink] add streaming connectors as a 
Pulsar stream that serializes data in Avro format
URL: https://github.com/apache/pulsar/pull/3231
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
index bd3c93e795..4aef5a0252 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,5 +80,5 @@ docker.debug-info
 **/website/translated_docs*
 
 # Avro
-examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/avro/generated
-pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/avro/generated
+examples/flink-consumer-source/src/main/java/org/apache/flink/avro/generated
+pulsar-flink/src/test/java/org/apache/flink/avro/generated
diff --git a/examples/flink-consumer-source/pom.xml 
b/examples/flink-consumer-source/pom.xml
index 3c08697072..35f5924966 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -60,6 +60,18 @@
       <version>${flink.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-flink</artifactId>
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 553bf6cb6a..ef0048cbac 100644
--- 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -22,8 +22,8 @@
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
 
 import java.util.Arrays;
 import java.util.List;
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
new file mode 100644
index 0000000000..7b78da5149
--- /dev/null
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.avro.generated.WordWithCount;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics.
+ *
+ * <p>Example usage:
+ *   --service-url pulsar://localhost:6650 --input-topic test_topic 
--subscription test_sub
+ */
+public class PulsarConsumerSourceWordCountToAvroTableSink {
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String ROUTING_KEY = "word";
+
+    public static void main(String[] args) throws Exception {
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url 
<pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic 
<topic>");
+            return;
+        }
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().disableSysoutLogging();
+        
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000));
+        env.enableCheckpointing(5000);
+        env.getConfig().setGlobalJobParameters(parameterTool);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.getTableEnvironment(env);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String inputTopic = parameterTool.getRequired("input-topic");
+        String subscription = parameterTool.get("subscription", 
"flink-examples");
+        String outputTopic = parameterTool.get("output-topic", null);
+        int parallelism = parameterTool.getInt("parallelism", 1);
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tInputTopic:\t" + inputTopic);
+        System.out.println("\tSubscription:\t" + subscription);
+        System.out.println("\tOutputTopic:\t" + outputTopic);
+        System.out.println("\tParallelism:\t" + parallelism);
+
+        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new 
SimpleStringSchema())
+                .serviceUrl(serviceUrl)
+                .topic(inputTopic)
+                .subscriptionName(subscription);
+        SourceFunction<String> src = builder.build();
+        DataStream<String> input = env.addSource(src);
+
+
+        DataStream<WordWithCount> wc = input
+                .flatMap((FlatMapFunction<String, WordWithCount>) (line, 
collector) -> {
+                    for (String word : line.split("\\s")) {
+                        collector.collect(
+                                
WordWithCount.newBuilder().setWord(word).setCount(1).build()
+                        );
+                    }
+                })
+                .returns(WordWithCount.class)
+                .keyBy("word")
+                .timeWindow(Time.seconds(5))
+                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+                        
WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + 
c2.getCount()).build()
+                );
+
+        tableEnvironment.registerDataStream("wc",wc);
+
+        Table table = tableEnvironment.sqlQuery("select * from wc");
+        if (null != outputTopic) {
+            PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, 
outputTopic, new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
+            table.writeToSink(sink);
+        } else {
+            TableSink sink = new CsvTableSink("./examples/file",  "|");
+            // print the results with a csv file
+            table.writeToSink(sink);
+        }
+
+        env.execute("Pulsar Stream WordCount");
+    }
+
+}
diff --git 
a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
new file mode 100644
index 0000000000..95b253675a
--- /dev/null
+++ 
b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.example;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming word-count program on Pulsar topics whose results are written
+ * through the Table API, either to a Pulsar topic as JSON ({@link PulsarJsonTableSink})
+ * or, when no output topic is given, to a '|'-separated CSV file.
+ *
+ * <p>Example usage:
+ *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ *   [--output-topic test_output] [--parallelism 1]
+ */
+public class PulsarConsumerSourceWordCountToJsonTableSink {
+    /** Field of {@link WordWithCount} used as the routing key of produced Pulsar messages. */
+    private static final String ROUTING_KEY = "word";
+
+    /**
+     * Runs the word-count job.
+     *
+     * <p>Input lines are split on whitespace and counted per word in 5-second processing-time
+     * windows. The counts are registered as table {@code wc} and written to the sink.
+     * {@code --service-url} is used both for the Pulsar source and for the Pulsar sink.
+     *
+     * @param args command line arguments, parsed with {@link ParameterTool}
+     * @throws Exception if building or executing the Flink job fails
+     */
+    public static void main(String[] args) throws Exception {
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+        env.enableCheckpointing(5000);
+        env.getConfig().setGlobalJobParameters(parameterTool);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String inputTopic = parameterTool.getRequired("input-topic");
+        String subscription = parameterTool.get("subscription", "flink-examples");
+        String outputTopic = parameterTool.get("output-topic", null);
+        int parallelism = parameterTool.getInt("parallelism", 1);
+
+        // --parallelism used to be parsed and printed but never applied to the job.
+        env.setParallelism(parallelism);
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tInputTopic:\t" + inputTopic);
+        System.out.println("\tSubscription:\t" + subscription);
+        System.out.println("\tOutputTopic:\t" + outputTopic);
+        System.out.println("\tParallelism:\t" + parallelism);
+
+        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+                .serviceUrl(serviceUrl)
+                .topic(inputTopic)
+                .subscriptionName(subscription);
+        SourceFunction<String> src = builder.build();
+        DataStream<String> input = env.addSource(src);
+
+        DataStream<WordWithCount> wc = input
+                .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+                    // Split on runs of whitespace and drop empty tokens (produced by leading
+                    // whitespace), so the empty string is never counted as a word.
+                    for (String word : line.split("\\s+")) {
+                        if (!word.isEmpty()) {
+                            collector.collect(new WordWithCount(word, 1));
+                        }
+                    }
+                })
+                // lambdas erase the output type, so Flink needs it spelled out
+                .returns(WordWithCount.class)
+                .keyBy("word")
+                .timeWindow(Time.seconds(5))
+                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+                        new WordWithCount(c1.word, c1.count + c2.count));
+
+        tableEnvironment.registerDataStream("wc", wc);
+
+        Table table = tableEnvironment.sqlQuery("select * from wc");
+        if (null != outputTopic) {
+            // Write to the cluster given by --service-url, not a hard-coded localhost address.
+            PulsarJsonTableSink sink = new PulsarJsonTableSink(
+                    serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
+            table.writeToSink(sink);
+        } else {
+            // write the results to a '|'-separated CSV file
+            TableSink sink = new CsvTableSink("./examples/file", "|");
+            table.writeToSink(sink);
+        }
+
+        env.execute("Pulsar Stream WordCount");
+    }
+
+    /**
+     * Data type for words with count.
+     *
+     * <p>Public fields plus a no-arg constructor (via Lombok) let Flink treat it as a POJO,
+     * which {@code keyBy("word")} relies on.
+     */
+    @AllArgsConstructor
+    @NoArgsConstructor
+    @ToString
+    public static class WordWithCount {
+
+        public String word;
+        public long count;
+
+    }
+}
diff --git 
a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc 
b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
index 4a669e08af..45adc9825d 100644
--- a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
+++ b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
@@ -1,4 +1,5 @@
-{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+[
+{"namespace": "org.apache.flink.avro.generated",
  "type": "record",
  "name": "NasaMission",
  "fields": [
@@ -7,4 +8,13 @@
      {"name": "start_year",  "type": ["int", "null"]},
      {"name": "end_year", "type": ["int", "null"]}
  ]
+},
+{"namespace": "org.apache.flink.avro.generated",
+ "type": "record",
+ "name": "WordWithCount",
+ "fields": [
+     {"name": "word", "type": "string"},
+     {"name": "count", "type": "long"}
+ ]
 }
+]
diff --git 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 5f9561128e..0d255f2a1a 100644
--- 
a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ 
b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.batch.connectors.pulsar.example
 
 import org.apache.flink.api.scala._
+import org.apache.flink.avro.generated.NasaMission
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission
 
 /**
   * Implements a batch Scala program on Pulsar topic by writing Flink DataSet 
as Avro.
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index 92eb045957..74d3e8cc89 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -111,6 +111,10 @@
         <directory>src/main/resources</directory>
         <filtering>true</filtering>
       </resource>
+      <resource>
+        <directory>src/test/resources</directory>
+        <filtering>true</filtering>
+      </resource>
     </resources>
 
     <plugins>
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
new file mode 100644
index 0000000000..9187a0d71c
--- /dev/null
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An append-only table sink to emit a streaming table as a Pulsar stream that 
serializes data in Avro format.
+ */
+public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
+
+    protected final String serviceUrl;
+    protected final String topic;
+    protected final ProducerConfiguration producerConf;
+    protected final String routingKeyFieldName;
+    protected SerializationSchema<Row> serializationSchema;
+    protected String[] fieldNames;
+    protected TypeInformation[] fieldTypes;
+    protected PulsarKeyExtractor<Row> keyExtractor;
+    private Class<? extends SpecificRecord> recordClazz;
+
+    /**
+     * Create PulsarAvroTableSink.
+     *
+     * @param serviceUrl          pulsar service url
+     * @param topic               topic in pulsar to which table is written
+     * @param producerConf        producer configuration
+     * @param routingKeyFieldName routing key field name
+     */
+    public PulsarAvroTableSink(
+            String serviceUrl,
+            String topic,
+            ProducerConfiguration producerConf,
+            String routingKeyFieldName,
+            Class<? extends SpecificRecord> recordClazz) {
+        this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+        this.topic = checkNotNull(topic, "Topic is null");
+        this.producerConf = checkNotNull(producerConf, "Producer configuration 
not set");
+        this.routingKeyFieldName = routingKeyFieldName;
+        this.recordClazz = recordClazz;
+    }
+
+    /**
+     * Returns the low-level producer.
+     */
+    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
+        serializationSchema = new AvroRowSerializationSchema(recordClazz);
+        return new FlinkPulsarProducer<Row>(
+                serviceUrl,
+                topic,
+                serializationSchema,
+                producerConf,
+                keyExtractor);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Row> dataStream) {
+        checkState(fieldNames != null, "Table sink is not configured");
+        checkState(fieldTypes != null, "Table sink is not configured");
+        checkState(serializationSchema != null, "Table sink is not 
configured");
+        checkState(keyExtractor != null, "Table sink is not configured");
+        FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
+        dataStream.addSink(producer);
+    }
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    @Override
+    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] 
fieldTypes) {
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, 
producerConf, routingKeyFieldName, recordClazz);
+
+        sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
+        sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
+        checkArgument(fieldNames.length == fieldTypes.length,
+                "Number of provided field names and types do not match");
+
+        sink.serializationSchema = new AvroRowSerializationSchema(recordClazz);
+        sink.keyExtractor = new AvroKeyExtractor(
+                routingKeyFieldName,
+                fieldNames,
+                fieldTypes,
+                recordClazz);
+
+        return sink;
+    }
+
+
+    /**
+     * A key extractor that extracts the routing key from a {@link Row} by 
field name.
+     */
+    private static class AvroKeyExtractor implements PulsarKeyExtractor<Row> {
+        private final int keyIndex;
+
+        public AvroKeyExtractor(
+                String keyFieldName,
+                String[] fieldNames,
+                TypeInformation<?>[] fieldTypes,
+                Class<? extends SpecificRecord> recordClazz) {
+
+            checkArgument(fieldNames.length == fieldTypes.length,
+                    "Number of provided field names and types does not 
match.");
+
+            Schema schema = SpecificData.get().getSchema(recordClazz);
+            Schema.Field keyField = schema.getField(keyFieldName);
+            Schema.Type keyType = keyField.schema().getType();
+
+            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
+            checkArgument(keyIndex >= 0,
+                    "Key field '" + keyFieldName + "' not found");
+
+            checkArgument(Schema.Type.STRING.equals(keyType),
+                    "Key field must be of type 'STRING'");
+            this.keyIndex = keyIndex;
+        }
+
+        @Override
+        public String getKey(Row event) {
+            return (String) event.getField(keyIndex);
+        }
+    }
+
+}
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
index 176acc408c..4f8f0c66ef 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar.serialization;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
+import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.formats.avro.AvroDeserializationSchema;
 import org.junit.Test;
 
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
new file mode 100644
index 0000000000..ae336ffb19
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.avro.generated.NasaMission;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarAvroTableSink}.
+ */
+public class PulsarAvroTableSinkTest {
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "test_topic";
+    private static final String ROUTING_KEY = "name";
+
+    // column layout mirroring the generated NasaMission Avro record
+    private final String[] fieldNames = {"id", "name", "start_year", "end_year"};
+    private final TypeInformation[] typeInformations = {
+            TypeInformation.of(Integer.class),
+            TypeInformation.of(String.class),
+            TypeInformation.of(Integer.class),
+            TypeInformation.of(Integer.class)
+    };
+
+    /**
+     * Test configure PulsarAvroTableSink.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testConfigure() throws Exception {
+        PulsarAvroTableSink sink = spySink();
+
+        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
+
+        Assert.assertArrayEquals(fieldNames, configuredSink.getFieldNames());
+        Assert.assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
+        Assert.assertNotNull(((PulsarAvroTableSink) configuredSink).keyExtractor);
+        Assert.assertNotNull(((PulsarAvroTableSink) configuredSink).serializationSchema);
+    }
+
+
+    /**
+     * Test emit data stream.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testEmitDataStream() throws Exception {
+        DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+        PulsarAvroTableSink sink = spySink();
+
+        sink.emitDataStream(mockedDataStream);
+
+        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+    }
+
+
+    /**
+     * Builds a sink whose internal state is pre-populated as if it had been configured.
+     */
+    private PulsarAvroTableSink spySink() throws Exception {
+
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(
+                SERVICE_URL, TOPIC_NAME, new ProducerConfiguration(), ROUTING_KEY, NasaMission.class);
+        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+        // NOTE(review): PowerMockito.whenNew only takes effect under PowerMockRunner with
+        // @PrepareForTest for the class that calls the constructor; neither is declared here.
+        PowerMockito.whenNew(
+                FlinkPulsarProducer.class
+        ).withArguments(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(SerializationSchema.class),
+                // was Mockito.any(PowerMockito.class): the 4th constructor argument is a ProducerConfiguration
+                Mockito.any(ProducerConfiguration.class),
+                Mockito.any(PulsarKeyExtractor.class)
+        ).thenReturn(producer);
+        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
+        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
+        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        return sink;
+    }
+
+}
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
new file mode 100644
index 0000000000..746e87eab9
--- /dev/null
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarJsonTableSink}.
+ *
+ * <p>NOTE(review): {@link #spySink()} stubs {@code FlinkPulsarProducer} construction via
+ * {@code PowerMockito.whenNew}, which only takes effect under {@code PowerMockRunner} with
+ * {@code @PrepareForTest}. This class declares neither annotation, so that stub is presumably
+ * inert here; {@link #testEmitDataStream()} only checks the type of the sink that is added.
+ */
+public class PulsarJsonTableSinkTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "test_topic";
+    private static final String ROUTING_KEY = "key";
+    private final String[] fieldNames = {"key", "value"};
+    private final TypeInformation<?>[] typeInformations = {
+            TypeInformation.of(String.class),
+            TypeInformation.of(String.class)
+    };
+
+    /**
+     * Verifies that {@code configure} returns a sink that reports the given field names and
+     * types and has both a key extractor and a serialization schema set.
+     *
+     * @throws Exception if the sink under test cannot be prepared
+     */
+    @Test
+    public void testConfigure() throws Exception {
+        PulsarJsonTableSink sink = spySink();
+
+        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
+
+        Assert.assertArrayEquals(fieldNames, configuredSink.getFieldNames());
+        Assert.assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
+        PulsarJsonTableSink configured = (PulsarJsonTableSink) configuredSink;
+        Assert.assertNotNull(configured.keyExtractor);
+        Assert.assertNotNull(configured.serializationSchema);
+    }
+
+    /**
+     * Verifies that emitting a data stream adds a {@link FlinkPulsarProducer} sink to it.
+     *
+     * @throws Exception if the sink under test cannot be prepared
+     */
+    @Test
+    public void testEmitDataStream() throws Exception {
+        DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+        PulsarJsonTableSink sink = spySink();
+
+        sink.emitDataStream(mockedDataStream);
+
+        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+    }
+
+    /**
+     * Builds a {@link PulsarJsonTableSink} and pre-populates its internal state (field names,
+     * field types, serialization schema and key extractor) with test fixtures, and stubs
+     * {@link FlinkPulsarProducer} construction to return a mock.
+     *
+     * @return the sink under test with its internal state populated
+     * @throws Exception if stubbing the producer constructor fails
+     */
+    private PulsarJsonTableSink spySink() throws Exception {
+        PulsarJsonTableSink sink = new PulsarJsonTableSink(
+                SERVICE_URL, TOPIC_NAME, new ProducerConfiguration(), ROUTING_KEY);
+        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+        PowerMockito.whenNew(
+                FlinkPulsarProducer.class
+        ).withArguments(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(SerializationSchema.class),
+                // Producer configuration argument (TODO confirm against the FlinkPulsarProducer
+                // constructor). PowerMockito itself is a static utility and is never passed here.
+                Mockito.any(ProducerConfiguration.class),
+                Mockito.any(PulsarKeyExtractor.class)
+        ).thenReturn(producer);
+        // Whitebox comes from org.mockito.internal (not public Mockito API); it writes the
+        // named private fields directly.
+        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
+        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
+        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        return sink;
+    }
+}
diff --git a/pulsar-flink/src/test/resources/avro/NasaMission.avsc b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
index 4a669e08af..521f475126 100644
--- a/pulsar-flink/src/test/resources/avro/NasaMission.avsc
+++ b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
@@ -1,4 +1,4 @@
-{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+{"namespace": "org.apache.flink.avro.generated",
  "type": "record",
  "name": "NasaMission",
  "fields": [


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to