sijie closed pull request #2979: Add Flink - Pulsar Batch Sink Support URL: https://github.com/apache/pulsar/pull/2979
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java new file mode 100644 index 0000000000..ac54248a3c --- /dev/null +++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flink.batch.connectors.pulsar; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.function.Function; + +/** + * Flink Batch Sink to write DataSets into a Pulsar topic. + */ +public class PulsarOutputFormat<T> extends RichOutputFormat<T> { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarOutputFormat.class); + + private static String serviceUrl; + private static String topicName; + private SerializationSchema<T> serializationSchema; + + private transient Function<Throwable, MessageId> failureCallback; + + private static volatile Producer<byte[]> producer; + + public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T> serializationSchema) { + Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null."); + Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName must not be blank."); + Preconditions.checkNotNull(serializationSchema, "serializationSchema must not be null."); + + this.serviceUrl = serviceUrl; + this.topicName = topicName; + this.serializationSchema = serializationSchema; + + LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}", this.topicName); + } + + @Override + public void configure(Configuration configuration) { + + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + this.producer = getProducerInstance(); + + this.failureCallback = cause -> { + LOG.error("Error 
while sending record to Pulsar : " + cause.getMessage(), cause); + return null; + }; + } + + @Override + public void writeRecord(T t) throws IOException { + byte[] data = this.serializationSchema.serialize(t); + this.producer.sendAsync(data) + .exceptionally(this.failureCallback); + } + + @Override + public void close() throws IOException { + + } + + private static Producer<byte[]> getProducerInstance() throws PulsarClientException { + if(producer == null){ + synchronized (PulsarOutputFormat.class) { + if(producer == null){ + producer = Preconditions.checkNotNull(createPulsarProducer(), + "Pulsar producer must not be null."); + } + } + } + return producer; + } + + private static Producer<byte[]> createPulsarProducer() throws PulsarClientException { + try { + PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); + return client.newProducer().topic(topicName).create(); + } catch (PulsarClientException e) { + LOG.error("Pulsar producer can not be created.", e); + throw e; + } + } +} diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java index 2324c55ea5..48bc0f1b6b 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java @@ -309,7 +309,7 @@ protected void checkErroneous() throws Exception { if (e != null) { // prevent double throwing asyncException = null; - throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); + throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e); } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java new file mode 100644 index 
0000000000..563970945c --- /dev/null +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flink.batch.connectors.pulsar; + +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +/** + * Tests for PulsarOutputFormat + */ +public class PulsarOutputFormatTest { + + @Test(expected = NullPointerException.class) + public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() { + new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes()); + } + + @Test(expected = IllegalArgumentException.class) + public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() { + new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes()); + } + + @Test(expected = IllegalArgumentException.class) + public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() { + new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes()); + } + + @Test(expected = NullPointerException.class) + public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() { + new PulsarOutputFormat("testServiceUrl", 
"testTopic", null); + } + + @Test + public void testPulsarOutputFormatConstructor() { + PulsarOutputFormat pulsarOutputFormat = + new PulsarOutputFormat("testServiceUrl", "testTopic", text -> text.toString().getBytes()); + assertNotNull(pulsarOutputFormat); + } + +} diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java new file mode 100644 index 0000000000..7b35065ae0 --- /dev/null +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flink.batch.connectors.pulsar.example; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat; +import org.apache.flink.util.Collector; + +/** + * Implements a batch word-count program on Pulsar topic by writing Flink DataSet. + */ +public class FlinkPulsarBatchSinkExample { + + private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " + + "Knowledge is limited. Imagination encircles the world."; + + private static final String SERVICE_URL = "pulsar://127.0.0.1:6650"; + private static final String TOPIC_NAME = "my-flink-topic"; + + public static void main(String[] args) throws Exception { + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // create PulsarOutputFormat instance + final OutputFormat pulsarOutputFormat = + new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes()); + + // create DataSet + DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE); + + textDS.flatMap(new FlatMapFunction<String, WordWithCount>() { + @Override + public void flatMap(String value, Collector<WordWithCount> out) throws Exception { + String[] words = value.toLowerCase().split(" "); + for(String word: words) { + out.collect(new WordWithCount(word.replace(".", ""), 1)); + } + } + }) + // filter words which length is bigger than 4 + .filter(wordWithCount -> wordWithCount.word.length() > 4) + .groupBy(new KeySelector<WordWithCount, String>() { + @Override + public String getKey(WordWithCount wordWithCount) throws Exception { + return wordWithCount.word; + } + }) 
+ .reduce(new ReduceFunction<WordWithCount>() { + @Override + public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception { + return new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count); + } + }) + // write batch data to Pulsar + .output(pulsarOutputFormat); + + // execute program + env.execute("Flink - Pulsar Batch WordCount"); + + } + + /** + * Data type for words with count. + */ + private static class WordWithCount { + + public String word; + public long count; + + public WordWithCount(String word, long count) { + this.word = word; + this.count = count; + } + + @Override + public String toString() { + return "WordWithCount { word = " + word + ", count = " + count + " }"; + } + } +} \ No newline at end of file diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md new file mode 100644 index 0000000000..f3baf00d16 --- /dev/null +++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md @@ -0,0 +1,106 @@ +The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar. + +## Prerequisites + +To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration. 
+ +### Maven + +If you're using Maven, add this to your `pom.xml`: + +```xml +<!-- in your <properties> block --> +<pulsar.version>{{pulsar:version}}</pulsar.version> + +<!-- in your <dependencies> block --> +<dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-flink</artifactId> + <version>${pulsar.version}</version> +</dependency> +``` + +### Gradle + +If you're using Gradle, add this to your `build.gradle` file: + +```groovy +def pulsarVersion = "{{pulsar:version}}" + +dependencies { + compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion +} +``` + +## Usage + +Please find a sample usage as follows: + +```java + private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " + + "Knowledge is limited. Imagination encircles the world."; + + private static final String SERVICE_URL = "pulsar://127.0.0.1:6650"; + private static final String TOPIC_NAME = "my-flink-topic"; + + public static void main(String[] args) throws Exception { + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // create PulsarOutputFormat instance + final OutputFormat<String> pulsarOutputFormat = + new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes()); + + // create DataSet + DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE); + + textDS.flatMap(new FlatMapFunction<String, String>() { + @Override + public void flatMap(String value, Collector<String> out) throws Exception { + String[] words = value.toLowerCase().split(" "); + for(String word: words) { + out.collect(word.replace(".", "")); + } + } + }) + // filter words which length is bigger than 4 + .filter(word -> word.length() > 4) + + // write batch data to Pulsar + .output(pulsarOutputFormat); + + // execute program + env.execute("Flink - Pulsar Batch WordCount"); + } +``` + +## Sample Output + +Please find sample output for above 
application as follows: +``` +imagination +important +knowledge +knowledge +limited +imagination +encircles +world +``` + +## Complete Example + +You can find a complete example [here](https://github.com/apache/pulsar/blob/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java). +In this example, a Flink DataSet is processed as a word count and written to Pulsar. + +## Complete Example Output +Please find sample output for the linked application as follows: +``` +WordWithCount { word = important, count = 1 } +WordWithCount { word = encircles, count = 1 } +WordWithCount { word = imagination, count = 2 } +WordWithCount { word = knowledge, count = 2 } +WordWithCount { word = limited, count = 1 } +WordWithCount { word = world, count = 1 } +``` \ No newline at end of file diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java index 13680c9211..eb206ab649 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java @@ -29,7 +29,7 @@ * and publish to a Pulsar topic. The reason its called Push is * because PushSources get passed a consumer that they * invoke whenever they have data to be published to Pulsar. - * The lifcycle of a PushSource is to open it passing any config needed + * The lifecycle of a PushSource is to open it passing any config needed * by it to initialize(like open network connection, authenticate, etc). * A consumer is then to it which is invoked by the source whenever * there is data to be published. Once all data has been read, one can use close ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
