This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6b1921a Fixed spark receiver to account for all the consumer config options (#5152) 6b1921a is described below commit 6b1921aef3a7d0be6e1d63fcc32f37333ad569de Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Sep 11 14:19:27 2019 -0700 Fixed spark receiver to account for all the consumer config options (#5152) --- .../pulsar/spark/SparkStreamingPulsarReceiver.java | 22 ++++------ .../spark/SparkStreamingPulsarReceiverTest.java | 50 +++++++++++++++++++++- 2 files changed, 57 insertions(+), 15 deletions(-) diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java index 8e124ed..833d42e 100644 --- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java +++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java @@ -18,23 +18,23 @@ */ package org.apache.pulsar.spark; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.Serializable; -import java.util.Set; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - public class SparkStreamingPulsarReceiver extends Receiver<byte[]> { private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class); @@ -66,16 +66,14 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> { this.serviceUrl = serviceUrl; this.authentication = authentication; - if (conf.getAckTimeoutMillis() == 0) { - conf.setAckTimeoutMillis(60000); - } if (conf.getMessageListener() == null) { - conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> { + conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> { try { store(msg.getData()); consumer.acknowledgeAsync(msg); } catch (Exception e) { LOG.error("Failed to store a message : {}", e.getMessage()); + consumer.negativeAcknowledge(msg); } }); } @@ -84,13 +82,9 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> { public void onStart() { try { - Set<String> topicNames = conf.getTopicNames(); - String[] topicNamesArray = new String[topicNames.size()]; - topicNames.toArray(topicNamesArray); pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build(); - consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName()) - .messageListener(this.conf.getMessageListener()).subscribe(); - } catch (PulsarClientException e) { + consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join(); + } catch (Exception e) { LOG.error("Failed to start subscription : {}", e.getMessage()); restart("Restart a consumer"); } diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java index 7504948..bd619a5 100644 --- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java +++ b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java @@ -23,13 +23,18 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; + +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; @@ -94,10 +99,53 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite { new AuthenticationDisabled()); assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2()); - assertEquals(consConf.getAckTimeoutMillis(), 60_000); assertNotNull(consConf.getMessageListener()); } + @Test(dataProvider = "ServiceUrls") + public void testSharedSubscription(String serviceUrl) throws Exception { + ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>(); + + Set<String> set = new HashSet<>(); + set.add(TOPIC); + consConf.setTopicNames(set); + consConf.setSubscriptionName(SUBS); + consConf.setSubscriptionType(SubscriptionType.Shared); + consConf.setReceiverQueueSize(1); + + Map<String, MutableInt> receveidCounts = new HashMap<>(); + + consConf.setMessageListener((consumer, msg) -> { + receveidCounts.computeIfAbsent(consumer.getConsumerName(), x -> new MutableInt(0)).increment(); + }); + + SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver( + serviceUrl, + consConf, + new AuthenticationDisabled()); + + SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver( + serviceUrl, + consConf, + new AuthenticationDisabled()); + + receiver1.onStart(); + receiver2.onStart(); + waitForTransmission(); + + PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); + Producer<byte[]> producer = client.newProducer().topic(TOPIC).create(); + for (int i = 0; i < 10; i++) { + producer.send(EXPECTED_MESSAGE.getBytes()); + } + + waitForTransmission(); + receiver1.onStop(); + receiver2.onStop(); + + assertEquals(receveidCounts.size(), 2); + } + @Test(expectedExceptions = NullPointerException.class, expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null", dataProvider = "ServiceUrls")