This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b413c19 [Pulsar-Flink] Add Scala Examples (#3071)
b413c19 is described below
commit b413c19aa205d38e57ebc1509099d57bb72fb844
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Wed Nov 28 01:46:11 2018 +0000
[Pulsar-Flink] Add Scala Examples (#3071)
* [Pulsar-Flink] Add Scala Examples
* Line break is added for input text.
* Adding ASF Header.
* Fix License format
---
pulsar-flink/pom.xml | 6 ++
.../example/FlinkPulsarBatchSinkExample.java | 10 +++
.../FlinkPulsarBatchCsvSinkScalaExample.scala | 78 ++++++++++++++++++++
.../FlinkPulsarBatchJsonSinkScalaExample.scala | 81 +++++++++++++++++++++
.../example/FlinkPulsarBatchSinkScalaExample.scala | 85 ++++++++++++++++++++++
5 files changed, 260 insertions(+)
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index b91af16..e48d213 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -52,6 +52,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index 7b35065..6724c62 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -50,6 +50,7 @@ public class FlinkPulsarBatchSinkExample {
// create DataSet
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+ // convert sentences to words
textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out)
throws Exception {
@@ -59,23 +60,32 @@ public class FlinkPulsarBatchSinkExample {
}
}
})
+
// filter words which length is bigger than 4
.filter(wordWithCount -> wordWithCount.word.length() > 4)
+
+ // group the words
.groupBy(new KeySelector<WordWithCount, String>() {
@Override
public String getKey(WordWithCount wordWithCount) throws Exception
{
return wordWithCount.word;
}
})
+
+ // sum the word counts
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount wordWithCount1,
WordWithCount wordWithCount2) throws Exception {
return new WordWithCount(wordWithCount1.word,
wordWithCount1.count + wordWithCount2.count);
}
})
+
// write batch data to Pulsar
.output(pulsarOutputFormat);
+ // set parallelism to write to Pulsar in parallel (optional)
+ env.setParallelism(2);
+
// execute program
env.execute("Flink - Pulsar Batch WordCount");
diff --git
a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
new file mode 100644
index 0000000..7db844b
--- /dev/null
+++
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.java.tuple.Tuple4
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
+
+/**
+ * Batch example in Scala: builds a Flink DataSet of NASA missions and writes it to a
+ * Pulsar topic as Csv.
+ */
+object FlinkPulsarBatchCsvSinkScalaExample {
+
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  /**
+   * Mission record; it extends Tuple4 and hands its four fields to the tuple constructor.
+   */
+  private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
+    extends Tuple4(id, missionName, startYear, endYear)
+
+  // in-memory sample data used as the job input
+  private val nasaMissions = List(
+    NasaMission(1, "Mercury program", 1959, 1963),
+    NasaMission(2, "Apollo program", 1961, 1972),
+    NasaMission(3, "Gemini program", 1963, 1966),
+    NasaMission(4, "Skylab", 1973, 1974),
+    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+  /**
+   * Builds the mission pipeline (map, filter, Pulsar Csv output) and executes it.
+   */
+  def main(args: Array[String]): Unit = {
+    // obtain the execution environment for this batch job
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // output format bound to the service url and topic declared above
+    val csvSink = new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+    // source DataSet built from the in-memory mission list
+    val missions = env.fromCollection(nasaMissions)
+
+    // upper-case every mission name, leaving the other fields untouched
+    val upperCased = missions.map((m: NasaMission) =>
+      NasaMission(m.id, m.missionName.toUpperCase, m.startYear, m.endYear))
+
+    // keep only the missions which started after 1970
+    val startedAfter1970 = upperCased.filter((m: NasaMission) => m.startYear > 1970)
+
+    // write the remaining missions to Pulsar as Csv
+    startedAfter1970.output(csvSink)
+
+    // set parallelism to write to Pulsar in parallel (optional)
+    env.setParallelism(2)
+
+    // run the job
+    env.execute("Flink - Pulsar Batch Csv")
+  }
+
+}
\ No newline at end of file
diff --git
a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
new file mode 100644
index 0000000..1f7fc19
--- /dev/null
+++
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
+
+import scala.beans.BeanProperty
+
+/**
+ * Batch example in Scala: builds a Flink DataSet of NASA missions and writes it to a
+ * Pulsar topic as Json.
+ */
+object FlinkPulsarBatchJsonSinkScalaExample {
+
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  /**
+   * Mission record; each field carries @BeanProperty (presumably for Json serialization).
+   */
+  private case class NasaMission(
+    @BeanProperty id: Int,
+    @BeanProperty missionName: String,
+    @BeanProperty startYear: Int,
+    @BeanProperty endYear: Int)
+
+  private val nasaMissions = List(
+    NasaMission(1, "Mercury program", 1959, 1963),
+    NasaMission(2, "Apollo program", 1961, 1972),
+    NasaMission(3, "Gemini program", 1963, 1966),
+    NasaMission(4, "Skylab", 1973, 1974),
+    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+  /**
+   * Builds the mission pipeline (map, filter, Pulsar Json output) and executes it.
+   */
+  def main(args: Array[String]): Unit = {
+    // obtain the execution environment for this batch job
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // output format bound to the service url and topic declared above
+    val jsonSink = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+    // source DataSet built from the in-memory mission list
+    val missions = env.fromCollection(nasaMissions)
+
+    // upper-case every mission name, leaving the other fields untouched
+    val upperCased =
+      missions.map((m: NasaMission) => m.copy(missionName = m.missionName.toUpperCase))
+
+    // keep only the missions which started after 1970
+    val startedAfter1970 = upperCased.filter((m: NasaMission) => m.startYear > 1970)
+
+    // write the remaining missions to Pulsar
+    startedAfter1970.output(jsonSink)
+
+    // set parallelism to write to Pulsar in parallel (optional)
+    env.setParallelism(2)
+
+    // run the job
+    env.execute("Flink - Pulsar Batch Json")
+  }
+
+}
diff --git
a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
new file mode 100644
index 0000000..5e536cf
--- /dev/null
+++
b/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example
+
+import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.scala._
+import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
+import org.apache.flink.util.Collector
+
+/**
+ * A word paired with its number of occurrences; toString renders both fields.
+ */
+case class WordWithCount(word: String, count: Long) {
+  override def toString: String = s"WordWithCount { word = $word, count = $count }"
+}
+
+/**
+ * Batch word-count example in Scala: counts the words of a quote with the Flink DataSet
+ * API and writes every WordWithCount to a Pulsar topic as UTF-8 encoded text.
+ */
+object FlinkPulsarBatchSinkScalaExample {
+
+  // sample text whose words are counted
+  private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+    "Knowledge is limited. Imagination encircles the world."
+  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+  private val TOPIC_NAME = "my-flink-topic"
+
+  def main(args: Array[String]): Unit = {
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // create PulsarOutputFormat instance; the charset is explicit so the produced bytes do
+    // not depend on the default encoding of the JVM that runs the task
+    val pulsarOutputFormat = new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME,
+      new SerializationSchema[WordWithCount] {
+        override def serialize(wordWithCount: WordWithCount): Array[Byte] =
+          wordWithCount.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)
+      })
+
+    // create DataSet
+    val textDS = env.fromElements[String](EINSTEIN_QUOTE)
+
+    // split into lower-cased words; Locale.ROOT keeps this independent of the default locale
+    textDS.flatMap((value: String, out: Collector[WordWithCount]) =>
+      value.toLowerCase(java.util.Locale.ROOT).split(" ")
+        .foreach(word => out.collect(WordWithCount(word.replace(".", ""), 1))))
+
+      // filter words whose length is greater than 4
+      .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
+
+      // group the words
+      .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
+
+      // sum the word counts (both operands share the same word because of the grouping)
+      .reduce((left: WordWithCount, right: WordWithCount) =>
+        left.copy(count = left.count + right.count))
+
+      // write batch data to Pulsar
+      .output(pulsarOutputFormat)
+
+    // set parallelism to write to Pulsar in parallel (optional)
+    env.setParallelism(2)
+
+    // execute program
+    env.execute("Flink - Pulsar Batch WordCount")
+  }
+
+}
\ No newline at end of file