This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8561e02 [pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use
pulsar-client and add spark example (#4143)
8561e02 is described below
commit 8561e02c70772ff34bbbedb78b1f0b55776fe512
Author: wpl <[email protected]>
AuthorDate: Mon Apr 29 04:57:35 2019 -0500
[pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use pulsar-client
and add spark example (#4143)
### Motivation
Upgrade SparkStreamingPulsarReceiver.java to use pulsar-client and add a Spark
example.
### Modifications
1. Upgrade SparkStreamingPulsarReceiver.java to use pulsar-client and remove
the pulsar-client-1x dependency from the pom.
2. Add a simple Spark example.
---
examples/pom.xml | 1 +
{pulsar-spark => examples/spark}/pom.xml | 78 ++++++++++-----
.../example/ProducerSparkReceiverData.java | 54 +++++++++++
.../spark/streaming/receiver/example/README.md | 107 +++++++++++++++++++++
.../SparkStreamingPulsarReceiverExample.java | 90 +++++++++++++++++
pom.xml | 4 +
pulsar-spark/pom.xml | 14 ++-
.../pulsar/spark/SparkStreamingPulsarReceiver.java | 89 ++++++++++-------
site/docs/latest/adaptors/PulsarSpark.md | 30 ++++--
site2/docs/adaptors-spark.md | 30 ++++--
.../spark/SparkStreamingPulsarReceiverTest.java | 56 ++++++-----
.../SparkStreamingPulsarReceiverExample.java | 68 -------------
12 files changed, 451 insertions(+), 170 deletions(-)
diff --git a/examples/pom.xml b/examples/pom.xml
index 7d76563..5cee039 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>flink</module>
+ <module>spark</module>
</modules>
</project>
diff --git a/pulsar-spark/pom.xml b/examples/spark/pom.xml
similarity index 52%
copy from pulsar-spark/pom.xml
copy to examples/spark/pom.xml
index a903af4..2c233d9 100644
--- a/pulsar-spark/pom.xml
+++ b/examples/spark/pom.xml
@@ -18,44 +18,56 @@
under the License.
-->
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
<parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
+ <artifactId>pulsar-examples</artifactId>
+ <groupId>org.apache.pulsar.examples</groupId>
<version>2.4.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
</parent>
- <artifactId>pulsar-spark</artifactId>
- <name>Spark Streaming Pulsar Receivers</name>
+ <groupId>org.apache.pulsar.examples</groupId>
+ <artifactId>spark</artifactId>
+ <name>Pulsar Examples :: Spark</name>
- <dependencies>
+ <properties>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <jaskson.version>2.6.5</jaskson.version>
+ </properties>
+ <dependencies>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-1x</artifactId>
- <version>${project.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jaskson.version}</version>
</dependency>
-
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jaskson.version}</version>
</dependency>
-
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jaskson.version}</version>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-spark</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
+
<build>
<plugins>
<plugin>
@@ -63,12 +75,20 @@
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
+ <id>pulsar-spark-examples</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
- <createDependencyReducedPom>true</createDependencyReducedPom>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+
<mainClass>org.apache.spark.streaming.receiver.example.SparkStreamingPulsarReceiverExample</mainClass>
+ </transformer>
+ </transformers>
+ <finalName>pulsar-spark-examples</finalName>
<artifactSet>
<includes>
<include>com.google.guava:guava</include>
@@ -92,6 +112,20 @@
</execution>
</executions>
</plugin>
+ <!-- Scala Plugin to compile Scala Files -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>4.0.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>add-source</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/ProducerSparkReceiverData.java
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/ProducerSparkReceiverData.java
new file mode 100644
index 0000000..e10d99d
--- /dev/null
+++
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/ProducerSparkReceiverData.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.receiver.example;
+
+import java.nio.charset.Charset;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * Produces data for the Spark Streaming receiver.
+ *
+ * <p>Example usage:
+ * pulsar://localhost:6650 test_src
+ */
+public class ProducerSparkReceiverData {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Missing parameters!");
+ System.err.println("Usage: <pulsar-service-url> <topic>");
+ return;
+ }
+
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + args[0]);
+ System.out.println("\tTopic:\t" + args[1]);
+
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(args[0]).build()) {
+ try (Producer<byte[]> producer =
client.newProducer().topic(args[1]).create()) {
+ for (int i = 0; i < 100; i++) {
+ producer.send(("producer spark streaming
msg").getBytes(Charset.forName("UTF-8")));
+ }
+ }
+ }
+
+ System.out.println("producer spark streaming msg end ...");
+ }
+}
diff --git
a/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/README.md
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/README.md
new file mode 100644
index 0000000..742e120
--- /dev/null
+++
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/README.md
@@ -0,0 +1,107 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+## Apache Spark Streaming Receiver for Pulsar
+
+This page describes how to use the receiver to read Pulsar topics with [Apache
Spark](https://spark.apache.org/) stream processing applications.
+
+## Example
+
+### PulsarSparkReceiverWordCount
+
+This Spark Streaming job consumes messages from a Pulsar topic and counts the
words in a streaming fashion. The job can write the word count results
+to stdout or another Pulsar topic.
+
+If you choose to run locally, modify the main method of
SparkStreamingPulsarReceiverExample.java as follows:
+```java
+public static void main(String[] args) throws InterruptedException {
+ String serviceUrl = "pulsar://localhost:6650/";
+ String topic = "persistent://public/default/test_src";
+ String subs = "test_sub";
+
+ SparkConf sparkConf = new
SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
+
+ JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
Durations.seconds(60));
+
+ ConsumerConfigurationData<byte[]> pulsarConf = new
ConsumerConfigurationData();
+
+ Set<String> set = new HashSet<>();
+ set.add(topic);
+ pulsarConf.setTopicNames(set);
+ pulsarConf.setSubscriptionName(subs);
+
+ SparkStreamingPulsarReceiver pulsarReceiver = new
SparkStreamingPulsarReceiver(
+ serviceUrl,
+ pulsarConf,
+ new AuthenticationDisabled());
+
+ JavaReceiverInputDStream<byte[]> lineDStream =
jsc.receiverStream(pulsarReceiver);
+ JavaPairDStream<String, Integer> result = lineDStream.flatMap(x -> {
+ String line = new String(x, Charset.forName("UTF-8"));
+ List<String> list = Arrays.asList(line.split(" "));
+ return list.iterator();
+ })
+ .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
+ .reduceByKey((x, y) -> x + y);
+
+ result.print();
+
+ jsc.start();
+ jsc.awaitTermination();
+}
+```
+
+If you choose to run with spark-submit, follow these steps to run the example:
+
+1. Start Pulsar Standalone.
+
+ You can follow the
[instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar
standalone locally.
+
+ ```shell
+ $ bin/pulsar standalone
+ ```
+
+2. Build the examples.
+
+ ```shell
+ $ cd ${PULSAR_HOME}
+ $ mvn clean install -DskipTests
+ ```
+
+3. Run the word count example with Spark to print results to stdout.
+
+ ```shell
+ $ ${SPARK_HOME}/bin/spark-submit --class
org.apache.spark.streaming.receiver.example.SparkStreamingPulsarReceiverExample
\
+ --master local[2] \
+ --packages
org.apache.pulsar:pulsar-client:${project.version},org.apache.pulsar:pulsar-spark:${project.version}
\
+
${PULSAR_HOME}/examples/spark/target/pulsar-spark-examples.jar \
+ pulsar://localhost:6650 test_src test_sub
+ ```
+
+4. When you produce data with a Pulsar producer such as
ProducerSparkReceiverData, you will see results similar to the following printed to stdout:
+
+ ```shell
+ (streaming,100)
+ (producer,100)
+ (spark,100)
+ (msg,100)
+ ```
+
\ No newline at end of file
diff --git
a/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java
new file mode 100644
index 0000000..6b84a38
--- /dev/null
+++
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.receiver.example;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import scala.Tuple2;
+
+/**
+ * Implements a streaming wordCount program on pulsar topics.
+ *
+ * <p>Example usage:
+ * pulsar://localhost:6650 test_src test_sub
+ */
+public class SparkStreamingPulsarReceiverExample {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 3) {
+ System.err.println("Missing parameters!");
+ System.err.println("Usage: <pulsar-service-url> <topic> <sub>");
+ return;
+ }
+
+ String serviceUrl = args[0];
+ String inputTopic = args[1];
+ String subscription = args[2];
+ System.out.println("Parameters:");
+ System.out.println("\tServiceUrl:\t" + serviceUrl);
+ System.out.println("\tTopic:\t" + inputTopic);
+ System.out.println("\tSubscription:\t" + subscription);
+
+ SparkConf sparkConf = new SparkConf().setAppName("Pulsar Spark Example");
+
+ JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
Durations.seconds(60));
+
+ ConsumerConfigurationData<byte[]> pulsarConf = new
ConsumerConfigurationData();
+
+ Set<String> set = new HashSet<>();
+ set.add(inputTopic);
+ pulsarConf.setTopicNames(set);
+ pulsarConf.setSubscriptionName(subscription);
+
+ SparkStreamingPulsarReceiver pulsarReceiver = new
SparkStreamingPulsarReceiver(
+ serviceUrl,
+ pulsarConf,
+ new AuthenticationDisabled());
+
+ JavaReceiverInputDStream<byte[]> lineDStream =
jsc.receiverStream(pulsarReceiver);
+ JavaPairDStream<String, Integer> result = lineDStream.flatMap(x -> {
+ String line = new String(x, Charset.forName("UTF-8"));
+ List<String> list = Arrays.asList(line.split(" "));
+ return list.iterator();
+ })
+ .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
+ .reduceByKey((x, y) -> x + y);
+
+ result.print();
+
+ jsc.start();
+ jsc.awaitTermination();
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 053b373..d9bb719 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1076,6 +1076,10 @@ flexible messaging model and an intuitive client
API.</description>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index a903af4..08a0f75 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -37,8 +37,14 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-1x</artifactId>
+ <artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -49,6 +55,12 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>jackson-annotations</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git
a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index c761b2d..8e124ed 100644
---
a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++
b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -19,76 +19,93 @@
package org.apache.pulsar.spark;
import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
- private ClientConfiguration clientConfiguration;
- private ConsumerConfiguration consumerConfiguration;
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
+
+ private String serviceUrl;
+ private ConsumerConfigurationData<byte[]> conf;
+ private Authentication authentication;
private PulsarClient pulsarClient;
- private String url;
- private String topic;
- private String subscription;
+ private Consumer<byte[]> consumer;
- public SparkStreamingPulsarReceiver(ClientConfiguration
clientConfiguration,
- ConsumerConfiguration consumerConfiguration, String url, String
topic, String subscription) {
- this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration,
consumerConfiguration, url, topic, subscription);
+ public SparkStreamingPulsarReceiver(
+ String serviceUrl,
+ ConsumerConfigurationData<byte[]> conf,
+ Authentication authentication) {
+ this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf,
authentication);
}
- public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
ClientConfiguration clientConfiguration,
- ConsumerConfiguration consumerConfiguration, String url, String
topic, String subscription) {
+ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
+ String serviceUrl,
+ ConsumerConfigurationData<byte[]> conf,
+ Authentication authentication) {
super(storageLevel);
- checkNotNull(clientConfiguration, "ClientConfiguration must not be
null");
- checkNotNull(consumerConfiguration, "ConsumerConfiguration must not be
null");
- this.clientConfiguration = clientConfiguration;
- this.url = url;
- this.topic = topic;
- this.subscription = subscription;
- if (consumerConfiguration.getAckTimeoutMillis() == 0) {
- consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+
+ checkNotNull(serviceUrl, "serviceUrl must not be null");
+ checkNotNull(conf, "ConsumerConfigurationData must not be null");
+ checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set
a value.");
+ checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be
null");
+
+ this.serviceUrl = serviceUrl;
+ this.authentication = authentication;
+
+ if (conf.getAckTimeoutMillis() == 0) {
+ conf.setAckTimeoutMillis(60000);
}
- consumerConfiguration.setMessageListener((MessageListener &
Serializable) (consumer, msg) -> {
- try {
- store(msg.getData());
- consumer.acknowledgeAsync(msg);
- } catch (Exception e) {
- log.error("Failed to store a message : {}", e.getMessage());
- }
- });
- this.consumerConfiguration = consumerConfiguration;
+ if (conf.getMessageListener() == null) {
+ conf.setMessageListener((MessageListener & Serializable)
(consumer, msg) -> {
+ try {
+ store(msg.getData());
+ consumer.acknowledgeAsync(msg);
+ } catch (Exception e) {
+ LOG.error("Failed to store a message : {}",
e.getMessage());
+ }
+ });
+ }
+ this.conf = conf;
}
public void onStart() {
try {
- pulsarClient = PulsarClient.create(url, clientConfiguration);
- pulsarClient.subscribe(topic, subscription, consumerConfiguration);
+ Set<String> topicNames = conf.getTopicNames();
+ String[] topicNamesArray = new String[topicNames.size()];
+ topicNames.toArray(topicNamesArray);
+ pulsarClient =
PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
+ consumer =
pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
+ .messageListener(this.conf.getMessageListener()).subscribe();
} catch (PulsarClientException e) {
- log.error("Failed to start subscription : {}", e.getMessage());
+ LOG.error("Failed to start subscription : {}", e.getMessage());
restart("Restart a consumer");
}
}
public void onStop() {
try {
+ if (consumer != null) {
+ consumer.close();
+ }
if (pulsarClient != null) {
pulsarClient.close();
}
} catch (PulsarClientException e) {
- log.error("Failed to close client : {}", e.getMessage());
+ LOG.error("Failed to close client : {}", e.getMessage());
}
}
-
- private static final Logger log =
LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
}
\ No newline at end of file
diff --git a/site/docs/latest/adaptors/PulsarSpark.md
b/site/docs/latest/adaptors/PulsarSpark.md
index 027b887..7bbc0f3 100644
--- a/site/docs/latest/adaptors/PulsarSpark.md
+++ b/site/docs/latest/adaptors/PulsarSpark.md
@@ -65,21 +65,31 @@ dependencies {
Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream`
method in `JavaStreamingContext`:
```java
-SparkConf conf = new
SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
-JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(5));
+ String serviceUrl = "pulsar://localhost:6650/";
+ String topic = "persistent://public/default/test_src";
+ String subs = "test_sub";
-ClientConfiguration clientConf = new ClientConfiguration();
-ConsumerConfiguration consConf = new ConsumerConfiguration();
-String url = "pulsar://localhost:6650/";
-String topic = "persistent://public/default/topic1";
-String subs = "sub1";
+ SparkConf sparkConf = new
SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
-JavaReceiverInputDStream<byte[]> msgs = jssc
- .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf,
url, topic, subs));
+ JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
Durations.seconds(60));
+
+ ConsumerConfigurationData<byte[]> pulsarConf = new
ConsumerConfigurationData();
+
+ Set<String> set = new HashSet<>();
+ set.add(topic);
+ pulsarConf.setTopicNames(set);
+ pulsarConf.setSubscriptionName(subs);
+
+ SparkStreamingPulsarReceiver pulsarReceiver = new
SparkStreamingPulsarReceiver(
+ serviceUrl,
+ pulsarConf,
+ new AuthenticationDisabled());
+
+ JavaReceiverInputDStream<byte[]> lineDStream =
jsc.receiverStream(pulsarReceiver);
```
## Example
-You can find a complete example [here]({{ site.pulsar_repo
}}/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java).
+You can find a complete example [here]({{ site.pulsar_repo
}}/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java).
In this example, the number of messages which contain the string "Pulsar" in
received messages is counted.
diff --git a/site2/docs/adaptors-spark.md b/site2/docs/adaptors-spark.md
index 3ab358e..bda6117 100644
--- a/site2/docs/adaptors-spark.md
+++ b/site2/docs/adaptors-spark.md
@@ -45,22 +45,32 @@ dependencies {
Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream`
method in `JavaStreamingContext`:
```java
-SparkConf conf = new
SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
-JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(5));
+ String serviceUrl = "pulsar://localhost:6650/";
+ String topic = "persistent://public/default/test_src";
+ String subs = "test_sub";
-ClientConfiguration clientConf = new ClientConfiguration();
-ConsumerConfiguration consConf = new ConsumerConfiguration();
-String url = "pulsar://localhost:6650/";
-String topic = "persistent://public/default/topic1";
-String subs = "sub1";
+ SparkConf sparkConf = new
SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
-JavaReceiverInputDStream<byte[]> msgs = jssc
- .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf,
url, topic, subs));
+ JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,
Durations.seconds(60));
+
+ ConsumerConfigurationData<byte[]> pulsarConf = new
ConsumerConfigurationData();
+
+ Set<String> set = new HashSet<>();
+ set.add(topic);
+ pulsarConf.setTopicNames(set);
+ pulsarConf.setSubscriptionName(subs);
+
+ SparkStreamingPulsarReceiver pulsarReceiver = new
SparkStreamingPulsarReceiver(
+ serviceUrl,
+ pulsarConf,
+ new AuthenticationDisabled());
+
+ JavaReceiverInputDStream<byte[]> lineDStream =
jsc.receiverStream(pulsarReceiver);
```
## Example
-You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java).
+You can find a complete example
[here](https://github.com/apache/pulsar/tree/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java).
In this example, the number of messages which contain the string "Pulsar" in
received messages is counted.
diff --git
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index 1200bb5..6f2ac5e 100644
---
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -23,14 +23,15 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.spark.storage.StorageLevel;
import org.mockito.ArgumentCaptor;
@@ -43,11 +44,13 @@ public class SparkStreamingPulsarReceiverTest extends
PulsarTestSuite {
@Test(dataProvider = "ServiceUrls")
public void testReceivedMessage(String serviceUrl) throws Exception {
- ClientConfiguration clientConf = new ClientConfiguration();
- ConsumerConfiguration consConf = new ConsumerConfiguration();
+ ConsumerConfigurationData<byte[]> consConf = new
ConsumerConfigurationData();
+
+ Set<String> set = new HashSet<>();
+ set.add(TOPIC);
+ consConf.setTopicNames(set);
+ consConf.setSubscriptionName(SUBS);
- SparkStreamingPulsarReceiver receiver = spy(
- new SparkStreamingPulsarReceiver(clientConf, consConf,
serviceUrl, TOPIC, SUBS));
MessageListener msgListener = spy(new MessageListener() {
@Override
public void received(Consumer consumer, Message msg) {
@@ -59,40 +62,47 @@ public class SparkStreamingPulsarReceiverTest extends
PulsarTestSuite {
doNothing().when(msgListener).received(consCaptor.capture(),
msgCaptor.capture());
consConf.setMessageListener(msgListener);
+ SparkStreamingPulsarReceiver receiver = new
SparkStreamingPulsarReceiver(
+ serviceUrl,
+ consConf,
+ new AuthenticationDisabled());
+
receiver.onStart();
waitForTransmission();
- PulsarClient pulsarClient = PulsarClient.create(serviceUrl,
clientConf);
- Producer producer = pulsarClient.createProducer(TOPIC, new
ProducerConfiguration());
+
+ PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
producer.send(EXPECTED_MESSAGE.getBytes());
+
waitForTransmission();
receiver.onStop();
assertEquals(new String(msgCaptor.getValue().getData()),
EXPECTED_MESSAGE);
}
-
@Test(dataProvider = "ServiceUrls")
public void testDefaultSettingsOfReceiver(String serviceUrl) throws
Exception {
- ClientConfiguration clientConf = new ClientConfiguration();
- ConsumerConfiguration consConf = new ConsumerConfiguration();
- SparkStreamingPulsarReceiver receiver =
- new SparkStreamingPulsarReceiver(clientConf, consConf,
serviceUrl, TOPIC, SUBS);
+ ConsumerConfigurationData<byte[]> consConf = new
ConsumerConfigurationData();
+
+ Set<String> set = new HashSet<>();
+ set.add(TOPIC);
+ consConf.setTopicNames(set);
+ consConf.setSubscriptionName(SUBS);
+
+ SparkStreamingPulsarReceiver receiver = new
SparkStreamingPulsarReceiver(
+ serviceUrl,
+ consConf,
+ new AuthenticationDisabled());
+
assertEquals(receiver.storageLevel(),
StorageLevel.MEMORY_AND_DISK_2());
assertEquals(consConf.getAckTimeoutMillis(), 60_000);
assertNotNull(consConf.getMessageListener());
}
@Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "ClientConfiguration must not be
null",
+ expectedExceptionsMessageRegExp = "ConsumerConfigurationData must
not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
- new SparkStreamingPulsarReceiver(null, new ConsumerConfiguration(),
serviceUrl, TOPIC, SUBS);
- }
-
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "ConsumerConfiguration must not
be null",
- dataProvider = "ServiceUrls")
- public void testReceiverWhenConsumerConfigurationIsNull(String serviceUrl)
{
- new SparkStreamingPulsarReceiver(new ClientConfiguration(), null,
serviceUrl, TOPIC, SUBS);
+ new SparkStreamingPulsarReceiver(serviceUrl, null, new
AuthenticationDisabled());
}
private static void waitForTransmission() {
diff --git
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
deleted file mode 100644
index cafe1f5..0000000
---
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.spark.example;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class SparkStreamingPulsarReceiverExample {
- public static void main(String[] args) throws InterruptedException {
- SparkConf conf = new
SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
- JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(5));
-
- ClientConfiguration clientConf = new ClientConfiguration();
- ConsumerConfiguration consConf = new ConsumerConfiguration();
- String url = "pulsar://localhost:6650/";
- String topic = "persistent://sample/standalone/ns1/topic1";
- String subs = "sub1";
-
- JavaReceiverInputDStream<byte[]> msgs = jssc
- .receiverStream(new SparkStreamingPulsarReceiver(clientConf,
consConf, url, topic, subs));
-
- JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new
FlatMapFunction<byte[], Integer>() {
- @Override
- public Iterator<Integer> call(byte[] msg) {
- return Arrays.asList(((new String(msg)).indexOf("Pulsar") !=
-1) ? 1 : 0).iterator();
- }
- });
-
- JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new
Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
-
- numOfPulsar.print();
-
- jssc.start();
- jssc.awaitTermination();
- }
-}