This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aefbaac Add Flink - Pulsar Batch Sink Support (#2979)
aefbaac is described below
commit aefbaacb9e29adaec593b895e625bb87d902f7b8
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Fri Nov 16 21:28:46 2018 +0000
Add Flink - Pulsar Batch Sink Support (#2979)
### Motivation
This PR adds Flink-Pulsar `Batch Sink` support. If a user works
with the Flink `DataSet` API and would like to write `DataSets` to Pulsar,
this sink can help.
*Ref:* [Flink Batch Sink
API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#data-sinks)
### Modifications
Please find the change-set as follows:
- Defines `PulsarOutputFormat` to write Flink Batch `DataSets` into Pulsar.
- UT Coverage
- `FlinkPulsarBatchSinkExample`, showing users how to use the sink.
- `README.md` documentation
- Minor `javadoc` fix
---
.../connectors/pulsar/PulsarOutputFormat.java | 111 +++++++++++++++++++++
.../connectors/pulsar/FlinkPulsarProducer.java | 2 +-
.../connectors/pulsar/PulsarOutputFormatTest.java | 57 +++++++++++
.../example/FlinkPulsarBatchSinkExample.java | 102 +++++++++++++++++++
.../batch/connectors/pulsar/example/README.md | 106 ++++++++++++++++++++
.../java/org/apache/pulsar/io/core/PushSource.java | 2 +-
6 files changed, 378 insertions(+), 2 deletions(-)
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
new file mode 100644
index 0000000..ac54248
--- /dev/null
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Flink Batch Sink to write DataSets into a Pulsar topic.
+ */
+public class PulsarOutputFormat<T> extends RichOutputFormat<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarOutputFormat.class);
+
+ private static String serviceUrl;
+ private static String topicName;
+ private SerializationSchema<T> serializationSchema;
+
+ private transient Function<Throwable, MessageId> failureCallback;
+
+ private static volatile Producer<byte[]> producer;
+
+ public PulsarOutputFormat(String serviceUrl, String topicName,
SerializationSchema<T> serializationSchema) {
+ Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null.");
+ Preconditions.checkArgument(StringUtils.isNotBlank(topicName),
"topicName must not be blank.");
+ Preconditions.checkNotNull(serializationSchema, "serializationSchema
must not be null.");
+
+ this.serviceUrl = serviceUrl;
+ this.topicName = topicName;
+ this.serializationSchema = serializationSchema;
+
+ LOG.info("PulsarOutputFormat is being started to write batches to
Pulsar topic {}", this.topicName);
+ }
+
+ @Override
+ public void configure(Configuration configuration) {
+
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.producer = getProducerInstance();
+
+ this.failureCallback = cause -> {
+ LOG.error("Error while sending record to Pulsar : " +
cause.getMessage(), cause);
+ return null;
+ };
+ }
+
+ @Override
+ public void writeRecord(T t) throws IOException {
+ byte[] data = this.serializationSchema.serialize(t);
+ this.producer.sendAsync(data)
+ .exceptionally(this.failureCallback);
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ private static Producer<byte[]> getProducerInstance() throws
PulsarClientException {
+ if(producer == null){
+ synchronized (PulsarOutputFormat.class) {
+ if(producer == null){
+ producer =
Preconditions.checkNotNull(createPulsarProducer(),
+ "Pulsar
producer must not be null.");
+ }
+ }
+ }
+ return producer;
+ }
+
+ private static Producer<byte[]> createPulsarProducer() throws
PulsarClientException {
+ try {
+ PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ return client.newProducer().topic(topicName).create();
+ } catch (PulsarClientException e) {
+ LOG.error("Pulsar producer can not be created.", e);
+ throw e;
+ }
+ }
+}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 2324c55..48bc0f1 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -309,7 +309,7 @@ public class FlinkPulsarProducer<IN>
if (e != null) {
// prevent double throwing
asyncException = null;
- throw new Exception("Failed to send data to Kafka: " +
e.getMessage(), e);
+ throw new Exception("Failed to send data to Pulsar: " +
e.getMessage(), e);
}
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
new file mode 100644
index 0000000..5639709
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for PulsarOutputFormat
+ */
+public class PulsarOutputFormatTest {
+
+ @Test(expected = NullPointerException.class)
+ public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
+ new PulsarOutputFormat(null, "testTopic", text ->
text.toString().getBytes());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
+ new PulsarOutputFormat("testServiceUrl", null, text ->
text.toString().getBytes());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
+ new PulsarOutputFormat("testServiceUrl", " ", text ->
text.toString().getBytes());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void
testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
+ new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+ }
+
+ @Test
+ public void testPulsarOutputFormatConstructor() {
+ PulsarOutputFormat pulsarOutputFormat =
+ new PulsarOutputFormat("testServiceUrl", "testTopic", text ->
text.toString().getBytes());
+ assertNotNull(pulsarOutputFormat);
+ }
+
+}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
new file mode 100644
index 0000000..7b35065
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements a batch word-count program on Pulsar topic by writing Flink
DataSet.
+ */
+public class FlinkPulsarBatchSinkExample {
+
+ private static final String EINSTEIN_QUOTE = "Imagination is more
important than knowledge. " +
+ "Knowledge is limited. Imagination encircles the world.";
+
+ private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+ private static final String TOPIC_NAME = "my-flink-topic";
+
+ public static void main(String[] args) throws Exception {
+
+ // set up the execution environment
+ final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ // create PulsarOutputFormat instance
+ final OutputFormat pulsarOutputFormat =
+ new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount
-> wordWithCount.toString().getBytes());
+
+ // create DataSet
+ DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+
+ textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
+ @Override
+ public void flatMap(String value, Collector<WordWithCount> out)
throws Exception {
+ String[] words = value.toLowerCase().split(" ");
+ for(String word: words) {
+ out.collect(new WordWithCount(word.replace(".", ""), 1));
+ }
+ }
+ })
+ // filter words which length is bigger than 4
+ .filter(wordWithCount -> wordWithCount.word.length() > 4)
+ .groupBy(new KeySelector<WordWithCount, String>() {
+ @Override
+ public String getKey(WordWithCount wordWithCount) throws Exception
{
+ return wordWithCount.word;
+ }
+ })
+ .reduce(new ReduceFunction<WordWithCount>() {
+ @Override
+ public WordWithCount reduce(WordWithCount wordWithCount1,
WordWithCount wordWithCount2) throws Exception {
+ return new WordWithCount(wordWithCount1.word,
wordWithCount1.count + wordWithCount2.count);
+ }
+ })
+ // write batch data to Pulsar
+ .output(pulsarOutputFormat);
+
+ // execute program
+ env.execute("Flink - Pulsar Batch WordCount");
+
+ }
+
+ /**
+ * Data type for words with count.
+ */
+ private static class WordWithCount {
+
+ public String word;
+ public long count;
+
+ public WordWithCount(String word, long count) {
+ this.word = word;
+ this.count = count;
+ }
+
+ @Override
+ public String toString() {
+ return "WordWithCount { word = " + word + ", count = " + count + "
}";
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
new file mode 100644
index 0000000..f3baf00
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -0,0 +1,106 @@
+The Flink Batch Sink for Pulsar is a custom sink that enables Apache
[Flink](https://flink.apache.org/) to write
[DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html)
to Pulsar.
+
+## Prerequisites
+
+To use this sink, include a dependency on the `pulsar-flink` library in your
build configuration.
+
+### Maven
+
+If you're using Maven, add this to your `pom.xml`:
+
+```xml
+<!-- in your <properties> block -->
+<pulsar.version>{{pulsar:version}}</pulsar.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-flink</artifactId>
+ <version>${pulsar.version}</version>
+</dependency>
+```
+
+### Gradle
+
+If you're using Gradle, add this to your `build.gradle` file:
+
+```groovy
+def pulsarVersion = "{{pulsar:version}}"
+
+dependencies {
+ compile group: 'org.apache.pulsar', name: 'pulsar-flink', version:
pulsarVersion
+}
+```
+
+## Usage
+
+Please find a sample usage as follows:
+
+```java
+ private static final String EINSTEIN_QUOTE = "Imagination is more
important than knowledge. " +
+ "Knowledge is limited. Imagination encircles the world.";
+
+ private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+ private static final String TOPIC_NAME = "my-flink-topic";
+
+ public static void main(String[] args) throws Exception {
+
+ // set up the execution environment
+ final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ // create PulsarOutputFormat instance
+ final OutputFormat<String> pulsarOutputFormat =
+ new PulsarOutputFormat<>(SERVICE_URL, TOPIC_NAME,
+ word -> word.getBytes(StandardCharsets.UTF_8));
+
+ // create DataSet
+ DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+
+ textDS.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public void flatMap(String value, Collector<String> out)
throws Exception {
+ String[] words = value.toLowerCase().split(" ");
+ for(String word: words) {
+ out.collect(word.replace(".", ""));
+ }
+ }
+ })
+ // keep only words longer than 4 characters
+ .filter(word -> word.length() > 4)
+
+ // write batch data to Pulsar
+ .output(pulsarOutputFormat);
+
+ // execute program
+ env.execute("Flink - Pulsar Batch WordCount");
+ }
+```
+
+## Sample Output
+
+Please find sample output for above application as follows:
+```
+imagination
+important
+knowledge
+knowledge
+limited
+imagination
+encircles
+world
+```
+
+## Complete Example
+
+You can find a complete example
[here](https://github.com/apache/pulsar/blob/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
+In this example, a Flink DataSet is processed as a word count and the results are written to
Pulsar.
+
+## Complete Example Output
+Please find sample output for above linked application as follows:
+```
+WordWithCount { word = important, count = 1 }
+WordWithCount { word = encircles, count = 1 }
+WordWithCount { word = imagination, count = 2 }
+WordWithCount { word = knowledge, count = 2 }
+WordWithCount { word = limited, count = 1 }
+WordWithCount { word = world, count = 1 }
+```
\ No newline at end of file
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 13680c9..eb206ab 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.functions.api.Record;
* and publish to a Pulsar topic. The reason its called Push is
* because PushSources get passed a consumer that they
* invoke whenever they have data to be published to Pulsar.
- * The lifcycle of a PushSource is to open it passing any config needed
+ * The lifecycle of a PushSource is to open it passing any config needed
* by it to initialize(like open network connection, authenticate, etc).
* A consumer is then to it which is invoked by the source whenever
* there is data to be published. Once all data has been read, one can use
close