This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8561e02  [pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use 
pulsar-client and add spark example (#4143)
8561e02 is described below

commit 8561e02c70772ff34bbbedb78b1f0b55776fe512
Author: wpl <[email protected]>
AuthorDate: Mon Apr 29 04:57:35 2019 -0500

    [pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use pulsar-client 
and add spark example (#4143)
    
    ### Motivation
    
    Upgrade SparkStreamingPulsarReceiver.java to use pulsar-client, and add a Spark 
example.
    
    ### Modifications
    
    1. Upgrade SparkStreamingPulsarReceiver.java to use pulsar-client, and remove the 
pulsar-client-1x dependency from the pom.
    2. Add a simple Spark example.
---
 examples/pom.xml                                   |   1 +
 {pulsar-spark => examples/spark}/pom.xml           |  78 ++++++++++-----
 .../example/ProducerSparkReceiverData.java         |  54 +++++++++++
 .../spark/streaming/receiver/example/README.md     | 107 +++++++++++++++++++++
 .../SparkStreamingPulsarReceiverExample.java       |  90 +++++++++++++++++
 pom.xml                                            |   4 +
 pulsar-spark/pom.xml                               |  14 ++-
 .../pulsar/spark/SparkStreamingPulsarReceiver.java |  89 ++++++++++-------
 site/docs/latest/adaptors/PulsarSpark.md           |  30 ++++--
 site2/docs/adaptors-spark.md                       |  30 ++++--
 .../spark/SparkStreamingPulsarReceiverTest.java    |  56 ++++++-----
 .../SparkStreamingPulsarReceiverExample.java       |  68 -------------
 12 files changed, 451 insertions(+), 170 deletions(-)

diff --git a/examples/pom.xml b/examples/pom.xml
index 7d76563..5cee039 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,7 @@
 
   <modules>
     <module>flink</module>
+    <module>spark</module>
   </modules>
 
 </project>
diff --git a/pulsar-spark/pom.xml b/examples/spark/pom.xml
similarity index 52%
copy from pulsar-spark/pom.xml
copy to examples/spark/pom.xml
index a903af4..2c233d9 100644
--- a/pulsar-spark/pom.xml
+++ b/examples/spark/pom.xml
@@ -18,44 +18,56 @@
     under the License.
 
 -->
-<project
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
-  xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
-
   <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
+    <artifactId>pulsar-examples</artifactId>
+    <groupId>org.apache.pulsar.examples</groupId>
     <version>2.4.0-SNAPSHOT</version>
-    <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pulsar-spark</artifactId>
-  <name>Spark Streaming Pulsar Receivers</name>
+  <groupId>org.apache.pulsar.examples</groupId>
+  <artifactId>spark</artifactId>
+  <name>Pulsar Examples :: Spark</name>
 
-  <dependencies>
+  <properties>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <jaskson.version>2.6.5</jaskson.version>
+  </properties>
 
+  <dependencies>
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-1x</artifactId>
-      <version>${project.version}</version>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jaskson.version}</version>
     </dependency>
-
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jaskson.version}</version>
     </dependency>
-
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_2.10</artifactId>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>${jaskson.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-spark</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
   </dependencies>
+
   <build>
     <plugins>
       <plugin>
@@ -63,12 +75,20 @@
         <artifactId>maven-shade-plugin</artifactId>
         <executions>
           <execution>
+            <id>pulsar-spark-examples</id>
             <phase>package</phase>
             <goals>
               <goal>shade</goal>
             </goals>
             <configuration>
-              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <shadedArtifactAttached>false</shadedArtifactAttached>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <transformers>
+                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  
<mainClass>org.apache.spark.streaming.receiver.example.SparkStreamingPulsarReceiverExample</mainClass>
+                </transformer>
+              </transformers>
+              <finalName>pulsar-spark-examples</finalName>
               <artifactSet>
                 <includes>
                   <include>com.google.guava:guava</include>
@@ -92,6 +112,20 @@
           </execution>
         </executions>
       </plugin>
+      <!-- Scala Plugin to compile Scala Files -->
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>4.0.1</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git 
a/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/ProducerSparkReceiverData.java
 
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/ProducerSparkReceiverData.java
new file mode 100644
index 0000000..e10d99d
--- /dev/null
+++ 
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/ProducerSparkReceiverData.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.receiver.example;
+
+import java.nio.charset.Charset;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * producer data to spark streaming receiver.
+ *
+ * <p>Example usage:
+ *   pulsar://localhost:6650 test_src
+ */
+public class ProducerSparkReceiverData {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Missing parameters!");
+      System.err.println("Usage: <pulsar-service-url> <topic>");
+      return;
+    }
+
+    System.out.println("Parameters:");
+    System.out.println("\tServiceUrl:\t" + args[0]);
+    System.out.println("\tTopic:\t" + args[1]);
+
+    try (PulsarClient client = 
PulsarClient.builder().serviceUrl(args[0]).build()) {
+      try (Producer<byte[]> producer = 
client.newProducer().topic(args[1]).create()) {
+        for (int i = 0; i < 100; i++) {
+          producer.send(("producer spark streaming 
msg").getBytes(Charset.forName("UTF-8")));
+        }
+      }
+    }
+
+    System.out.println("producer spark streaming msg end ...");
+  }
+}
diff --git 
a/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/README.md
 
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/README.md
new file mode 100644
index 0000000..742e120
--- /dev/null
+++ 
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/README.md
@@ -0,0 +1,107 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+## Apache Spark Streaming Receiver for Pulsar
+
+This page describes how to use the receiver to read Pulsar topics with [Apache 
Spark](https://spark.apache.org/) stream processing applications.
+
+## Example
+
+### SparkStreamingPulsarReceiverExample
+
+This Spark Streaming job consumes messages from a Pulsar topic and counts the 
words in a streaming fashion. The job can write the word count results
+to stdout or another Pulsar topic.
+
+To run the example locally, modify the main method of SparkStreamingPulsarReceiverExample.java, 
for example:
+```java
+public static void main(String[] args) throws InterruptedException {
+    String serviceUrl = "pulsar://localhost:6650/";
+    String topic = "persistent://public/default/test_src";
+    String subs = "test_sub";
+
+    SparkConf sparkConf = new 
SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
+
+    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
Durations.seconds(60));
+
+    ConsumerConfigurationData<byte[]> pulsarConf = new 
ConsumerConfigurationData();
+
+    Set<String> set = new HashSet<>();
+    set.add(topic);
+    pulsarConf.setTopicNames(set);
+    pulsarConf.setSubscriptionName(subs);
+
+    SparkStreamingPulsarReceiver pulsarReceiver = new 
SparkStreamingPulsarReceiver(
+        serviceUrl,
+        pulsarConf,
+        new AuthenticationDisabled());
+
+    JavaReceiverInputDStream<byte[]> lineDStream = 
jsc.receiverStream(pulsarReceiver);
+    JavaPairDStream<String, Integer> result = lineDStream.flatMap(x -> {
+      String line = new String(x, Charset.forName("UTF-8"));
+      List<String> list = Arrays.asList(line.split(" "));
+      return list.iterator();
+    })
+        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
+        .reduceByKey((x, y) -> x + y);
+
+    result.print();
+
+    jsc.start();
+    jsc.awaitTermination();
+}
+```
+
+To run the example with `spark-submit`, follow these steps:
+
+1. Start Pulsar Standalone.
+
+    You can follow the 
[instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar 
standalone locally.
+
+    ```shell
+    $ bin/pulsar standalone
+    ```
+    
+2. Build the examples.
+
+    ```shell
+    $ cd ${PULSAR_HOME}
+    $ mvn clean install -DskipTests
+    ```
+
+3. Run the word count example with Spark to print results to stdout.
+
+    ```shell
+    $ ${SPARK_HOME}/bin/spark-submit --class 
org.apache.spark.streaming.receiver.example.SparkStreamingPulsarReceiverExample 
\
+                                     --master local[2] \
+                                     --packages 
org.apache.pulsar:pulsar-client:${project.version},org.apache.pulsar:pulsar-spark:${project.version}
 \
+                                     
${PULSAR_HOME}/examples/spark/target/pulsar-spark-examples.jar \
+                                     pulsar://localhost:6650 test_src test_sub
+    ```       
+
+4. When you produce data with a Pulsar producer such as ProducerSparkReceiverData, you will 
see results similar to the following printed to stdout, e.g.:
+
+    ```shell
+    (streaming,100)
+    (producer,100)
+    (spark,100)
+    (msg,100)
+    ```
+    
\ No newline at end of file
diff --git 
a/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java
 
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java
new file mode 100644
index 0000000..6b84a38
--- /dev/null
+++ 
b/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.receiver.example;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import scala.Tuple2;
+
+/**
+ * Implements a streaming wordCount program on pulsar topics.
+ *
+ * <p>Example usage:
+ *   pulsar://localhost:6650 test_src test_sub
+ */
+public class SparkStreamingPulsarReceiverExample {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Missing parameters!");
+      System.err.println("Usage: <pulsar-service-url> <topic> <sub>");
+      return;
+    }
+
+    String serviceUrl =  args[0];
+    String inputTopic =  args[1];
+    String subscription =  args[2];
+    System.out.println("Parameters:");
+    System.out.println("\tServiceUrl:\t" + serviceUrl);
+    System.out.println("\tTopic:\t" + inputTopic);
+    System.out.println("\tSubscription:\t" + subscription);
+
+    SparkConf sparkConf = new SparkConf().setAppName("Pulsar Spark Example");
+
+    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
Durations.seconds(60));
+
+    ConsumerConfigurationData<byte[]> pulsarConf = new 
ConsumerConfigurationData();
+
+    Set<String> set = new HashSet<>();
+    set.add(inputTopic);
+    pulsarConf.setTopicNames(set);
+    pulsarConf.setSubscriptionName(subscription);
+
+    SparkStreamingPulsarReceiver pulsarReceiver = new 
SparkStreamingPulsarReceiver(
+        serviceUrl,
+        pulsarConf,
+        new AuthenticationDisabled());
+
+    JavaReceiverInputDStream<byte[]> lineDStream = 
jsc.receiverStream(pulsarReceiver);
+    JavaPairDStream<String, Integer> result = lineDStream.flatMap(x -> {
+        String line = new String(x, Charset.forName("UTF-8"));
+        List<String> list = Arrays.asList(line.split(" "));
+        return list.iterator();
+      })
+        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
+        .reduceByKey((x, y) -> x + y);
+
+    result.print();
+
+    jsc.start();
+    jsc.awaitTermination();
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 053b373..d9bb719 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1076,6 +1076,10 @@ flexible messaging model and an intuitive client 
API.</description>
           <groupId>io.netty</groupId>
           <artifactId>netty-tcnative-boringssl-static</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
   </dependencies>
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index a903af4..08a0f75 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -37,8 +37,14 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-1x</artifactId>
+      <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -49,6 +55,12 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>jackson-annotations</artifactId>
+          <groupId>com.fasterxml.jackson.core</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
diff --git 
a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
 
b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index c761b2d..8e124ed 100644
--- 
a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ 
b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -19,76 +19,93 @@
 package org.apache.pulsar.spark;
 
 import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
 
-    private ClientConfiguration clientConfiguration;
-    private ConsumerConfiguration consumerConfiguration;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
+
+    private String serviceUrl;
+    private ConsumerConfigurationData<byte[]> conf;
+    private Authentication authentication;
     private PulsarClient pulsarClient;
-    private String url;
-    private String topic;
-    private String subscription;
+    private Consumer<byte[]> consumer;
 
-    public SparkStreamingPulsarReceiver(ClientConfiguration 
clientConfiguration,
-            ConsumerConfiguration consumerConfiguration, String url, String 
topic, String subscription) {
-        this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration, 
consumerConfiguration, url, topic, subscription);
+    public SparkStreamingPulsarReceiver(
+        String serviceUrl,
+        ConsumerConfigurationData<byte[]> conf,
+        Authentication authentication) {
+        this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf, 
authentication);
     }
 
-    public SparkStreamingPulsarReceiver(StorageLevel storageLevel, 
ClientConfiguration clientConfiguration,
-            ConsumerConfiguration consumerConfiguration, String url, String 
topic, String subscription) {
+    public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
+        String serviceUrl,
+        ConsumerConfigurationData<byte[]> conf,
+        Authentication authentication) {
         super(storageLevel);
-        checkNotNull(clientConfiguration, "ClientConfiguration must not be 
null");
-        checkNotNull(consumerConfiguration, "ConsumerConfiguration must not be 
null");
-        this.clientConfiguration = clientConfiguration;
-        this.url = url;
-        this.topic = topic;
-        this.subscription = subscription;
-        if (consumerConfiguration.getAckTimeoutMillis() == 0) {
-            consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+
+        checkNotNull(serviceUrl, "serviceUrl must not be null");
+        checkNotNull(conf, "ConsumerConfigurationData must not be null");
+        checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set 
a value.");
+        checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be 
null");
+
+        this.serviceUrl = serviceUrl;
+        this.authentication = authentication;
+
+        if (conf.getAckTimeoutMillis() == 0) {
+            conf.setAckTimeoutMillis(60000);
         }
-        consumerConfiguration.setMessageListener((MessageListener & 
Serializable) (consumer, msg) -> {
-            try {
-                store(msg.getData());
-                consumer.acknowledgeAsync(msg);
-            } catch (Exception e) {
-                log.error("Failed to store a message : {}", e.getMessage());
-            }
-        });
-        this.consumerConfiguration = consumerConfiguration;
+        if (conf.getMessageListener() == null) {
+            conf.setMessageListener((MessageListener & Serializable) 
(consumer, msg) -> {
+                try {
+                    store(msg.getData());
+                    consumer.acknowledgeAsync(msg);
+                } catch (Exception e) {
+                    LOG.error("Failed to store a message : {}", 
e.getMessage());
+                }
+            });
+        }
+        this.conf = conf;
     }
 
     public void onStart() {
         try {
-            pulsarClient = PulsarClient.create(url, clientConfiguration);
-            pulsarClient.subscribe(topic, subscription, consumerConfiguration);
+            Set<String> topicNames = conf.getTopicNames();
+            String[] topicNamesArray = new String[topicNames.size()];
+            topicNames.toArray(topicNamesArray);
+            pulsarClient = 
PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
+            consumer = 
pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
+                .messageListener(this.conf.getMessageListener()).subscribe();
         } catch (PulsarClientException e) {
-            log.error("Failed to start subscription : {}", e.getMessage());
+            LOG.error("Failed to start subscription : {}", e.getMessage());
             restart("Restart a consumer");
         }
     }
 
     public void onStop() {
         try {
+            if (consumer != null) {
+                consumer.close();
+            }
             if (pulsarClient != null) {
                 pulsarClient.close();
             }
         } catch (PulsarClientException e) {
-            log.error("Failed to close client : {}", e.getMessage());
+            LOG.error("Failed to close client : {}", e.getMessage());
         }
     }
-
-    private static final Logger log = 
LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
 }
\ No newline at end of file
diff --git a/site/docs/latest/adaptors/PulsarSpark.md 
b/site/docs/latest/adaptors/PulsarSpark.md
index 027b887..7bbc0f3 100644
--- a/site/docs/latest/adaptors/PulsarSpark.md
+++ b/site/docs/latest/adaptors/PulsarSpark.md
@@ -65,21 +65,31 @@ dependencies {
 Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` 
method in `JavaStreamingContext`:
 
 ```java
-SparkConf conf = new 
SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
-JavaStreamingContext jssc = new JavaStreamingContext(conf, 
Durations.seconds(5));
+    String serviceUrl = "pulsar://localhost:6650/";
+    String topic = "persistent://public/default/test_src";
+    String subs = "test_sub";
 
-ClientConfiguration clientConf = new ClientConfiguration();
-ConsumerConfiguration consConf = new ConsumerConfiguration();
-String url = "pulsar://localhost:6650/";
-String topic = "persistent://public/default/topic1";
-String subs = "sub1";
+    SparkConf sparkConf = new 
SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
 
-JavaReceiverInputDStream<byte[]> msgs = jssc
-        .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, 
url, topic, subs));
+    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
Durations.seconds(60));
+
+    ConsumerConfigurationData<byte[]> pulsarConf = new 
ConsumerConfigurationData();
+
+    Set<String> set = new HashSet<>();
+    set.add(topic);
+    pulsarConf.setTopicNames(set);
+    pulsarConf.setSubscriptionName(subs);
+
+    SparkStreamingPulsarReceiver pulsarReceiver = new 
SparkStreamingPulsarReceiver(
+        serviceUrl,
+        pulsarConf,
+        new AuthenticationDisabled());
+
+    JavaReceiverInputDStream<byte[]> lineDStream = 
jsc.receiverStream(pulsarReceiver);
 ```
 
 
 ## Example
 
-You can find a complete example [here]({{ site.pulsar_repo 
}}/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java).
+You can find a complete example [here]({{ site.pulsar_repo 
}}/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java).
 In this example, the number of messages which contain the string "Pulsar" in 
received messages is counted.
diff --git a/site2/docs/adaptors-spark.md b/site2/docs/adaptors-spark.md
index 3ab358e..bda6117 100644
--- a/site2/docs/adaptors-spark.md
+++ b/site2/docs/adaptors-spark.md
@@ -45,22 +45,32 @@ dependencies {
 Pass an instance of `SparkStreamingPulsarReceiver` to the `receiverStream` 
method in `JavaStreamingContext`:
 
 ```java
-SparkConf conf = new 
SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
-JavaStreamingContext jssc = new JavaStreamingContext(conf, 
Durations.seconds(5));
+    String serviceUrl = "pulsar://localhost:6650/";
+    String topic = "persistent://public/default/test_src";
+    String subs = "test_sub";
 
-ClientConfiguration clientConf = new ClientConfiguration();
-ConsumerConfiguration consConf = new ConsumerConfiguration();
-String url = "pulsar://localhost:6650/";
-String topic = "persistent://public/default/topic1";
-String subs = "sub1";
+    SparkConf sparkConf = new 
SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
 
-JavaReceiverInputDStream<byte[]> msgs = jssc
-        .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, 
url, topic, subs));
+    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
Durations.seconds(60));
+
+    ConsumerConfigurationData<byte[]> pulsarConf = new 
ConsumerConfigurationData();
+
+    Set<String> set = new HashSet<>();
+    set.add(topic);
+    pulsarConf.setTopicNames(set);
+    pulsarConf.setSubscriptionName(subs);
+
+    SparkStreamingPulsarReceiver pulsarReceiver = new 
SparkStreamingPulsarReceiver(
+        serviceUrl,
+        pulsarConf,
+        new AuthenticationDisabled());
+
+    JavaReceiverInputDStream<byte[]> lineDStream = 
jsc.receiverStream(pulsarReceiver);
 ```
 
 
 ## Example
 
-You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java).
+You can find a complete example 
[here](https://github.com/apache/pulsar/tree/master/examples/spark/src/main/java/org/apache/spark/streaming/receiver/example/SparkStreamingPulsarReceiverExample.java).
 In this example, the number of messages which contain the string "Pulsar" in 
received messages is counted.
 
diff --git 
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
 
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index 1200bb5..6f2ac5e 100644
--- 
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ 
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -23,14 +23,15 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.spark.storage.StorageLevel;
 import org.mockito.ArgumentCaptor;
@@ -43,11 +44,13 @@ public class SparkStreamingPulsarReceiverTest extends 
PulsarTestSuite {
 
     @Test(dataProvider = "ServiceUrls")
     public void testReceivedMessage(String serviceUrl) throws Exception {
-        ClientConfiguration clientConf = new ClientConfiguration();
-        ConsumerConfiguration consConf = new ConsumerConfiguration();
+        ConsumerConfigurationData<byte[]> consConf = new 
ConsumerConfigurationData();
+
+        Set<String> set = new HashSet<>();
+        set.add(TOPIC);
+        consConf.setTopicNames(set);
+        consConf.setSubscriptionName(SUBS);
 
-        SparkStreamingPulsarReceiver receiver = spy(
-                new SparkStreamingPulsarReceiver(clientConf, consConf, 
serviceUrl, TOPIC, SUBS));
         MessageListener msgListener = spy(new MessageListener() {
             @Override
             public void received(Consumer consumer, Message msg) {
@@ -59,40 +62,47 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
         doNothing().when(msgListener).received(consCaptor.capture(), msgCaptor.capture());
         consConf.setMessageListener(msgListener);
 
+        SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
+            serviceUrl,
+            consConf,
+            new AuthenticationDisabled());
+
         receiver.onStart();
         waitForTransmission();
-        PulsarClient pulsarClient = PulsarClient.create(serviceUrl, clientConf);
-        Producer producer = pulsarClient.createProducer(TOPIC, new ProducerConfiguration());
+
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+        Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
         producer.send(EXPECTED_MESSAGE.getBytes());
+
         waitForTransmission();
         receiver.onStop();
         assertEquals(new String(msgCaptor.getValue().getData()), EXPECTED_MESSAGE);
     }
 
-
     @Test(dataProvider = "ServiceUrls")
     public void testDefaultSettingsOfReceiver(String serviceUrl) throws Exception {
-        ClientConfiguration clientConf = new ClientConfiguration();
-        ConsumerConfiguration consConf = new ConsumerConfiguration();
-        SparkStreamingPulsarReceiver receiver =
-                new SparkStreamingPulsarReceiver(clientConf, consConf, serviceUrl, TOPIC, SUBS);
+        ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData();
+
+        Set<String> set = new HashSet<>();
+        set.add(TOPIC);
+        consConf.setTopicNames(set);
+        consConf.setSubscriptionName(SUBS);
+
+        SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
+            serviceUrl,
+            consConf,
+            new AuthenticationDisabled());
+
         assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
         assertEquals(consConf.getAckTimeoutMillis(), 60_000);
         assertNotNull(consConf.getMessageListener());
     }
 
     @Test(expectedExceptions = NullPointerException.class,
-            expectedExceptionsMessageRegExp = "ClientConfiguration must not be null",
+            expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
             dataProvider = "ServiceUrls")
     public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
-        new SparkStreamingPulsarReceiver(null, new ConsumerConfiguration(), serviceUrl, TOPIC, SUBS);
-    }
-
-    @Test(expectedExceptions = NullPointerException.class,
-            expectedExceptionsMessageRegExp = "ConsumerConfiguration must not be null",
-            dataProvider = "ServiceUrls")
-    public void testReceiverWhenConsumerConfigurationIsNull(String serviceUrl) {
-        new SparkStreamingPulsarReceiver(new ClientConfiguration(), null, serviceUrl, TOPIC, SUBS);
+        new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
     }
 
     private static void waitForTransmission() {
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
deleted file mode 100644
index cafe1f5..0000000
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/example/SparkStreamingPulsarReceiverExample.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.spark.example;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class SparkStreamingPulsarReceiverExample {
-    public static void main(String[] args) throws InterruptedException {
-        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
-        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
-
-        ClientConfiguration clientConf = new ClientConfiguration();
-        ConsumerConfiguration consConf = new ConsumerConfiguration();
-        String url = "pulsar://localhost:6650/";
-        String topic = "persistent://sample/standalone/ns1/topic1";
-        String subs = "sub1";
-
-        JavaReceiverInputDStream<byte[]> msgs = jssc
-                .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));
-
-        JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
-            @Override
-            public Iterator<Integer> call(byte[] msg) {
-                return Arrays.asList(((new String(msg)).indexOf("Pulsar") != -1) ? 1 : 0).iterator();
-            }
-        });
-
-        JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
-            @Override
-            public Integer call(Integer i1, Integer i2) {
-                return i1 + i2;
-            }
-        });
-
-        numOfPulsar.print();
-
-        jssc.start();
-        jssc.awaitTermination();
-    }
-}

Reply via email to