This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new 5aeddbb Add Strom adapter back after removing from Apache Storm repo
(#55)
5aeddbb is described below
commit 5aeddbb21508fa70d36199f95dabebe17774384c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Nov 1 10:24:57 2023 -0700
Add Strom adapter back after removing from Apache Storm repo (#55)
This reverts commit ab537c13061b06de2044cd43965de930d762fe8a.
---
.asf.yaml | 3 +-
README.md | 4 +-
pom.xml | 19 +
pulsar-storm/pom.xml | 102 +++++
.../apache/pulsar/storm/MessageToValuesMapper.java | 44 ++
.../java/org/apache/pulsar/storm/PulsarBolt.java | 207 +++++++++
.../pulsar/storm/PulsarBoltConfiguration.java | 57 +++
.../java/org/apache/pulsar/storm/PulsarSpout.java | 494 +++++++++++++++++++++
.../pulsar/storm/PulsarSpoutConfiguration.java | 195 ++++++++
.../apache/pulsar/storm/PulsarSpoutConsumer.java | 58 +++
.../pulsar/storm/PulsarStormConfiguration.java | 90 ++++
.../java/org/apache/pulsar/storm/PulsarTuple.java | 45 ++
.../apache/pulsar/storm/SharedPulsarClient.java | 155 +++++++
.../apache/pulsar/storm/TupleToMessageMapper.java | 66 +++
pulsar-storm/src/main/javadoc/overview.html | 29 ++
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 178 ++++++++
tests/pom.xml | 1 +
tests/pulsar-storm-test/pom.xml | 131 ++++++
.../apache/pulsar/storm/MockOutputCollector.java | 101 +++++
.../pulsar/storm/MockSpoutOutputCollector.java | 80 ++++
.../org/apache/pulsar/storm/PulsarBoltTest.java | 236 ++++++++++
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 349 +++++++++++++++
.../java/org/apache/pulsar/storm/TestUtil.java | 35 ++
.../apache/pulsar/storm/example/StormExample.java | 166 +++++++
24 files changed, 2841 insertions(+), 4 deletions(-)
diff --git a/.asf.yaml b/.asf.yaml
index c04e151..74589a2 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -27,6 +27,7 @@ github:
- streaming
- queuing
- event-streaming
+ - apache-storm
- apache-spark
- apache-kafka
features:
@@ -47,4 +48,4 @@ github:
notifications:
commits: [email protected]
issues: [email protected]
- pullrequests: [email protected]
+ pullrequests: [email protected]
\ No newline at end of file
diff --git a/README.md b/README.md
index 33409df..ce62338 100644
--- a/README.md
+++ b/README.md
@@ -25,8 +25,6 @@ This repository is used for hosting all the adapters
maintained and supported by
[Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is
supported and maintained by Apache Flink Community.
-[Apache Storm bolt and
spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are
supported by Apache Storm Community.
-
## Building
In order to build this code you can simply use Maven
@@ -44,5 +42,5 @@ git checkout v2.11.0
mvn clean install -DskipTests
```
-This is because this repository depends on test integration artifacts of the
relative version on the main
+This is because this repository depends on test integration artifacts of the
relative version on the main
Apache Pulsar codebase
diff --git a/pom.xml b/pom.xml
index 5f39f48..4f240d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
<properties>
<pulsar.version>2.11.0</pulsar.version>
<kafka-client.version>2.7.2</kafka-client.version>
+ <storm.version>2.0.0</storm.version>
<kafka_0_8.version>0.8.1.1</kafka_0_8.version>
<avro.version>1.10.2</avro.version>
<log4j.version>1.2.17</log4j.version>
@@ -139,6 +140,7 @@
</properties>
<modules>
+ <module>pulsar-storm</module>
<module>pulsar-spark</module>
<module>pulsar-client-kafka-compat</module>
<module>pulsar-log4j2-appender</module>
@@ -250,6 +252,22 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-client</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-server</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
@@ -1068,3 +1086,4 @@
</repositories>
</project>
+
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
new file mode 100644
index 0000000..a20649b
--- /dev/null
+++ b/pulsar-storm/pom.xml
@@ -0,0 +1,102 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-adapters</artifactId>
+ <version>2.11.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-storm</artifactId>
+ <name>Pulsar Storm adapter</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.yaml</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-client</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </build>
+</project>
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
new file mode 100644
index 0000000..92e127c
--- /dev/null
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+
+public interface MessageToValuesMapper extends Serializable {
+
+ /**
+ * Convert {@link org.apache.pulsar.client.api.Message} to tuple values.
+ *
+ * @param msg
+ * @return
+ */
+ Values toValues(Message<byte[]> msg);
+
+ /**
+ * Declare the output schema for the spout.
+ *
+ * @param declarer
+ */
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
new file mode 100644
index 0000000..32fa78f
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static java.lang.String.format;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class PulsarBolt extends BaseRichBolt implements IMetric {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarBolt.class);
+
+ public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
+ public static final String PRODUCER_RATE = "producerRate";
+ public static final String PRODUCER_THROUGHPUT_BYTES =
"producerThroughput";
+
+ private final ClientConfigurationData clientConf;
+ private final ProducerConfigurationData producerConf;
+ private final PulsarBoltConfiguration pulsarBoltConf;
+ private final ConcurrentMap<String, Object> metricsMap = new
ConcurrentHashMap<>();
+
+ private SharedPulsarClient sharedPulsarClient;
+ private String componentId;
+ private String boltId;
+ private OutputCollector collector;
+ private Producer<byte[]> producer;
+ private volatile long messagesSent = 0;
+ private volatile long messageSizeSent = 0;
+
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
+ this(pulsarBoltConf, PulsarClient.builder());
+ }
+
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder
clientBuilder) {
+ this(pulsarBoltConf, ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData().clone(),
+ new ProducerConfigurationData());
+ }
+
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf,
ClientConfigurationData clientConf,
+ ProducerConfigurationData producerConf) {
+ checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
+ checkNotNull(clientConf, "client configuration can't be null");
+ checkNotNull(producerConf, "producer configuration can't be null");
+ Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+ Objects.requireNonNull(pulsarBoltConf.getTopic());
+ Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
+ this.pulsarBoltConf = pulsarBoltConf;
+ this.clientConf = clientConf;
+ this.producerConf = producerConf;
+ this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
+ this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+ this.producerConf.setBatcherBuilder(null);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
+ this.componentId = context.getThisComponentId();
+ this.boltId = String.format("%s-%s", componentId,
context.getThisTaskId());
+ this.collector = collector;
+ try {
+ sharedPulsarClient = SharedPulsarClient.get(componentId,
clientConf);
+ producer = sharedPulsarClient.getSharedProducer(producerConf);
+ LOG.info("[{}] Created a pulsar producer on topic {} to send
messages", boltId, pulsarBoltConf.getTopic());
+ } catch (PulsarClientException e) {
+ LOG.error("[{}] Error initializing pulsar producer on topic {}",
boltId, pulsarBoltConf.getTopic(), e);
+ throw new IllegalStateException(
+ format("Failed to initialize producer for %s : %s",
pulsarBoltConf.getTopic(), e.getMessage()), e);
+ }
+ context.registerMetric(String.format("PulsarBoltMetrics-%s-%s",
componentId, context.getThisTaskIndex()), this,
+ pulsarBoltConf.getMetricsTimeIntervalInSecs());
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (TupleUtils.isTick(input)) {
+ collector.ack(input);
+ return;
+ }
+ try {
+ if (producer != null) {
+ // a message key can be provided in the mapper
+ TypedMessageBuilder<byte[]> msgBuilder =
pulsarBoltConf.getTupleToMessageMapper()
+ .toMessage(producer.newMessage(), input);
+ if (msgBuilder == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Cannot send null message, acking the
collector", boltId);
+ }
+ collector.ack(input);
+ } else {
+ final long messageSizeToBeSent =
((TypedMessageBuilderImpl<byte[]>) msgBuilder).getContent()
+ .remaining();
+ msgBuilder.sendAsync().handle((msgId, ex) -> {
+ synchronized (collector) {
+ if (ex != null) {
+ collector.reportError(ex);
+ collector.fail(input);
+ LOG.error("[{}] Message send failed", boltId,
ex);
+
+ } else {
+ collector.ack(input);
+ ++messagesSent;
+ messageSizeSent += messageSizeToBeSent;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Message sent with id {}",
boltId, msgId);
+ }
+ }
+ }
+
+ return null;
+ });
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("[{}] Message processing failed", boltId, e);
+ collector.reportError(e);
+ collector.fail(input);
+ }
+ }
+
+ public void close() {
+ try {
+ LOG.info("[{}] Closing Pulsar producer on topic {}", boltId,
pulsarBoltConf.getTopic());
+ if (sharedPulsarClient != null) {
+ sharedPulsarClient.close();
+ }
+ } catch (PulsarClientException e) {
+ LOG.error("[{}] Error closing Pulsar producer on topic {}",
boltId, pulsarBoltConf.getTopic(), e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ close();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
+ }
+
+ /**
+ * Helpers for metrics.
+ */
+
+ @SuppressWarnings({ "rawtypes" })
+ ConcurrentMap getMetrics() {
+ metricsMap.put(NO_OF_MESSAGES_SENT, messagesSent);
+ metricsMap.put(PRODUCER_RATE, ((double) messagesSent) /
pulsarBoltConf.getMetricsTimeIntervalInSecs());
+ metricsMap.put(PRODUCER_THROUGHPUT_BYTES,
+ ((double) messageSizeSent) /
pulsarBoltConf.getMetricsTimeIntervalInSecs());
+ return metricsMap;
+ }
+
+ void resetMetrics() {
+ messagesSent = 0;
+ messageSizeSent = 0;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Object getValueAndReset() {
+ ConcurrentMap metrics = getMetrics();
+ resetMetrics();
+ return metrics;
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
new file mode 100644
index 0000000..714e435
--- /dev/null
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Objects;
+
+/**
+ * Class used to specify Pulsar bolt configuration
+ *
+ *
+ */
+public class PulsarBoltConfiguration extends PulsarStormConfiguration {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private TupleToMessageMapper tupleToMessageMapper = null;
+
+ /**
+ * @return the mapper to convert storm tuples to a pulsar message
+ */
+ public TupleToMessageMapper getTupleToMessageMapper() {
+ return tupleToMessageMapper;
+ }
+
+ /**
+ * Sets the mapper to convert storm tuples to a pulsar message
+ * <p>
+ * Note: If the mapper returns null, the message is not sent by the
producer and is acked immediately on the
+ * collector
+ * </p>
+ *
+ * @param mapper
+ */
+ public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
+ this.tupleToMessageMapper = Objects.requireNonNull(mapper);
+ }
+
+}
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
new file mode 100644
index 0000000..8ed090e
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarSpout extends BaseRichSpout implements IMetric {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarSpout.class);
+
+ public static final String NO_OF_PENDING_FAILED_MESSAGES =
"numberOfPendingFailedMessages";
+ public static final String NO_OF_MESSAGES_RECEIVED =
"numberOfMessagesReceived";
+ public static final String NO_OF_MESSAGES_EMITTED =
"numberOfMessagesEmitted";
+ public static final String NO_OF_MESSAGES_FAILED =
"numberOfMessagesFailed";
+ public static final String MESSAGE_NOT_AVAILABLE_COUNT =
"messageNotAvailableCount";
+ public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
+ public static final String CONSUMER_RATE = "consumerRate";
+ public static final String CONSUMER_THROUGHPUT_BYTES =
"consumerThroughput";
+
+ private final ClientConfigurationData clientConf;
+ private final PulsarSpoutConfiguration pulsarSpoutConf;
+ private final ConsumerConfigurationData<byte[]> consumerConf;
+ private final long failedRetriesTimeoutNano;
+ private final int maxFailedRetries;
+ private final ConcurrentMap<MessageId, MessageRetries>
pendingMessageRetries = new ConcurrentHashMap<>();
+ private final Queue<Message<byte[]>> failedMessages = new
ConcurrentLinkedQueue<>();
+ private final ConcurrentMap<String, Object> metricsMap = new
ConcurrentHashMap<>();
+
+ private SharedPulsarClient sharedPulsarClient;
+ private String componentId;
+ private String spoutId;
+ private SpoutOutputCollector collector;
+ private PulsarSpoutConsumer consumer;
+ private volatile long messagesReceived = 0;
+ private volatile long messagesEmitted = 0;
+ private volatile long messagesFailed = 0;
+ private volatile long messageNotAvailableCount = 0;
+ private volatile long pendingAcks = 0;
+ private volatile long messageSizeReceived = 0;
+
+ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
+ this(pulsarSpoutConf, PulsarClient.builder());
+ }
+
+ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder
clientBuilder) {
+ this(pulsarSpoutConf, ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData().clone(),
+ new ConsumerConfigurationData<byte[]>());
+ }
+
+ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf,
ClientConfigurationData clientConfig,
+ ConsumerConfigurationData<byte[]> consumerConfig) {
+ Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+ Objects.requireNonNull(pulsarSpoutConf.getTopic());
+ Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+ Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
+
+ checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
+ checkNotNull(clientConfig, "client configuration can't be null");
+ checkNotNull(consumerConfig, "consumer configuration can't be null");
+ this.clientConf = clientConfig;
+ this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+ this.consumerConf = consumerConfig;
+ this.pulsarSpoutConf = pulsarSpoutConf;
+ this.failedRetriesTimeoutNano =
pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
+ this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
+ }
+
+ @Override
+ public void close() {
+ try {
+ LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId,
pulsarSpoutConf.getTopic());
+
+ if (pulsarSpoutConf.isAutoUnsubscribe()) {
+ try {
+ consumer.unsubscribe();
+ } catch (PulsarClientException e) {
+ LOG.error("[{}] Failed to unsubscribe {} on topic {}",
spoutId,
+ this.pulsarSpoutConf.getSubscriptionName(),
pulsarSpoutConf.getTopic(), e);
+ }
+ }
+
+ if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer !=
null) {
+ consumer.close();
+ }
+ if (sharedPulsarClient != null) {
+ sharedPulsarClient.close();
+ }
+ pendingMessageRetries.clear();
+ failedMessages.clear();
+ } catch (PulsarClientException e) {
+ LOG.error("[{}] Error closing Pulsar consumer for topic {}",
spoutId, pulsarSpoutConf.getTopic(), e);
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ if (msgId instanceof Message) {
+ Message<?> msg = (Message<?>) msgId;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Received ack for message {}", spoutId,
msg.getMessageId());
+ }
+ consumer.acknowledgeAsync(msg);
+ pendingMessageRetries.remove(msg.getMessageId());
+ // we should also remove message from failedMessages but it will be
+ // eventually removed while emitting next
+ // tuple
+ --pendingAcks;
+ }
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ if (msgId instanceof Message) {
+ @SuppressWarnings("unchecked")
+ Message<byte[]> msg = (Message<byte[]>) msgId;
+ MessageId id = msg.getMessageId();
+ LOG.warn("[{}] Error processing message {}", spoutId, id);
+
+ // Since the message processing failed, we put it in the failed
+ // messages queue if there are more retries
+ // remaining for the message
+ MessageRetries messageRetries =
pendingMessageRetries.computeIfAbsent(id, (k) -> new MessageRetries());
+ if ((failedRetriesTimeoutNano < 0
+ || (messageRetries.getTimeStamp() +
failedRetriesTimeoutNano) > System.nanoTime())
+ && (maxFailedRetries < 0 || messageRetries.numRetries <
maxFailedRetries)) {
+ // since we can retry again, we increment retry count and put
it
+ // in the queue
+ LOG.info("[{}] Putting message {} in the retry queue",
spoutId, id);
+ messageRetries.incrementAndGet();
+ pendingMessageRetries.putIfAbsent(id, messageRetries);
+ failedMessages.add(msg);
+ --pendingAcks;
+ messagesFailed++;
+ } else {
+ LOG.warn("[{}] Number of retries limit reached, dropping the
message {}", spoutId, id);
+ ack(msg);
+ }
+ }
+
+ }
+
+ /**
+ * Emits a tuple received from the Pulsar consumer unless there are any
+ * failed messages.
+ */
+ @Override
+ public void nextTuple() {
+ emitNextAvailableTuple();
+ }
+
+ /**
+ * Emits the next available non-null tuple to the topology unless the
+ * consumer queue doesn't have any message available. It receives a message
+ * from the consumer queue, converts it to a tuple and emits it to the
+ * topology. If the converted tuple is null, it tries to receive the next
+ * message and repeats the same until it finds a non-null tuple to emit.
+ */
+ public void emitNextAvailableTuple() {
+ // check if there are any failed messages to re-emit in the topology
+ if (emitFailedMessage()) {
+ return;
+ }
+
+ Message<byte[]> msg;
+ // receive from consumer if no failed messages
+ if (consumer != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Receiving the next message from pulsar
consumer to emit to the collector", spoutId);
+ }
+ try {
+ boolean done = false;
+ while (!done) {
+ msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+ if (msg != null) {
+ ++messagesReceived;
+ messageSizeReceived += msg.getData().length;
+ done = mapToValueAndEmit(msg);
+ } else {
+ // queue is empty and nothing to emit
+ done = true;
+ messageNotAvailableCount++;
+ }
+ }
+ } catch (PulsarClientException e) {
+ LOG.error("[{}] Error receiving message from pulsar consumer",
spoutId, e);
+ }
+ }
+ }
+
+ private boolean emitFailedMessage() {
+ Message<byte[]> msg;
+
+ while ((msg = failedMessages.peek()) != null) {
+ MessageRetries messageRetries =
pendingMessageRetries.get(msg.getMessageId());
+ if (messageRetries != null) {
+ // emit the tuple if retry doesn't need backoff else sleep with
+ // backoff time and return without doing
+ // anything
+ if (Backoff.shouldBackoff(messageRetries.getTimeStamp(),
TimeUnit.NANOSECONDS,
+ messageRetries.getNumRetries(),
clientConf.getInitialBackoffIntervalNanos(),
+ clientConf.getMaxBackoffIntervalNanos())) {
+
Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos()));
+ } else {
+ // remove the message from the queue and emit to the
+ // topology, only if it should not be backed off
+ LOG.info("[{}] Retrying failed message {}", spoutId,
msg.getMessageId());
+ failedMessages.remove();
+ mapToValueAndEmit(msg);
+ }
+ return true;
+ }
+
+ // messageRetries is null because the message has already been acked
+ // and removed from pendingMessageRetries,
+ // so remove it from the failed message queue as well.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}]-{} removing {} from failedMessage because it's
already acked",
+ pulsarSpoutConf.getTopic(), spoutId,
msg.getMessageId());
+ }
+ failedMessages.remove();
+ // try to find out next failed message
+ continue;
+ }
+ return false;
+ }
+
+ @Override
+ @SuppressWarnings({ "rawtypes" })
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector
collector) {
+ this.componentId = context.getThisComponentId();
+ this.spoutId = String.format("%s-%s", componentId,
context.getThisTaskId());
+ this.collector = collector;
+ pendingMessageRetries.clear();
+ failedMessages.clear();
+ try {
+ consumer = createConsumer();
+ LOG.info("[{}] Created a pulsar consumer on topic {} to receive
messages with subscription {}", spoutId,
+ pulsarSpoutConf.getTopic(),
pulsarSpoutConf.getSubscriptionName());
+ } catch (PulsarClientException e) {
+ LOG.error("[{}] Error creating pulsar consumer on topic {}",
spoutId, pulsarSpoutConf.getTopic(), e);
+ throw new IllegalStateException(format("Failed to initialize
consumer for %s-%s : %s",
+ pulsarSpoutConf.getTopic(),
pulsarSpoutConf.getSubscriptionName(), e.getMessage()), e);
+ }
+ context.registerMetric(String.format("PulsarSpoutMetrics-%s-%s",
componentId, context.getThisTaskIndex()), this,
+ pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+ }
+
+ private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
+ sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+ PulsarSpoutConsumer consumer;
+ if (pulsarSpoutConf.isSharedConsumerEnabled()) {
+ consumer = pulsarSpoutConf.isDurableSubscription()
+ ? new
SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
+ : new
SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
+ } else {
+ try {
+ consumer = pulsarSpoutConf.isDurableSubscription()
+ ? new SpoutConsumer(
+
sharedPulsarClient.getClient().subscribeAsync(newConsumerConfiguration()).join())
+ : new SpoutReader(
+
sharedPulsarClient.getClient().createReaderAsync(newReaderConfiguration()).join());
+ } catch (CompletionException e) {
+ throw (PulsarClientException) e.getCause();
+ }
+ }
+ return consumer;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
+
+ }
+
+ /**
+  * Maps a received message to Storm values and emits them, using the message itself as the tuple id.
+  * <p>
+  * The pending-ack counter is incremented for every non-null message once it has been mapped. A message the
+  * mapper turns into {@code null} is not emitted; it is acked right away instead.
+  * </p>
+  *
+  * @param msg
+  *            the message to emit, may be {@code null}
+  * @return {@code true} if a tuple was emitted, {@code false} if {@code msg} was {@code null} or was dropped
+  */
+ private boolean mapToValueAndEmit(Message<byte[]> msg) {
+     if (msg == null) {
+         return false;
+     }
+
+     final Values mapped = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
+     ++pendingAcks;
+
+     if (mapped == null) {
+         // since the mapper returned null, we can drop the message and
+         // ack it immediately
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("[{}] Dropping message {}", spoutId, msg.getMessageId());
+         }
+         ack(msg);
+         return false;
+     }
+
+     if (mapped instanceof PulsarTuple) {
+         // a PulsarTuple names the stream it wants to be emitted on
+         final String stream = ((PulsarTuple) mapped).getOutputStream();
+         collector.emit(stream, mapped, msg);
+     } else {
+         collector.emit(mapped, msg);
+     }
+     ++messagesEmitted;
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
+     }
+     return true;
+ }
+
+ /**
+  * Retry bookkeeping for a single message: the {@link System#nanoTime()} reading taken when the record was
+  * created and the number of retries counted so far.
+  */
+ public class MessageRetries {
+     private final long timestampInNano = System.nanoTime();
+     private int numRetries;
+
+     public MessageRetries() {
+         // both fields are initialised by their declarations
+     }
+
+     /**
+      * @return the {@link System#nanoTime()} value captured at construction
+      */
+     public long getTimeStamp() {
+         return timestampInNano;
+     }
+
+     /**
+      * Counts one more retry.
+      *
+      * @return the retry count after the increment
+      */
+     public int incrementAndGet() {
+         numRetries += 1;
+         return numRetries;
+     }
+
+     /**
+      * @return the number of retries counted so far
+      */
+     public int getNumRetries() {
+         return numRetries;
+     }
+ }
+
+ /**
+ * Helpers for metrics.
+ */
+
+ /**
+  * Copies the current counter values into the metrics map and returns that map.
+  * <p>
+  * The two rate entries are the received-message count and the received-byte count divided by the
+  * configured metrics interval (in seconds).
+  * </p>
+  *
+  * @return the metrics map held by this spout, refreshed with the current values
+  */
+ @SuppressWarnings({ "rawtypes" })
+ ConcurrentMap getMetrics() {
+     final int intervalSecs = pulsarSpoutConf.getMetricsTimeIntervalInSecs();
+
+     metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
+     metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
+     metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+     metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+     metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
+     metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
+     metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / intervalSecs);
+     metricsMap.put(CONSUMER_THROUGHPUT_BYTES, ((double) messageSizeReceived) / intervalSecs);
+     return metricsMap;
+ }
+
+ /**
+ * Zeroes the counters for received, emitted and failed messages, received bytes and
+ * "message not available" events. The pending-ack counter is not touched here.
+ */
+ void resetMetrics() {
+ messagesReceived = 0;
+ messagesEmitted = 0;
+ messageSizeReceived = 0;
+ messagesFailed = 0;
+ messageNotAvailableCount = 0;
+ }
+
+ /**
+  * Publishes the current metric values and then starts a new measurement interval.
+  * <p>
+  * The values are written into the metrics map first; only afterwards are the counters reset, so the
+  * returned map still carries the values of the interval that just ended.
+  * </p>
+  *
+  * @return the metrics map as returned by {@link #getMetrics()}
+  */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Object getValueAndReset() {
+     final ConcurrentMap reported = getMetrics();
+     resetMetrics();
+     return reported;
+ }
+
+ /**
+  * Builds a fresh reader configuration from the spout settings.
+  * <p>
+  * Topic, reader name (the configured subscription name) and start position always come from the spout
+  * configuration. If a consumer configuration is present, its crypto settings, read-compacted flag and
+  * receiver queue size are copied over as well.
+  * </p>
+  *
+  * @return a new reader configuration
+  */
+ private ReaderConfigurationData<byte[]> newReaderConfiguration() {
+     final ReaderConfigurationData<byte[]> conf = new ReaderConfigurationData<>();
+     conf.setTopicName(pulsarSpoutConf.getTopic());
+     conf.setReaderName(pulsarSpoutConf.getSubscriptionName());
+     conf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+
+     if (this.consumerConf == null) {
+         return conf;
+     }
+
+     conf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
+     conf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
+     conf.setReadCompacted(consumerConf.isReadCompacted());
+     conf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
+     return conf;
+ }
+
+ /**
+  * Returns the consumer configuration to subscribe with.
+  * <p>
+  * If this spout already holds a consumer configuration, that same instance is updated in place and
+  * returned; otherwise a new one is created. In both cases topic, subscription name and subscription type
+  * are taken from the spout configuration.
+  * </p>
+  *
+  * @return the consumer configuration for this spout
+  */
+ private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
+     final ConsumerConfigurationData<byte[]> conf;
+     if (this.consumerConf != null) {
+         conf = this.consumerConf;
+     } else {
+         conf = new ConsumerConfigurationData<>();
+     }
+     conf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
+     conf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+     conf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
+     return conf;
+ }
+
+ /**
+  * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Consumer}; every call is forwarded to it.
+  */
+ static class SpoutConsumer implements PulsarSpoutConsumer {
+     private Consumer<byte[]> consumer;
+
+     SpoutConsumer(Consumer<byte[]> consumer) {
+         this.consumer = consumer;
+     }
+
+     /** Forwards to {@code Consumer.receive(timeout, unit)}. */
+     @Override
+     public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+         return consumer.receive(timeout, unit);
+     }
+
+     /** Forwards the message to {@code Consumer.acknowledgeAsync}; the returned future is not used. */
+     @Override
+     public void acknowledgeAsync(Message<?> msg) {
+         consumer.acknowledgeAsync(msg);
+     }
+
+     /** Unsubscribes the wrapped consumer. */
+     @Override
+     public void unsubscribe() throws PulsarClientException {
+         consumer.unsubscribe();
+     }
+
+     /** Closes the wrapped consumer. */
+     @Override
+     public void close() throws PulsarClientException {
+         consumer.close();
+     }
+ }
+
+ /**
+  * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Reader}. Acknowledging and unsubscribing do
+  * nothing in this implementation.
+  */
+ static class SpoutReader implements PulsarSpoutConsumer {
+     private Reader<byte[]> reader;
+
+     SpoutReader(Reader<byte[]> reader) {
+         this.reader = reader;
+     }
+
+     /** Forwards to {@code Reader.readNext(timeout, unit)}. */
+     @Override
+     public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+         return reader.readNext(timeout, unit);
+     }
+
+     @Override
+     public void acknowledgeAsync(Message<?> msg) {
+         // No-op
+     }
+
+     @Override
+     public void unsubscribe() throws PulsarClientException {
+         // No-op
+     }
+
+     /**
+      * Closes the wrapped reader, rethrowing an {@link IOException} as a {@link PulsarClientException}.
+      */
+     @Override
+     public void close() throws PulsarClientException {
+         try {
+             reader.close();
+         } catch (IOException e) {
+             throw new PulsarClientException(e);
+         }
+     }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
new file mode 100644
index 0000000..db797ee
--- /dev/null
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+/**
+ * Configuration of a Pulsar spout: subscription name and type, the mapper that turns messages into tuples,
+ * the retry policy for failed messages, and whether a durable consumer or a non-durable reader is used.
+ * Service URL, topic and metrics interval are inherited from {@link PulsarStormConfiguration}.
+ */
+public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
+
+ /**
+ * Serialization version of this configuration class.
+ */
+ private static final long serialVersionUID = 1L;
+
+ public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO =
TimeUnit.SECONDS.toNanos(60);
+ public static final int DEFAULT_MAX_FAILED_RETRIES = -1;
+
+ private String subscriptionName = null;
+ private MessageToValuesMapper messageToValuesMapper = null;
+ private long failedRetriesTimeoutNano =
DEFAULT_FAILED_RETRIES_TIMEOUT_NANO;
+ private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES;
+ private boolean sharedConsumerEnabled = false;
+
+ private SubscriptionType subscriptionType = SubscriptionType.Shared;
+ private boolean autoUnsubscribe = false;
+ private boolean durableSubscription = true;
+ // read position if non-durable subscription is enabled : default oldest
message available in topic
+ private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
+
+
+ /**
+ * @return the subscription name for the consumer in the spout, or {@code null} if none has been set
+ */
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ /**
+ * Sets the subscription name for the consumer in the spout.
+ *
+ * @param subscriptionName
+ *            the subscription name to use
+ */
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+
+ /**
+ * @return the subscription type used by the consumer <i>(default: {@code SubscriptionType.Shared})</i>
+ */
+ public SubscriptionType getSubscriptionType() {
+ return subscriptionType;
+ }
+
+ /**
+ * Sets the subscription type used by the consumer.
+ *
+ * @param subscriptionType
+ *            the subscription type to use
+ */
+ public void setSubscriptionType(SubscriptionType subscriptionType) {
+ this.subscriptionType = subscriptionType;
+ }
+
+ /**
+ * @return the mapper to convert pulsar message to a storm tuple, or {@code null} if none has been set
+ */
+ public MessageToValuesMapper getMessageToValuesMapper() {
+ return messageToValuesMapper;
+ }
+
+ /**
+ * Sets the mapper to convert pulsar message to a storm tuple.
+ * <p>
+ * Note: If the mapper returns null, the message is not emitted to the
+ * collector and is acked immediately
+ * </p>
+ *
+ * @param mapper
+ *            the mapper to use, must not be {@code null}
+ * @throws NullPointerException
+ *             if {@code mapper} is {@code null}
+ */
+ public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
+ this.messageToValuesMapper = Objects.requireNonNull(mapper);
+ }
+
+ /**
+ * Returns the retry timeout, converted from the internally stored nanosecond value.
+ *
+ * @param unit
+ *            the unit to express the timeout in
+ * @return the timeout for retrying failed messages, in {@code unit}
+ */
+ public long getFailedRetriesTimeout(TimeUnit unit) {
+ return unit.convert(failedRetriesTimeoutNano, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * Sets the timeout within which the spout will re-inject failed messages
+ * with an exponential backoff <i>(default: 60 seconds)</i>.
+ * <p>
+ * Note: If set to 0, the message will not be retried when failed. If set to &lt; 0, the message will
+ * be retried forever till it is successfully processed or max message
+ * retry count is reached, whichever comes first.
+ * </p>
+ *
+ * @param failedRetriesTimeout
+ *            the timeout value, stored internally in nanoseconds
+ * @param unit
+ *            the unit of {@code failedRetriesTimeout}
+ */
+ public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit
unit) {
+ this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout);
+ }
+
+ /**
+ *
+ * @return the maximum number of times a failed message will be retried
+ */
+ public int getMaxFailedRetries() {
+ return maxFailedRetries;
+ }
+
+ /**
+ * Sets the maximum number of times the spout will re-inject failed
+ * messages with an exponential backoff <i>(default: -1)</i>.
+ * <p>
+ * Note: If set to 0, the message will not be retried when failed. If set to &lt; 0, the message
+ * will be retried forever till it is successfully processed or configured
+ * timeout expires, whichever comes first.
+ * </p>
+ *
+ * @param maxFailedRetries
+ *            the maximum number of retries
+ */
+ public void setMaxFailedRetries(int maxFailedRetries) {
+ this.maxFailedRetries = maxFailedRetries;
+ }
+
+ /**
+ *
+ * @return if the consumer is shared across different executors of a spout
+ */
+ public boolean isSharedConsumerEnabled() {
+ return sharedConsumerEnabled;
+ }
+
+ /**
+ * Sets whether the consumer will be shared across different executors of
+ * a spout. <i>(default: false)</i>
+ *
+ * @param sharedConsumerEnabled
+ *            {@code true} to share one consumer across the executors
+ */
+ public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
+ this.sharedConsumerEnabled = sharedConsumerEnabled;
+ }
+
+ /**
+ * @return whether auto-unsubscribe is enabled <i>(default: false)</i>
+ */
+ public boolean isAutoUnsubscribe() {
+ return autoUnsubscribe;
+ }
+
+ /**
+ * It unsubscribes the subscription when spout gets closed in the topology.
+ *
+ * @param autoUnsubscribe
+ *            {@code true} to enable auto-unsubscribe
+ */
+ public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+ this.autoUnsubscribe = autoUnsubscribe;
+ }
+
+ /**
+ * @return whether a durable subscription is used <i>(default: true)</i>
+ */
+ public boolean isDurableSubscription() {
+ return durableSubscription;
+ }
+
+ /**
+ * if subscription is not durable then it creates non-durable reader to
+ * start reading from the position configured with
+ * {@link #setNonDurableSubscriptionReadPosition(MessageId)} in
+ * topic.
+ *
+ * @param durableSubscription
+ *            {@code false} to read with a non-durable reader
+ */
+ public void setDurableSubscription(boolean durableSubscription) {
+ this.durableSubscription = durableSubscription;
+ }
+
+ /**
+ * @return the position a non-durable reader starts from <i>(default: {@code MessageId.earliest})</i>
+ */
+ public MessageId getNonDurableSubscriptionReadPosition() {
+ return nonDurableSubscriptionReadPosition;
+ }
+
+ /**
+ * Non-durable-subscription/Reader can be set to start reading from a
+ * specific position earliest/latest.
+ *
+ * @param nonDurableSubscriptionReadPosition
+ *            the message id to start reading from
+ */
+ public void setNonDurableSubscriptionReadPosition(MessageId
nonDurableSubscriptionReadPosition) {
+ this.nonDurableSubscriptionReadPosition =
nonDurableSubscriptionReadPosition;
+ }
+}
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
new file mode 100644
index 0000000..5502a62
--- /dev/null
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Minimal message-source abstraction used by the spout: receive a message, acknowledge it, unsubscribe
+ * and close.
+ */
+public interface PulsarSpoutConsumer {
+
+ /**
+ * Receives a single message.
+ *
+ * @param waitTime
+ *            how long to wait for a message
+ * @param unit
+ *            the unit of {@code waitTime}
+ * @return the received message
+ * @throws PulsarClientException
+ *             if receiving fails
+ */
+ Message<byte[]> receive(int waitTime, TimeUnit unit) throws
PulsarClientException;
+
+ /**
+ * Ack the message async.
+ *
+ * @param msg
+ *            the message to acknowledge
+ */
+ void acknowledgeAsync(Message<?> msg);
+
+ /**
+ * unsubscribe the consumer
+ *
+ * @throws PulsarClientException
+ *             if unsubscribing fails
+ */
+ void unsubscribe() throws PulsarClientException;
+
+ /**
+ * Close the consumer
+ *
+ * @throws PulsarClientException
+ *             if closing fails
+ */
+ void close() throws PulsarClientException;
+
+}
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
new file mode 100644
index 0000000..7082bf2
--- /dev/null
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+/**
+ * Base configuration shared by the Pulsar Storm components: the service URL, the topic and the interval
+ * at which metrics are generated.
+ */
+public class PulsarStormConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final int DEFAULT_METRICS_TIME_INTERVAL_IN_SECS = 60;
+
+    // reference fields start out as null until the matching setter is called
+    private String serviceUrl;
+    private String topic;
+    private int metricsTimeIntervalInSecs = DEFAULT_METRICS_TIME_INTERVAL_IN_SECS;
+
+    /**
+     * Get service url.
+     *
+     * @return the service URL to connect to from the client.
+     */
+    public String getServiceUrl() {
+        return this.serviceUrl;
+    }
+
+    /**
+     * Sets the service URL to connect to from the client.
+     *
+     * @param serviceUrl - service url
+     */
+    public void setServiceUrl(String serviceUrl) {
+        this.serviceUrl = serviceUrl;
+    }
+
+    /**
+     * Get topic.
+     *
+     * @return the topic name for the producer/consumer.
+     */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * Sets the topic name for the producer/consumer. It should be of the format
+     * {persistent|non-persistent}://{property}/{cluster}/{namespace}/{topic}.
+     *
+     * @param topic - topic name
+     */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Get metrics interval.
+     *
+     * @return the time interval in seconds for metrics generation.
+     */
+    public int getMetricsTimeIntervalInSecs() {
+        return this.metricsTimeIntervalInSecs;
+    }
+
+    /**
+     * Sets the time interval in seconds for metrics generation <i>(default: 60 seconds)</i>.
+     *
+     * @param metricsTimeIntervalInSecs - metrics interval in sec.
+     */
+    public void setMetricsTimeIntervalInSecs(int metricsTimeIntervalInSecs) {
+        this.metricsTimeIntervalInSecs = metricsTimeIntervalInSecs;
+    }
+
+}
\ No newline at end of file
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
new file mode 100644
index 0000000..b000827
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * A {@link Values} that also carries the name of the stream it should be emitted on.
+ * Meant to be returned by a MessageToValuesMapper.
+ */
+public class PulsarTuple extends Values {
+
+    protected final String outputStream;
+
+    /**
+     * @param outStream
+     *            the stream the tuple should be emitted on
+     * @param values
+     *            the tuple values, handed to the {@link Values} constructor
+     */
+    public PulsarTuple(String outStream, Object ... values) {
+        super(values);
+        this.outputStream = outStream;
+    }
+
+    /**
+     * Return stream the tuple should be emitted on.
+     *
+     * @return String
+     */
+    public String getOutputStream() {
+        return this.outputStream;
+    }
+}
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
new file mode 100644
index 0000000..ce7bfb9
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Pulsar client (and, lazily, one consumer, reader and producer) shared by every task of the same Storm
+ * component. Instances are kept in a static registry keyed by component id and are reference counted:
+ * each {@code get*} accessor takes a reference and {@link #close()} releases one.
+ */
+public class SharedPulsarClient {
+    private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class);
+    private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>();
+
+    private final String componentId;
+    private final PulsarClientImpl client;
+    private final AtomicInteger counter = new AtomicInteger();
+
+    // lazily created shared handles, guarded by synchronized (this)
+    private Consumer<byte[]> consumer;
+    private Reader<byte[]> reader;
+    private Producer<byte[]> producer;
+
+    private SharedPulsarClient(String componentId, ClientConfigurationData clientConf) throws PulsarClientException {
+        this.client = new PulsarClientImpl(clientConf);
+        this.componentId = componentId;
+    }
+
+    /**
+     * Provides a shared pulsar client that is shared across all different tasks
+     * in the same component. Different components will not share the pulsar
+     * client since they can have different configurations.
+     *
+     * @param componentId
+     *            - the id of the spout/bolt
+     * @param clientConf
+     *            - client config
+     * @return SharedPulsarClient
+     * @throws PulsarClientException
+     *             in case of an error
+     */
+    public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf)
+            throws PulsarClientException {
+        // the mapping function cannot throw a checked exception, so a failure is carried out through this holder
+        AtomicReference<PulsarClientException> failure = new AtomicReference<>();
+        SharedPulsarClient shared = instances.computeIfAbsent(componentId, id -> {
+            try {
+                SharedPulsarClient created = new SharedPulsarClient(id, clientConf);
+                LOG.info("[{}] Created a new Pulsar Client.", id);
+                return created;
+            } catch (PulsarClientException e) {
+                failure.set(e);
+                // returning null leaves the registry without a mapping for this component
+                return null;
+            }
+        });
+        if (failure.get() != null) {
+            throw failure.get();
+        }
+        // hand out the instance computeIfAbsent resolved rather than re-reading the map, which could
+        // observe a concurrent close() and yield null
+        return shared;
+    }
+
+    /**
+     * Takes a reference on this shared client and returns the underlying client.
+     *
+     * @return the underlying Pulsar client
+     */
+    public PulsarClientImpl getClient() {
+        counter.incrementAndGet();
+        return client;
+    }
+
+    /**
+     * Takes a reference and returns the shared consumer, subscribing with {@code consumerConf} on first use.
+     * Later calls return the consumer created by the first call.
+     *
+     * @param consumerConf
+     *            the configuration used if the consumer still has to be created
+     * @return the shared consumer
+     * @throws PulsarClientException
+     *             if the consumer cannot be created
+     */
+    public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf)
+            throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (consumer == null) {
+                try {
+                    consumer = client.subscribeAsync(consumerConf).join();
+                } catch (CompletionException e) {
+                    throw unwrap(e);
+                }
+                LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic());
+            } else {
+                LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic());
+            }
+            return consumer;
+        }
+    }
+
+    /**
+     * Takes a reference and returns the shared reader, creating it with {@code readerConf} on first use.
+     * Later calls return the reader created by the first call.
+     *
+     * @param readerConf
+     *            the configuration used if the reader still has to be created
+     * @return the shared reader
+     * @throws PulsarClientException
+     *             if the reader cannot be created
+     */
+    public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (reader == null) {
+                try {
+                    reader = client.createReaderAsync(readerConf).join();
+                } catch (CompletionException e) {
+                    throw unwrap(e);
+                }
+                LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
+            } else {
+                LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
+            }
+            return reader;
+        }
+    }
+
+    /**
+     * Takes a reference and returns the shared producer, creating it with {@code producerConf} on first use.
+     * Later calls return the producer created by the first call.
+     *
+     * @param producerConf
+     *            the configuration used if the producer still has to be created
+     * @return the shared producer
+     * @throws PulsarClientException
+     *             if the producer cannot be created
+     */
+    public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (producer == null) {
+                try {
+                    producer = client.createProducerAsync(producerConf).join();
+                } catch (CompletionException e) {
+                    throw unwrap(e);
+                }
+                LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName());
+            } else {
+                LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName());
+            }
+            return producer;
+        }
+    }
+
+    /**
+     * Releases one reference. When no references remain, the underlying client is closed and this instance
+     * is removed from the registry.
+     *
+     * @throws PulsarClientException
+     *             if closing the underlying client fails
+     */
+    public void close() throws PulsarClientException {
+        if (counter.decrementAndGet() <= 0) {
+            if (client != null) {
+                try {
+                    client.close();
+                } finally {
+                    // drop the registry entry even if close() fails, so a later get() does not hand out
+                    // this instance again; only remove the mapping if it still points at this instance
+                    instances.remove(componentId, this);
+                }
+                LOG.info("[{}] Closed Pulsar Client", componentId);
+            }
+        }
+    }
+
+    /**
+     * Extracts the failure behind a {@link CompletionException} as a {@link PulsarClientException},
+     * wrapping it when the cause is of another type instead of failing with a ClassCastException.
+     */
+    private static PulsarClientException unwrap(CompletionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof PulsarClientException) {
+            return (PulsarClientException) cause;
+        }
+        return new PulsarClientException(cause != null ? cause : e);
+    }
+
+}
\ No newline at end of file
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
new file mode 100644
index 0000000..452e0ce
--- /dev/null
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Converts Storm tuples into Pulsar messages. Implementations are expected to override
+ * {@link #toMessage(TypedMessageBuilder, Tuple)}; the deprecated single-argument overload exists for
+ * backward compatibility only.
+ */
+public interface TupleToMessageMapper extends Serializable {
+
+    /**
+     * Convert tuple to {@link org.apache.pulsar.client.api.Message}.
+     * <p>
+     * The default implementation returns {@code null}.
+     * </p>
+     *
+     * @param tuple
+     *            the tuple to convert
+     * @return the message built from the tuple
+     * @deprecated use {@link #toMessage(TypedMessageBuilder, Tuple)}
+     */
+    @Deprecated
+    default Message<byte[]> toMessage(Tuple tuple) {
+        return null;
+    }
+
+    /**
+     * Set the value on a message builder to prepare the message to be published from the Bolt.
+     * <p>
+     * The default implementation calls the deprecated {@link #toMessage(Tuple)} and copies the payload,
+     * the properties and, if present, the key of its result onto the builder.
+     * </p>
+     *
+     * @param msgBuilder
+     *            the builder to populate
+     * @param tuple
+     *            the tuple to convert
+     * @return {@code msgBuilder}, populated from the tuple
+     * @throws NullPointerException
+     *             if the default implementation is used and {@link #toMessage(Tuple)} returns {@code null}
+     */
+    default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+        // Default implementation provided for backward compatibility
+        Message<byte[]> msg = toMessage(tuple);
+        if (msg == null) {
+            // same exception type as dereferencing the null would raise, but with an actionable message
+            throw new NullPointerException("toMessage(Tuple) returned null; implement "
+                    + "toMessage(TypedMessageBuilder, Tuple) or return a message from toMessage(Tuple)");
+        }
+        msgBuilder.value(msg.getData())
+                .properties(msg.getProperties());
+        if (msg.hasKey()) {
+            msgBuilder.key(msg.getKey());
+        }
+        return msgBuilder;
+    }
+
+
+    /**
+     * Declare the output schema for the bolt.
+     *
+     * @param declarer
+     *            the declarer to register the output fields with
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
diff --git a/pulsar-storm/src/main/javadoc/overview.html
b/pulsar-storm/src/main/javadoc/overview.html
new file mode 100644
index 0000000..a1595eb
--- /dev/null
+++ b/pulsar-storm/src/main/javadoc/overview.html
@@ -0,0 +1,29 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
+<HTML>
+ <HEAD>
+ <TITLE>Pulsar Storm API Overview</TITLE>
+ </HEAD>
+ <BODY>
+ The Pulsar Storm API is a proprietary messaging API.
+ </BODY>
+</HTML>
diff --git
a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
new file mode 100644
index 0000000..e6cbc51
--- /dev/null
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Unit tests for {@link PulsarSpout} that run against Mockito mocks instead of a Pulsar broker.
+ */
+public class PulsarSpoutTest {
+
+ /**
+ * Fails and then acks the same message on a spout whose mapper drops every message, and checks that
+ * a following {@code emitNextAvailableTuple()} still asks the consumer for a message.
+ */
+ @Test
+ public void testAckFailedMessage() throws Exception {
+
+ PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+ conf.setServiceUrl("http://localhost:8080");
+ conf.setSubscriptionName("sub1");
+ conf.setTopic("persistent://prop/ns1/topic1");
+ conf.setSubscriptionType(SubscriptionType.Exclusive);
+ // mapper that maps every message to null
+ conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+ @Override
+ public Values toValues(Message<byte[]> msg) {
+ return null;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ });
+
+ ClientBuilder builder = spy(new ClientBuilderImpl());
+ PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1",
Maps.newHashMap(),
+ new byte[0], Schema.BYTES, new MessageMetadata());
+ Consumer<byte[]> consumer = mock(Consumer.class);
+ SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.complete(null);
+ // NOTE(review): this stubs the MessageId overload of acknowledgeAsync - confirm that is the
+ // overload SpoutConsumer actually calls
+ doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
+ // open() is never called in this test, so the consumer is injected through reflection
+ Field consField = PulsarSpout.class.getDeclaredField("consumer");
+ consField.setAccessible(true);
+ consField.set(spout, spoutConsumer);
+
+ spout.fail(msg);
+ spout.ack(msg);
+ spout.emitNextAvailableTuple();
+ verify(consumer, atLeast(1)).receive(anyInt(), any());
+ }
+
+ /**
+ * Runs the emit scenario with a payload that the mapper turns into a {@link PulsarTuple}.
+ */
+ @Test
+ public void testPulsarTuple() throws Exception {
+ testPulsarSpout(true);
+ }
+
+ /**
+ * Runs the emit scenario with a payload that the mapper turns into plain {@link Values}.
+ */
+ @Test
+ public void testPulsarSpout() throws Exception {
+ testPulsarSpout(false);
+ }
+
+ /**
+ * Shared scenario: opens a spout in shared-consumer mode against a mocked {@link SharedPulsarClient},
+ * lets it receive one message and verifies what is emitted to the collector.
+ *
+ * @param pulsarTuple
+ *            if {@code true} the payload is "stream:pstream" and the tuple is expected on stream
+ *            "pstream"; otherwise the payload is "test" and the tuple is expected on the default
+ *            emit call
+ */
+ public void testPulsarSpout(boolean pulsarTuple) throws Exception {
+ PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+ conf.setServiceUrl("http://localhost:8080");
+ conf.setSubscriptionName("sub1");
+ conf.setTopic("persistent://prop/ns1/topic1");
+ conf.setSubscriptionType(SubscriptionType.Exclusive);
+ conf.setSharedConsumerEnabled(true);
+ AtomicBoolean called = new AtomicBoolean(false);
+ // mapper: drops one specific payload, routes "stream:<name>" payloads to stream <name>,
+ // and maps everything else to a single-value tuple
+ conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+ @Override
+ public Values toValues(Message<byte[]> msg) {
+ called.set(true);
+ if ("message to be dropped".equals(new String(msg.getData())))
{
+ return null;
+ }
+ String val = new String(msg.getData());
+ if (val.startsWith("stream:")) {
+ String stream = val.split(":")[1];
+ return new PulsarTuple(stream, val);
+ }
+ return new Values(val);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ });
+
+ String msgContent = pulsarTuple ? "stream:pstream" : "test";
+
+ ClientBuilder builder = spy(new ClientBuilderImpl());
+ PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+ TopologyContext context = mock(TopologyContext.class);
+ final String componentId = "test-component-id";
+ doReturn(componentId).when(context).getThisComponentId();
+ SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ Map config = new HashMap<>();
+ // register a mocked shared client for this component id in SharedPulsarClient's static registry
+ // NOTE(review): the entry is never removed again - confirm it cannot leak into other tests
+ Field field = SharedPulsarClient.class.getDeclaredField("instances");
+ field.setAccessible(true);
+ ConcurrentMap<String, SharedPulsarClient> instances =
(ConcurrentMap<String, SharedPulsarClient>) field
+ .get(SharedPulsarClient.class);
+
+ SharedPulsarClient client = mock(SharedPulsarClient.class);
+ Consumer<byte[]> consumer = mock(Consumer.class);
+ when(client.getSharedConsumer(any())).thenReturn(consumer);
+ instances.put(componentId, client);
+
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1",
Maps.newHashMap(),
+ msgContent.getBytes(), Schema.BYTES, new MessageMetadata());
+ when(consumer.receive(anyInt(), any())).thenReturn(msg);
+
+ spout.open(config, context, collector);
+ spout.emitNextAvailableTuple();
+
+ assertTrue(called.get());
+ verify(consumer, atLeast(1)).receive(anyInt(), any());
+ ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
+ if (pulsarTuple) {
+ verify(collector, times(1)).emit(eq("pstream"), capt.capture(),
eq(msg));
+ } else {
+ verify(collector, times(1)).emit(capt.capture(), eq(msg));
+ }
+ Values vals = capt.getValue();
+ assertEquals(msgContent, vals.get(0));
+ }
+
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index 0844bae..18ccb15 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -33,6 +33,7 @@
<name>Apache Pulsar Adapters :: Tests</name>
<modules>
<module>pulsar-kafka-compat-client-test</module>
+ <module>pulsar-storm-test</module>
<module>pulsar-spark-test</module>
</modules>
<build>
diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml
new file mode 100644
index 0000000..3134328
--- /dev/null
+++ b/tests/pulsar-storm-test/pom.xml
@@ -0,0 +1,131 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>adapters-tests-parent</artifactId>
+ <version>2.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-storm-test</artifactId>
+ <packaging>jar</packaging>
+ <name>Pulsar Storm adapter Tests</name>
+
+ <dependencies>
+
+ <!-- Adapter under test. The version follows this module's (inherited) version so the two
+ cannot drift apart on a version bump. pulsar-client is excluded from its transitive
+ dependencies. -->
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-storm</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-server</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>buildtools</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>testmocks</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.asynchttpclient</groupId>
+ <artifactId>async-http-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
new file mode 100644
index 0000000..4355ad6
--- /dev/null
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
+
+public class MockOutputCollector implements IOutputCollector {
+
+ private boolean acked = false;
+ private boolean failed = false;
+ private Throwable lastError = null;
+ private Tuple ackedTuple = null;
+ private int numTuplesAcked = 0;
+
+ @Override
+ public void reportError(Throwable error) {
+ lastError = error;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
+ return null;
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, Collection<Tuple>
anchors, List<Object> tuple) {
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ acked = true;
+ failed = false;
+ ackedTuple = input;
+ ++numTuplesAcked;
+ }
+
+ @Override
+ public void fail(Tuple input) {
+ failed = true;
+ acked = false;
+ }
+
+ @Override
+ public void resetTimeout(Tuple tuple) {
+
+ }
+
+ public boolean acked() {
+ return acked;
+ }
+
+ public boolean failed() {
+ return failed;
+ }
+
+ public Throwable getLastError() {
+ return lastError;
+ }
+
+ public Tuple getAckedTuple() {
+ return ackedTuple;
+ }
+
+ public int getNumTuplesAcked() {
+ return numTuplesAcked;
+ }
+
+ public void reset() {
+ acked = false;
+ failed = false;
+ lastError = null;
+ ackedTuple = null;
+ numTuplesAcked = 0;
+ }
+
+ @Override
+ public void flush() {
+ // Nothing to flush from buffer
+ }
+
+}
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
new file mode 100644
index 0000000..98c8d20
--- /dev/null
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.storm.spout.ISpoutOutputCollector;
+
+/**
+ * Minimal {@link ISpoutOutputCollector} used by the spout tests. It remembers whether anything
+ * was emitted, the first field of the last emitted tuple (cast to {@code String}) and the
+ * message id of that emit (cast to {@link Message}).
+ */
+public class MockSpoutOutputCollector implements ISpoutOutputCollector {
+
+    private boolean emitted;
+    private Message lastMessage;
+    private String data;
+
+    /** Records the emit and returns a new empty task-id list. */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        capture(tuple, messageId);
+        return new ArrayList<>();
+    }
+
+    /** Records the emit; the target task id and stream id are ignored. */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+        capture(tuple, messageId);
+    }
+
+    /** Shared bookkeeping for both emit variants. */
+    private void capture(List<Object> tuple, Object messageId) {
+        this.emitted = true;
+        this.data = (String) tuple.get(0);
+        this.lastMessage = (Message) messageId;
+    }
+
+    /** Always reports zero pending tuples. */
+    @Override
+    public long getPendingCount() {
+        return 0;
+    }
+
+    /** Errors are ignored. */
+    @Override
+    public void reportError(Throwable error) {
+    }
+
+    /** @return {@code true} once an emit has been recorded since construction or the last reset */
+    public boolean emitted() {
+        return this.emitted;
+    }
+
+    /** @return first field of the last emitted tuple, or {@code null} if nothing was recorded */
+    public String getTupleData() {
+        return this.data;
+    }
+
+    /** @return message id of the last emit, or {@code null} if nothing was recorded */
+    public Message getLastMessage() {
+        return this.lastMessage;
+    }
+
+    /** Forgets everything recorded so far. */
+    public void reset() {
+        this.lastMessage = null;
+        this.data = null;
+        this.emitted = false;
+    }
+
+    @Override
+    public void flush() {
+        // Nothing to flush from buffer
+    }
+}
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
new file mode 100644
index 0000000..b90e855
--- /dev/null
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.fail;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+public class PulsarBoltTest extends ProducerConsumerBase {
+
+ private static final int NO_OF_RETRIES = 10;
+
+ public String serviceUrl;
+ public final String topic = "persistent://my-property/my-ns/my-topic1";
+ public final String subscriptionName = "my-subscriber-name";
+
+ protected PulsarBoltConfiguration pulsarBoltConf;
+ protected PulsarBolt bolt;
+ protected MockOutputCollector mockCollector;
+ protected Consumer consumer;
+
+ /**
+ * Per-test hook: delegates to the base class hook, then builds the fixture via {@link #setup()}.
+ */
+ @Override
+ @BeforeMethod
+ public void beforeMethod(Method m) throws Exception {
+ super.beforeMethod(m);
+ setup();
+ }
+
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+
+ serviceUrl = pulsar.getWebServiceAddress();
+
+ pulsarBoltConf = new PulsarBoltConfiguration();
+ pulsarBoltConf.setServiceUrl(serviceUrl);
+ pulsarBoltConf.setTopic(topic);
+ pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
+ pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
+ bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
+ mockCollector = new MockOutputCollector();
+ OutputCollector collector = new OutputCollector(mockCollector);
+ TopologyContext context = mock(TopologyContext.class);
+ when(context.getThisComponentId()).thenReturn("test-bolt-" +
methodName);
+ when(context.getThisTaskId()).thenReturn(0);
+ bolt.prepare(Maps.newHashMap(), context, collector);
+ consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
+ }
+
+ /**
+ * Tears down the fixture. Each step runs in a {@code finally} so that a failure while closing
+ * the bolt or the consumer cannot skip the remaining teardown.
+ */
+ @AfterMethod
+ public void cleanup() throws Exception {
+ try {
+ if (bolt != null) {
+ bolt.close();
+ }
+ } finally {
+ try {
+ if (consumer != null) {
+ consumer.close();
+ }
+ } finally {
+ super.internalCleanup();
+ }
+ }
+ }
+
+ /**
+ * Mapper used by the tests: copies the tuple's first binary field into the message value,
+ * returns {@code null} for the payload "message to be dropped" and throws a
+ * {@link RuntimeException} for the payload "throw exception".
+ */
+ @SuppressWarnings("serial")
+ static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
+
+ @Override
+ public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+ final String text = new String(tuple.getBinary(0));
+ if ("message to be dropped".equals(text)) {
+ return null;
+ }
+ if ("throw exception".equals(text)) {
+ throw new RuntimeException();
+ }
+ return msgBuilder.value(tuple.getBinary(0));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // no output fields declared
+ }
+ };
+
+ private Tuple getMockTuple(String msgContent) {
+ Tuple mockTuple = mock(Tuple.class);
+ when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes());
+ when(mockTuple.getSourceComponent()).thenReturn("");
+ when(mockTuple.getSourceStreamId()).thenReturn("");
+ return mockTuple;
+ }
+
+ /**
+ * Executes one tuple and expects it to be acked and its payload to arrive on the topic.
+ */
+ @Test
+ public void testBasic() throws Exception {
+ String msgContent = "hello world";
+ Tuple tuple = getMockTuple(msgContent);
+ bolt.execute(tuple);
+ // Check before sleeping so a quick ack does not cost a full second.
+ for (int i = 0; i < NO_OF_RETRIES && !mockCollector.acked(); i++) {
+ Thread.sleep(1000);
+ }
+ Assert.assertTrue(mockCollector.acked());
+ Assert.assertFalse(mockCollector.failed());
+ Assert.assertNull(mockCollector.getLastError());
+ // TestNG's assertEquals takes (actual, expected).
+ Assert.assertEquals(mockCollector.getAckedTuple(), tuple);
+ Message msg = consumer.receive(5, TimeUnit.SECONDS);
+ // receive() with a timeout may return null; fail here instead of with an NPE below.
+ Assert.assertNotNull(msg, "no message received within 5 seconds");
+ consumer.acknowledge(msg);
+ Assert.assertEquals(new String(msg.getData()), msgContent);
+ }
+
+ /**
+ * A mapper that throws must leave the tuple failed (not acked) with an error reported.
+ */
+ @Test
+ public void testExecuteFailure() throws Exception {
+ final Tuple poisonTuple = getMockTuple("throw exception");
+
+ bolt.execute(poisonTuple);
+
+ Assert.assertFalse(mockCollector.acked());
+ Assert.assertTrue(mockCollector.failed());
+ Assert.assertNotNull(mockCollector.getLastError());
+ }
+
+ /**
+ * When the mapper returns {@code null} the tuple is acked but nothing arrives on the topic.
+ */
+ @Test
+ public void testNoMessageSend() throws Exception {
+ final Tuple droppedTuple = getMockTuple("message to be dropped");
+
+ bolt.execute(droppedTuple);
+
+ Assert.assertTrue(mockCollector.acked());
+ final Message received = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(received);
+ }
+
+ /**
+ * Sends a batch of tuples and checks the bolt's message count, rate and throughput metrics,
+ * then checks that the sent counter reads zero after {@code getValueAndReset()}.
+ */
+ @Test
+ public void testMetrics() throws Exception {
+ final int numMessages = 10;
+ bolt.resetMetrics();
+ String msgContent = "hello world";
+ Tuple tuple = getMockTuple(msgContent);
+ for (int i = 0; i < numMessages; i++) {
+ bolt.execute(tuple);
+ }
+ // Check before sleeping so quick acks do not cost a full second.
+ for (int i = 0; i < NO_OF_RETRIES && mockCollector.getNumTuplesAcked() != numMessages; i++) {
+ Thread.sleep(1000);
+ }
+ // Fail with a clear message if the acks never arrived instead of on a metrics mismatch.
+ Assert.assertEquals(mockCollector.getNumTuplesAcked(), numMessages);
+ @SuppressWarnings("rawtypes")
+ Map metrics = (Map) bolt.getValueAndReset();
+ Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), numMessages);
+ Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(),
+ ((double) numMessages) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
+ Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(),
+ ((double) msgContent.getBytes().length * numMessages) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
+ metrics = bolt.getMetrics();
+ Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0);
+ for (int i = 0; i < numMessages; i++) {
+ Message msg = consumer.receive(5, TimeUnit.SECONDS);
+ // receive() with a timeout may return null; avoid an NPE in acknowledge().
+ Assert.assertNotNull(msg, "message " + i + " not received within 5 seconds");
+ consumer.acknowledge(msg);
+ }
+ }
+
+ /**
+ * A second bolt prepared with the same component id must not add a publisher to the topic,
+ * and closing it must leave the publisher count at one.
+ */
+ @Test
+ public void testSharedProducer() throws Exception {
+ TopicStats stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getPublishers().size(), 1);
+
+ final PulsarBolt secondBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
+ final MockOutputCollector secondMockCollector = new MockOutputCollector();
+ final OutputCollector secondCollector = new OutputCollector(secondMockCollector);
+ final TopologyContext secondContext = mock(TopologyContext.class);
+ when(secondContext.getThisComponentId()).thenReturn("test-bolt-" + methodName);
+ when(secondContext.getThisTaskId()).thenReturn(1);
+ secondBolt.prepare(Maps.newHashMap(), secondContext, secondCollector);
+
+ stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getPublishers().size(), 1);
+
+ secondBolt.close();
+
+ stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.getPublishers().size(), 1);
+ }
+
+ /**
+ * A bolt built from the default test configuration (no auth) must be Java-serializable.
+ */
+ @Test
+ public void testSerializability() throws Exception {
+ // test serializability with no auth
+ TestUtil.testSerializability(new PulsarBolt(pulsarBoltConf, PulsarClient.builder()));
+ }
+
+ /**
+ * Preparing a bolt against the topic "persistent://invalid" must throw
+ * {@link IllegalStateException}.
+ */
+ @Test
+ public void testFailedProducer() {
+ final PulsarBoltConfiguration invalidConf = new PulsarBoltConfiguration();
+ invalidConf.setServiceUrl(serviceUrl);
+ invalidConf.setTopic("persistent://invalid");
+ invalidConf.setTupleToMessageMapper(tupleToMessageMapper);
+ invalidConf.setMetricsTimeIntervalInSecs(60);
+
+ final PulsarBolt failingBolt = new PulsarBolt(invalidConf, PulsarClient.builder());
+ final MockOutputCollector localMockCollector = new MockOutputCollector();
+ final OutputCollector stormCollector = new OutputCollector(localMockCollector);
+ final TopologyContext topologyContext = mock(TopologyContext.class);
+ when(topologyContext.getThisComponentId()).thenReturn("new" + methodName);
+ when(topologyContext.getThisTaskId()).thenReturn(0);
+
+ try {
+ failingBolt.prepare(Maps.newHashMap(), topologyContext, stormCollector);
+ fail("should have failed as producer creation failed");
+ } catch (IllegalStateException expected) {
+ // Ok.
+ }
+ }
+}
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
new file mode 100644
index 0000000..322e41b
--- /dev/null
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+public class PulsarSpoutTest extends ProducerConsumerBase {
+
+ public String serviceUrl;
+ public final String topic = "persistent://my-property/my-ns/my-topic1";
+ public final String subscriptionName = "my-subscriber-name";
+
+ protected PulsarSpoutConfiguration pulsarSpoutConf;
+ protected PulsarSpout spout;
+ protected MockSpoutOutputCollector mockCollector;
+ protected Producer producer;
+
+ /**
+ * Per-test hook: delegates to the base class hook, then builds the fixture via {@link #setup()}.
+ */
+ @Override
+ @BeforeMethod
+ public void beforeMethod(Method m) throws Exception {
+ super.beforeMethod(m);
+ setup();
+ }
+
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+
+ serviceUrl = pulsar.getWebServiceAddress();
+
+ pulsarSpoutConf = new PulsarSpoutConfiguration();
+ pulsarSpoutConf.setServiceUrl(serviceUrl);
+ pulsarSpoutConf.setTopic(topic);
+ pulsarSpoutConf.setSubscriptionName(subscriptionName);
+ pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
+ pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
+ pulsarSpoutConf.setMaxFailedRetries(2);
+ pulsarSpoutConf.setSharedConsumerEnabled(true);
+ pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
+ pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
+ spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+ mockCollector = new MockSpoutOutputCollector();
+ SpoutOutputCollector collector = new
SpoutOutputCollector(mockCollector);
+ TopologyContext context = mock(TopologyContext.class);
+ when(context.getThisComponentId()).thenReturn("test-spout-" +
methodName);
+ when(context.getThisTaskId()).thenReturn(0);
+ spout.open(Maps.newHashMap(), context, collector);
+ producer = pulsarClient.newProducer().topic(topic).create();
+ }
+
+ /**
+ * Tears down the fixture. Each step runs in a {@code finally} so that a failure while closing
+ * the producer or the spout cannot skip the remaining teardown.
+ */
+ @AfterMethod
+ public void cleanup() throws Exception {
+ try {
+ if (producer != null) {
+ producer.close();
+ }
+ } finally {
+ try {
+ if (spout != null) {
+ spout.close();
+ }
+ } finally {
+ super.internalCleanup();
+ }
+ }
+ }
+
+ /**
+ * Mapper used by the tests: wraps the message payload (as a {@code String}) in a single-field
+ * {@link Values}, or returns {@code null} for the payload "message to be dropped".
+ */
+ @SuppressWarnings("serial")
+ public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
+
+ @Override
+ public Values toValues(Message msg) {
+ final String text = new String(msg.getData());
+ return "message to be dropped".equals(text) ? null : new Values(new String(msg.getData()));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // no output fields declared
+ }
+ };
+
+ /**
+ * A published message is emitted by the next {@code nextTuple()} call with its payload intact.
+ */
+ @Test
+ public void testBasic() throws Exception {
+ final String payloadText = "hello world";
+ producer.send(payloadText.getBytes());
+
+ spout.nextTuple();
+
+ assertTrue(mockCollector.emitted());
+ assertEquals(mockCollector.getTupleData(), payloadText);
+ spout.ack(mockCollector.getLastMessage());
+ }
+
+ /**
+ * A failed message is emitted again on a later {@code nextTuple()} call (after a 150 ms wait).
+ */
+ @Test
+ public void testRedeliverOnFail() throws Exception {
+ final String payloadText = "hello world";
+ producer.send(payloadText.getBytes());
+
+ // First delivery: fail it.
+ spout.nextTuple();
+ spout.fail(mockCollector.getLastMessage());
+ mockCollector.reset();
+
+ Thread.sleep(150);
+
+ // Second delivery: same payload again.
+ spout.nextTuple();
+ assertTrue(mockCollector.emitted());
+ assertEquals(mockCollector.getTupleData(), payloadText);
+ spout.ack(mockCollector.getLastMessage());
+ }
+
+ /**
+ * An acked message is not emitted again by the following {@code nextTuple()} call.
+ */
+ @Test
+ public void testNoRedeliverOnAck() throws Exception {
+ producer.send("hello world".getBytes());
+
+ spout.nextTuple();
+ spout.ack(mockCollector.getLastMessage());
+ mockCollector.reset();
+
+ spout.nextTuple();
+ assertFalse(mockCollector.emitted());
+ assertNull(mockCollector.getTupleData());
+ }
+
+ /**
+ * Keeps failing the same message until the configured failed-retries timeout has elapsed,
+ * then verifies that after one more fail the message is no longer emitted.
+ */
+ @Test
+ public void testLimitedRedeliveriesOnTimeout() throws Exception {
+ final String payloadText = "chuck norris";
+ producer.send(payloadText.getBytes());
+
+ final long deadlineMillis = System.currentTimeMillis()
+ + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS);
+ while (System.currentTimeMillis() < deadlineMillis) {
+ mockCollector.reset();
+ spout.nextTuple();
+ assertTrue(mockCollector.emitted());
+ assertEquals(mockCollector.getTupleData(), payloadText);
+ spout.fail(mockCollector.getLastMessage());
+ // wait to avoid backoff
+ Thread.sleep(500);
+ }
+
+ // One more delivery/fail after the timeout window has passed.
+ spout.nextTuple();
+ spout.fail(mockCollector.getLastMessage());
+ mockCollector.reset();
+ Thread.sleep(500);
+
+ spout.nextTuple();
+ assertFalse(mockCollector.emitted());
+ assertNull(mockCollector.getTupleData());
+ }
+
+ /**
+ * Fails the same message three times (waiting 150, 300 and 500 ms after the respective
+ * failures) and verifies that it is not emitted a fourth time.
+ */
+ @Test
+ public void testLimitedRedeliveriesOnCount() throws Exception {
+ final String payloadText = "hello world";
+ producer.send(payloadText.getBytes());
+
+ // Pause after each failure, in milliseconds.
+ final long[] pausesMillis = { 150, 300, 500 };
+ for (long pause : pausesMillis) {
+ spout.nextTuple();
+ assertTrue(mockCollector.emitted());
+ assertEquals(mockCollector.getTupleData(), payloadText);
+ spout.fail(mockCollector.getLastMessage());
+
+ mockCollector.reset();
+ Thread.sleep(pause);
+ }
+
+ spout.nextTuple();
+ assertFalse(mockCollector.emitted());
+ assertNull(mockCollector.getTupleData());
+ }
+
+ /**
+ * A failed message is not re-emitted immediately, but is re-emitted after a 100 ms wait.
+ */
+ @Test
+ public void testBackoffOnRetry() throws Exception {
+ final String payloadText = "chuck norris";
+ producer.send(payloadText.getBytes());
+
+ spout.nextTuple();
+ spout.fail(mockCollector.getLastMessage());
+ mockCollector.reset();
+
+ // due to backoff we should not get the message again immediately
+ spout.nextTuple();
+ assertFalse(mockCollector.emitted());
+ assertNull(mockCollector.getTupleData());
+
+ Thread.sleep(100);
+
+ spout.nextTuple();
+ assertTrue(mockCollector.emitted());
+ assertEquals(mockCollector.getTupleData(), payloadText);
+ spout.ack(mockCollector.getLastMessage());
+ }
+
+ /**
+ * A message the mapper turns into {@code null} is not emitted.
+ */
+ @Test
+ public void testMessageDrop() throws Exception {
+ producer.send("message to be dropped".getBytes());
+
+ spout.nextTuple();
+
+ assertFalse(mockCollector.emitted());
+ assertNull(mockCollector.getTupleData());
+ }
+
+ /**
+ * Follows one message through emit, fail, re-emit and ack, checking the spout's counters after
+ * each step and the consumer rate/throughput after the first emit.
+ */
+ @SuppressWarnings({ "rawtypes" })
+ @Test
+ public void testMetrics() throws Exception {
+ spout.resetMetrics();
+ final String payloadText = "hello world";
+ producer.send(payloadText.getBytes());
+
+ // First emit.
+ spout.nextTuple();
+ Map snapshot = spout.getMetrics();
+ assertCounters(snapshot, 1, 1, 0, 1);
+ assertEquals(((Double) snapshot.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
+ 1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+ assertEquals(((Double) snapshot.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
+ ((double) payloadText.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
+
+ // Fail it.
+ spout.fail(mockCollector.getLastMessage());
+ snapshot = spout.getMetrics();
+ assertCounters(snapshot, 1, 1, 1, 0);
+
+ // Re-emit after a 150 ms wait.
+ Thread.sleep(150);
+ spout.nextTuple();
+ snapshot = spout.getMetrics();
+ assertCounters(snapshot, 1, 2, 1, 1);
+
+ // Ack it.
+ spout.ack(mockCollector.getLastMessage());
+ snapshot = (Map) spout.getValueAndReset();
+ assertCounters(snapshot, 1, 2, 0, 0);
+ }
+
+ /** Asserts the four message counters of a metrics snapshot, in the order received, emitted, pending-failed, pending-acks. */
+ @SuppressWarnings({ "rawtypes" })
+ private static void assertCounters(Map metrics, long received, long emitted, long pendingFailed,
+ long pendingAcks) {
+ assertEquals(counter(metrics, PulsarSpout.NO_OF_MESSAGES_RECEIVED), received);
+ assertEquals(counter(metrics, PulsarSpout.NO_OF_MESSAGES_EMITTED), emitted);
+ assertEquals(counter(metrics, PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES), pendingFailed);
+ assertEquals(counter(metrics, PulsarSpout.NO_OF_PENDING_ACKS), pendingAcks);
+ }
+
+ /** Reads a {@code Long} metric from the snapshot as a primitive. */
+ @SuppressWarnings({ "rawtypes" })
+ private static long counter(Map metrics, Object key) {
+ return ((Long) metrics.get(key)).longValue();
+ }
+
+ /**
+ * With shared consumers enabled, opening a second spout with the same component id must not
+ * add a consumer to the subscription, and closing it must leave the consumer count at one.
+ */
+ @Test
+ public void testSharedConsumer() throws Exception {
+ TopicStats stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+
+ final PulsarSpout secondSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+ final MockSpoutOutputCollector secondMockCollector = new MockSpoutOutputCollector();
+ final SpoutOutputCollector secondCollector = new SpoutOutputCollector(secondMockCollector);
+ final TopologyContext secondContext = mock(TopologyContext.class);
+ when(secondContext.getThisComponentId()).thenReturn("test-spout-" + methodName);
+ when(secondContext.getThisTaskId()).thenReturn(1);
+ secondSpout.open(Maps.newHashMap(), secondContext, secondCollector);
+
+ stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+
+ secondSpout.close();
+
+ stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+ }
+
+ /**
+ * With shared consumers disabled, a second spout adds its own consumer to the subscription
+ * (count goes 1 -> 2) and removes it again on close (count back to 1).
+ */
+ @Test
+ public void testNoSharedConsumer() throws Exception {
+ TopicStats stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+
+ pulsarSpoutConf.setSharedConsumerEnabled(false);
+ final PulsarSpout secondSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
+ final MockSpoutOutputCollector secondMockCollector = new MockSpoutOutputCollector();
+ final SpoutOutputCollector secondCollector = new SpoutOutputCollector(secondMockCollector);
+ final TopologyContext secondContext = mock(TopologyContext.class);
+ when(secondContext.getThisComponentId()).thenReturn("test-spout-" + methodName);
+ when(secondContext.getThisTaskId()).thenReturn(1);
+ secondSpout.open(Maps.newHashMap(), secondContext, secondCollector);
+
+ stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get(subscriptionName).getConsumers().size(), 2);
+
+ secondSpout.close();
+
+ stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1);
+ }
+
+ /**
+ * A spout built from the default test configuration (no auth) must be Java-serializable.
+ */
+ @Test
+ public void testSerializability() throws Exception {
+ // test serializability with no auth
+ TestUtil.testSerializability(new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()));
+ }
+
+ /**
+ * Opening a spout against the topic "persistent://invalidTopic" must throw
+ * {@link IllegalStateException}.
+ */
+ @Test
+ public void testFailedConsumer() {
+ final PulsarSpoutConfiguration invalidConf = new PulsarSpoutConfiguration();
+ invalidConf.setServiceUrl(serviceUrl);
+ invalidConf.setTopic("persistent://invalidTopic");
+ invalidConf.setSubscriptionName(subscriptionName);
+ invalidConf.setMessageToValuesMapper(messageToValuesMapper);
+ invalidConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
+ invalidConf.setMaxFailedRetries(2);
+ invalidConf.setSharedConsumerEnabled(false);
+ invalidConf.setMetricsTimeIntervalInSecs(60);
+ invalidConf.setSubscriptionType(SubscriptionType.Shared);
+
+ final PulsarSpout failingSpout = new PulsarSpout(invalidConf, PulsarClient.builder());
+ final MockSpoutOutputCollector localMockCollector = new MockSpoutOutputCollector();
+ final SpoutOutputCollector stormCollector = new SpoutOutputCollector(localMockCollector);
+ final TopologyContext topologyContext = mock(TopologyContext.class);
+ when(topologyContext.getThisComponentId()).thenReturn("new-test" + methodName);
+ when(topologyContext.getThisTaskId()).thenReturn(0);
+
+ try {
+ failingSpout.open(Maps.newHashMap(), topologyContext, stormCollector);
+ fail("should have failed as consumer creation failed");
+ } catch (IllegalStateException expected) {
+ // Ok
+ }
+ }
+}
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
new file mode 100644
index 0000000..a71e088
--- /dev/null
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import org.testng.Assert;
+
+/**
+ * Small helpers shared by the Pulsar Storm adapter tests.
+ */
+public class TestUtil {
+
+    /**
+     * Asserts that the given object can be written with Java serialization.
+     *
+     * <p>The object is written to an in-memory stream. The meaningful check is
+     * that {@code writeObject} completes without throwing; the final assertion
+     * additionally verifies that some bytes were produced.
+     *
+     * @param object the instance to serialize
+     * @throws Exception if the object (or anything it references) cannot be
+     *         written to the stream
+     */
+    public static void testSerializability(Object object) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // try-with-resources closes (and thereby flushes) the object stream even
+        // when writeObject throws, instead of leaving it open on the failure path.
+        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+            oos.writeObject(object);
+        }
+        Assert.assertTrue(out.toByteArray().length > 0);
+    }
+}
diff --git
a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
new file mode 100644
index 0000000..93404ea
--- /dev/null
+++
b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm.example;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.storm.MessageToValuesMapper;
+import org.apache.pulsar.storm.PulsarBolt;
+import org.apache.pulsar.storm.PulsarBoltConfiguration;
+import org.apache.pulsar.storm.PulsarSpout;
+import org.apache.pulsar.storm.PulsarSpoutConfiguration;
+import org.apache.pulsar.storm.TupleToMessageMapper;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StormExample {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarSpout.class);
+ private static final String serviceUrl =
"http://broker-pdev.messaging.corp.usw.example.com:8080";
+
+ @SuppressWarnings("serial")
+ static MessageToValuesMapper messageToValuesMapper = new
MessageToValuesMapper() {
+
+ @Override
+ public Values toValues(Message msg) {
+ return new Values(new String(msg.getData()));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // declare the output fields
+ declarer.declare(new Fields("string"));
+ }
+ };
+
+ @SuppressWarnings("serial")
+ static TupleToMessageMapper tupleToMessageMapper = new
TupleToMessageMapper() {
+
+ @Override
+ public TypedMessageBuilder<byte[]>
toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
+ String receivedMessage = tuple.getString(0);
+ // message processing
+ String processedMsg = receivedMessage + "-processed";
+ return msgBuilder.value(processedMsg.getBytes());
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // declare the output fields
+ }
+ };
+
+ public static void main(String[] args) throws Exception {
+ // String authPluginClassName =
"org.apache.pulsar.client.impl.auth.MyAuthentication";
+ // String authParams = "key1:val1,key2:val2";
+ // clientConf.setAuthentication(authPluginClassName, authParams);
+
+ String topic1 = "persistent://my-property/use/my-ns/my-topic1";
+ String topic2 = "persistent://my-property/use/my-ns/my-topic2";
+ String subscriptionName1 = "my-subscriber-name1";
+ String subscriptionName2 = "my-subscriber-name2";
+
+ // create spout
+ PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
+ spoutConf.setServiceUrl(serviceUrl);
+ spoutConf.setTopic(topic1);
+ spoutConf.setSubscriptionName(subscriptionName1);
+ spoutConf.setMessageToValuesMapper(messageToValuesMapper);
+ PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder());
+
+ // create bolt
+ PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
+ boltConf.setServiceUrl(serviceUrl);
+ boltConf.setTopic(topic2);
+ boltConf.setTupleToMessageMapper(tupleToMessageMapper);
+ PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder());
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("testSpout", spout);
+ builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");
+
+ Config conf = new Config();
+ conf.setNumWorkers(2);
+ conf.setDebug(true);
+ conf.registerMetricsConsumer(PulsarMetricsConsumer.class);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(10000);
+
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ // create a consumer on topic2 to receive messages from the bolt when
the processing is done
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe();
+ // create a producer on topic1 to send messages that will be received
by the spout
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic1).create();
+
+ for (int i = 0; i < 10; i++) {
+ String msg = "msg-" + i;
+ producer.send(msg.getBytes());
+ LOG.info("Message {} sent", msg);
+ }
+ Message<byte[]> msg = null;
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(1, TimeUnit.SECONDS);
+ LOG.info("Message {} received", new String(msg.getData()));
+ }
+ cluster.killTopology("test");
+ cluster.shutdown();
+
+ }
+
+ class PulsarMetricsConsumer implements IMetricsConsumer {
+
+ @Override
+ public void prepare(Map stormConf, Object registrationArgument,
TopologyContext context,
+ IErrorReporter errorReporter) {
+ }
+
+ @Override
+ public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint>
dataPoints) {
+ // The collection will contain metrics for all the spouts/bolts
that register the metrics in the topology.
+ // The name for the Pulsar Spout is
"PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt
+ // is
+ // "PulsarBoltMetrics-{componentId}-{taskIndex}".
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ }
+}