This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit e9db2b848ff609c0feab8114514db70b08b2fc7f Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 08:59:58 2023 +0200 STORM-3988 - Remove "storm-pulsar" --- external/storm-pulsar/pom.xml | 277 ------------ .../apache/pulsar/storm/MessageToValuesMapper.java | 45 -- .../java/org/apache/pulsar/storm/PulsarBolt.java | 208 --------- .../pulsar/storm/PulsarBoltConfiguration.java | 54 --- .../java/org/apache/pulsar/storm/PulsarSpout.java | 486 --------------------- .../pulsar/storm/PulsarSpoutConfiguration.java | 195 --------- .../apache/pulsar/storm/PulsarSpoutConsumer.java | 59 --- .../pulsar/storm/PulsarStormConfiguration.java | 89 ---- .../java/org/apache/pulsar/storm/PulsarTuple.java | 45 -- .../apache/pulsar/storm/SharedPulsarClient.java | 153 ------- .../apache/pulsar/storm/TupleToMessageMapper.java | 68 --- .../storm-pulsar/src/main/javadoc/overview.html | 29 -- .../apache/pulsar/storm/MockOutputCollector.java | 101 ----- .../pulsar/storm/MockSpoutOutputCollector.java | 80 ---- .../pulsar/storm/PulsarBoltIntegrationTest.java | 243 ----------- .../pulsar/storm/PulsarSpoutIntegrationTest.java | 354 --------------- .../org/apache/pulsar/storm/PulsarSpoutTest.java | 178 -------- .../java/org/apache/pulsar/storm/TestUtil.java | 35 -- .../apache/pulsar/storm/example/StormExample.java | 166 ------- 19 files changed, 2865 deletions(-) diff --git a/external/storm-pulsar/pom.xml b/external/storm-pulsar/pom.xml deleted file mode 100644 index 81785bcf0..000000000 --- a/external/storm-pulsar/pom.xml +++ /dev/null @@ -1,277 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" - xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.5.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-pulsar</artifactId> - <name>Pulsar Storm adapter</name> - - <developers> - <developer> - <organization>Apache Pulsar developers</organization> - <organizationUrl>http://pulsar.apache.org/</organizationUrl> - </developer> - </developers> - - <licenses> - <license> - <name>Apache License, Version 2.0</name> - <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> - - <properties> - <pulsar.version>2.8.2</pulsar.version> - <powermock.version>2.0.9</powermock.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-client-original</artifactId> - <version>${pulsar.version}</version> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - <version>${netty.version}</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-common</artifactId> - <version>${pulsar.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>buildtools</artifactId> - <version>${pulsar.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-broker</artifactId> - <version>${pulsar.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-broker</artifactId> - <version>${pulsar.version}</version> - <type>test-jar</type> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>testmocks</artifactId> - <version>${pulsar.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-testng</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito2</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4-rule-agent</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-server</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <resources> - <resource> - <directory>src/main/resources</directory> - <filtering>true</filtering> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <testNGArtifactName>none:none</testNGArtifactName> - <argLine> -Xmx1G -XX:+UseG1GC - -Dpulsar.allocator.pooled=false - -Dpulsar.allocator.leak_detection=Advanced - -Dpulsar.allocator.exit_on_oom=false - </argLine> - <reuseForks>true</reuseForks> - <forkCount>1</forkCount> - <shutdown>kill</shutdown> - <trimStackTrace>false</trimStackTrace> - <properties> - <property> - <name>listener</name> - <value>org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener,org.apache.pulsar.tests.FailFastNotifier,org.apache.pulsar.tests.MockitoCleanupListener,org.apache.pulsar.tests.FastThreadLocalCleanupListener,org.apache.pulsar.tests.ThreadLeakDetectorListener</value> - </property> - </properties> - </configuration> - </plugin> - </plugins> - - </build> -</project> diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java deleted file mode 100644 index 00575a6a7..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.io.Serializable; - -import org.apache.pulsar.client.api.Message; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Values; - -public interface MessageToValuesMapper extends Serializable { - - /** - * Convert {@link org.apache.pulsar.client.api.Message} to tuple values. - * - * @param msg - message - * @return values - */ - Values toValues(Message<byte[]> msg); - - /** - * Declare the output schema for the spout. - * - * @param declarer - output field declarer. - */ - void declareOutputFields(OutputFieldsDeclarer declarer); -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarBolt.java deleted file mode 100644 index fd02d6c8c..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarBolt.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; - -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.storm.metric.api.IMetric; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PulsarBolt extends BaseRichBolt implements IMetric { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(PulsarBolt.class); - - public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent"; - public static final String PRODUCER_RATE = "producerRate"; - public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput"; - - private final ClientConfigurationData clientConf; - private final ProducerConfigurationData producerConf; - private final PulsarBoltConfiguration pulsarBoltConf; - private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>(); - - private SharedPulsarClient sharedPulsarClient; - private String componentId; - private String boltId; - private OutputCollector collector; - private Producer<byte[]> producer; - private volatile long messagesSent = 0; - private volatile long messageSizeSent = 0; - - public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) { - this(pulsarBoltConf, PulsarClient.builder()); - } - - public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) { - this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(), - new ProducerConfigurationData()); - } - - public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf, - ProducerConfigurationData producerConf) { - checkNotNull(pulsarBoltConf, "bolt configuration can't be null"); - checkNotNull(clientConf, "client configuration can't be null"); - checkNotNull(producerConf, "producer configuration can't be null"); - Objects.requireNonNull(pulsarBoltConf.getServiceUrl()); - Objects.requireNonNull(pulsarBoltConf.getTopic()); - Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper()); - this.pulsarBoltConf = pulsarBoltConf; - this.clientConf = clientConf; - this.producerConf = producerConf; - this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl()); - this.producerConf.setTopicName(pulsarBoltConf.getTopic()); - this.producerConf.setBatcherBuilder(null); - } - - @SuppressWarnings({ "rawtypes" }) - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - this.componentId = context.getThisComponentId(); - this.boltId = String.format("%s-%s", componentId, context.getThisTaskId()); - this.collector = collector; - try { - sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf); - producer = sharedPulsarClient.getSharedProducer(producerConf); - LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic()); - } catch (PulsarClientException e) { - LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e); - throw new IllegalStateException( - format("Failed to initialize producer for %s : %s", pulsarBoltConf.getTopic(), e.getMessage()), e); - } - context.registerMetric(String.format("PulsarBoltMetrics-%s-%s", componentId, context.getThisTaskIndex()), this, - pulsarBoltConf.getMetricsTimeIntervalInSecs()); - } - - @Override - public void execute(Tuple input) { - if (TupleUtils.isTick(input)) { - collector.ack(input); - return; - } - try { - if (producer != null) { - // a message key can be provided in the mapper - TypedMessageBuilder<byte[]> msgBuilder = pulsarBoltConf.getTupleToMessageMapper() - .toMessage(producer.newMessage(), input); - if (msgBuilder == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Cannot send null message, acking the collector", boltId); - } - collector.ack(input); - } else { - final long messageSizeToBeSent = ((TypedMessageBuilderImpl<byte[]>) msgBuilder).getContent() - .remaining(); - msgBuilder.sendAsync().handle((msgId, ex) -> { - synchronized (collector) { - if (ex != null) { - collector.reportError(ex); - collector.fail(input); - LOG.error("[{}] Message send failed", boltId, ex); - - } else { - collector.ack(input); - ++messagesSent; - messageSizeSent += messageSizeToBeSent; - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Message sent with id {}", boltId, msgId); - } - } - } - - return null; - }); - } - } - } catch (Exception e) { - LOG.error("[{}] Message processing failed", boltId, e); - collector.reportError(e); - collector.fail(input); - } - } - - public void close() { - try { - LOG.info("[{}] Closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic()); - if (sharedPulsarClient != null) { - sharedPulsarClient.close(); - } - } catch (PulsarClientException e) { - LOG.error("[{}] Error closing Pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e); - } - } - - @Override - public void cleanup() { - close(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer); - } - - /** - * Helpers for metrics. - */ - - @SuppressWarnings({ "rawtypes" }) - ConcurrentMap getMetrics() { - metricsMap.put(NO_OF_MESSAGES_SENT, messagesSent); - metricsMap.put(PRODUCER_RATE, ((double) messagesSent) / pulsarBoltConf.getMetricsTimeIntervalInSecs()); - metricsMap.put(PRODUCER_THROUGHPUT_BYTES, - ((double) messageSizeSent) / pulsarBoltConf.getMetricsTimeIntervalInSecs()); - return metricsMap; - } - - void resetMetrics() { - messagesSent = 0; - messageSizeSent = 0; - } - - @SuppressWarnings("rawtypes") - @Override - public Object getValueAndReset() { - ConcurrentMap metrics = getMetrics(); - resetMetrics(); - return metrics; - } -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java deleted file mode 100644 index 44a4909ba..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.util.Objects; - -/** - * Class used to specify Pulsar bolt configuration. - */ -public class PulsarBoltConfiguration extends PulsarStormConfiguration { - - private static final long serialVersionUID = 1L; - - private TupleToMessageMapper tupleToMessageMapper = null; - - /** - * Get the tuple to message mapper. - * @return the mapper to convert storm tuples to a pulsar message. - */ - public TupleToMessageMapper getTupleToMessageMapper() { - return tupleToMessageMapper; - } - - /** - * Sets the mapper to convert storm tuples to a pulsar message. - * <p> - * Note: If the mapper returns null, the message is not sent by the producer and is acked immediately on the - * collector - * </p> - * - * @param mapper - tuple to message mapper - */ - public void setTupleToMessageMapper(TupleToMessageMapper mapper) { - this.tupleToMessageMapper = Objects.requireNonNull(mapper); - } - -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java deleted file mode 100644 index 1ffe1ca82..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ /dev/null @@ -1,486 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.impl.Backoff; -import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; -import org.apache.storm.metric.api.IMetric; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PulsarSpout extends BaseRichSpout implements IMetric { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class); - - public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages"; - public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived"; - public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted"; - public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed"; - public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount"; - public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks"; - public static final String CONSUMER_RATE = "consumerRate"; - public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput"; - - private final ClientConfigurationData clientConf; - private final PulsarSpoutConfiguration pulsarSpoutConf; - private final ConsumerConfigurationData<byte[]> consumerConf; - private final long failedRetriesTimeoutNano; - private final int maxFailedRetries; - private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new ConcurrentHashMap<>(); - private final Queue<Message<byte[]>> failedMessages = new ConcurrentLinkedQueue<>(); - private final ConcurrentMap<String, Object> metricsMap = new ConcurrentHashMap<>(); - - private SharedPulsarClient sharedPulsarClient; - private String componentId; - private String spoutId; - private SpoutOutputCollector collector; - private PulsarSpoutConsumer consumer; - private volatile long messagesReceived = 0; - private volatile long messagesEmitted = 0; - private volatile long messagesFailed = 0; - private volatile long messageNotAvailableCount = 0; - private volatile long pendingAcks = 0; - private volatile long messageSizeReceived = 0; - - public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) { - this(pulsarSpoutConf, PulsarClient.builder()); - } - - public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) { - this(pulsarSpoutConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(), - new ConsumerConfigurationData<byte[]>()); - } - - public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfigurationData clientConfig, - ConsumerConfigurationData<byte[]> consumerConfig) { - Objects.requireNonNull(pulsarSpoutConf.getServiceUrl()); - Objects.requireNonNull(pulsarSpoutConf.getTopic()); - Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName()); - Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper()); - - checkNotNull(pulsarSpoutConf, "spout configuration can't be null"); - checkNotNull(clientConfig, "client configuration can't be null"); - checkNotNull(consumerConfig, "consumer configuration can't be null"); - this.clientConf = clientConfig; - this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl()); - this.consumerConf = consumerConfig; - this.pulsarSpoutConf = pulsarSpoutConf; - this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS); - this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries(); - } - - @Override - public void close() { - try { - LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic()); - - if (pulsarSpoutConf.isAutoUnsubscribe()) { - try { - consumer.unsubscribe(); - } catch (PulsarClientException e) { - LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId, - this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e); - } - } - - if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) { - consumer.close(); - } - if (sharedPulsarClient != null) { - sharedPulsarClient.close(); - } - pendingMessageRetries.clear(); - failedMessages.clear(); - } catch (PulsarClientException e) { - LOG.error("[{}] Error closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic(), e); - } - } - - @Override - public void ack(Object msgId) { - if (msgId instanceof Message) { - Message<?> msg = (Message<?>) msgId; - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId()); - } - consumer.acknowledgeAsync(msg); - pendingMessageRetries.remove(msg.getMessageId()); - // we should also remove message from failedMessages but it will be eventually removed while emitting next - // tuple - --pendingAcks; - } - } - - @Override - public void fail(Object msgId) { - if (msgId instanceof Message) { - @SuppressWarnings("unchecked") - Message<byte[]> msg = (Message<byte[]>) msgId; - MessageId id = msg.getMessageId(); - LOG.warn("[{}] Error processing message {}", spoutId, id); - - // Since the message processing failed, we put it in the failed messages queue if there are more retries - // remaining for the message - MessageRetries messageRetries = pendingMessageRetries.computeIfAbsent(id, (k) -> new MessageRetries()); - if ((failedRetriesTimeoutNano < 0 - || (messageRetries.getTimeStamp() + failedRetriesTimeoutNano) > System.nanoTime()) - && (maxFailedRetries < 0 || messageRetries.numRetries < maxFailedRetries)) { - // since we can retry again, we increment retry count and put it in the queue - LOG.info("[{}] Putting message {} in the retry queue", spoutId, id); - messageRetries.incrementAndGet(); - pendingMessageRetries.putIfAbsent(id, messageRetries); - failedMessages.add(msg); - --pendingAcks; - messagesFailed++; - } else { - LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id); - ack(msg); - } - } - - } - - /** - * Emits a tuple received from the Pulsar consumer unless there are any failed messages. - */ - @Override - public void nextTuple() { - emitNextAvailableTuple(); - } - - /** - * It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message - * available. It receives message from consumer queue and converts it to tuple and emits to topology. if the - * converted tuple is null then it tries to receives next message and perform the same until it finds non-tuple to - * emit. - */ - public void emitNextAvailableTuple() { - // check if there are any failed messages to re-emit in the topology - if (emitFailedMessage()) { - return; - } - - Message<byte[]> msg; - // receive from consumer if no failed messages - if (consumer != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId); - } - try { - boolean done = false; - while (!done) { - msg = consumer.receive(100, TimeUnit.MILLISECONDS); - if (msg != null) { - ++messagesReceived; - messageSizeReceived += msg.getData().length; - done = mapToValueAndEmit(msg); - } else { - // queue is empty and nothing to emit - done = true; - messageNotAvailableCount++; - } - } - } catch (PulsarClientException e) { - LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e); - } - } - } - - private boolean emitFailedMessage() { - Message<byte[]> msg; - - while ((msg = failedMessages.peek()) != null) { - MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId()); - if (messageRetries != null) { - // emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing - // anything - if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS, - messageRetries.getNumRetries(), clientConf.getInitialBackoffIntervalNanos(), - clientConf.getMaxBackoffIntervalNanos())) { - Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getInitialBackoffIntervalNanos())); - } else { - // remove the message from the queue and emit to the topology, only if it should not be backedoff - LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId()); - failedMessages.remove(); - mapToValueAndEmit(msg); - } - return true; - } - - // messageRetries is null because messageRetries is already acked and removed from pendingMessageRetries - // then remove it from failed message queue as well. - if (LOG.isDebugEnabled()) { - LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked", - pulsarSpoutConf.getTopic(), spoutId, msg.getMessageId()); - } - failedMessages.remove(); - // try to find out next failed message - continue; - } - return false; - } - - @Override - @SuppressWarnings({ "rawtypes" }) - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.componentId = context.getThisComponentId(); - this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId()); - this.collector = collector; - pendingMessageRetries.clear(); - failedMessages.clear(); - try { - consumer = createConsumer(); - LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId, - pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName()); - } catch (PulsarClientException e) { - LOG.error("[{}] Error creating pulsar consumer on topic {}", spoutId, pulsarSpoutConf.getTopic(), e); - throw new IllegalStateException(format("Failed to initialize consumer for %s-%s : %s", - pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName(), e.getMessage()), e); - } - context.registerMetric(String.format("PulsarSpoutMetrics-%s-%s", componentId, context.getThisTaskIndex()), this, - pulsarSpoutConf.getMetricsTimeIntervalInSecs()); - } - - private PulsarSpoutConsumer createConsumer() throws PulsarClientException { - sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf); - PulsarSpoutConsumer consumer; - if (pulsarSpoutConf.isSharedConsumerEnabled()) { - consumer = pulsarSpoutConf.isDurableSubscription() - ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration())) - : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration())); - } else { - try { - consumer = pulsarSpoutConf.isDurableSubscription() - ? new SpoutConsumer(sharedPulsarClient.getClient() - .subscribeAsync(newConsumerConfiguration()).join()) - : new SpoutReader(sharedPulsarClient.getClient() - .createReaderAsync(newReaderConfiguration()).join()); - } catch (CompletionException e) { - throw (PulsarClientException) e.getCause(); - } - } - return consumer; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer); - - } - - private boolean mapToValueAndEmit(Message<byte[]> msg) { - if (msg != null) { - Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); - ++pendingAcks; - if (values == null) { - // since the mapper returned null, we can drop the message and ack it immediately - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Dropping message {}", spoutId, msg.getMessageId()); - } - ack(msg); - } else { - if (values instanceof PulsarTuple) { - collector.emit(((PulsarTuple) values).getOutputStream(), values, msg); - } else { - collector.emit(values, msg); - } - ++messagesEmitted; - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId()); - } - return true; - } - } - return false; - } - - public class MessageRetries { - private final long timestampInNano; - private int numRetries; - - public MessageRetries() { - this.timestampInNano = System.nanoTime(); - this.numRetries = 0; - } - - public long getTimeStamp() { - return timestampInNano; - } - - public int incrementAndGet() { - return ++numRetries; - } - - public int getNumRetries() { - return numRetries; - } - } - - /** - * Helpers for metrics. - */ - - @SuppressWarnings({ "rawtypes" }) - ConcurrentMap getMetrics() { - metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size()); - metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived); - metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted); - metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed); - metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount); - metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks); - metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs()); - metricsMap.put(CONSUMER_THROUGHPUT_BYTES, - ((double) messageSizeReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs()); - return metricsMap; - } - - void resetMetrics() { - messagesReceived = 0; - messagesEmitted = 0; - messageSizeReceived = 0; - messagesFailed = 0; - messageNotAvailableCount = 0; - } - - @SuppressWarnings("rawtypes") - @Override - public Object getValueAndReset() { - ConcurrentMap metrics = getMetrics(); - resetMetrics(); - return metrics; - } - - private ReaderConfigurationData<byte[]> newReaderConfiguration() { - ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>(); - readerConf.setTopicName(pulsarSpoutConf.getTopic()); - readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName()); - readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition()); - if (this.consumerConf != null) { - readerConf.setCryptoFailureAction(consumerConf.getCryptoFailureAction()); - readerConf.setCryptoKeyReader(consumerConf.getCryptoKeyReader()); - readerConf.setReadCompacted(consumerConf.isReadCompacted()); - readerConf.setReceiverQueueSize(consumerConf.getReceiverQueueSize()); - } - return readerConf; - } - - private ConsumerConfigurationData<byte[]> newConsumerConfiguration() { - ConsumerConfigurationData<byte[]> consumerConf = this.consumerConf != null ? this.consumerConf - : new ConsumerConfigurationData<>(); - consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic())); - consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); - consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType()); - return consumerConf; - } - - static class SpoutConsumer implements PulsarSpoutConsumer { - private Consumer<byte[]> consumer; - - SpoutConsumer(Consumer<byte[]> consumer) { - super(); - this.consumer = consumer; - } - - @Override - public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException { - return consumer.receive(timeout, unit); - } - - @Override - public void acknowledgeAsync(Message<?> msg) { - consumer.acknowledgeAsync(msg); - } - - @Override - public void close() throws PulsarClientException { - consumer.close(); - } - - @Override - public void unsubscribe() throws PulsarClientException { - consumer.unsubscribe(); - } - - } - - static class SpoutReader implements PulsarSpoutConsumer { - private Reader<byte[]> reader; - - SpoutReader(Reader<byte[]> reader) { - super(); - this.reader = reader; - } - - @Override - public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException { - return reader.readNext(timeout, unit); - } - - @Override - public void acknowledgeAsync(Message<?> msg) { - // No-op - } - - @Override - public void close() throws PulsarClientException { - try { - reader.close(); - } catch (IOException e) { - throw new PulsarClientException(e); - } - } - - @Override - public void unsubscribe() throws PulsarClientException { - // No-op - } - } -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java deleted file mode 100644 index cd8292ad8..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.SubscriptionType; - -/** - * Class used to specify pulsar spout configuration. - */ -public class PulsarSpoutConfiguration extends PulsarStormConfiguration { - - private static final long serialVersionUID = 1L; - - public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO = TimeUnit.SECONDS.toNanos(60); - public static final int DEFAULT_MAX_FAILED_RETRIES = -1; - - private String subscriptionName = null; - private MessageToValuesMapper messageToValuesMapper = null; - private long failedRetriesTimeoutNano = DEFAULT_FAILED_RETRIES_TIMEOUT_NANO; - private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES; - private boolean sharedConsumerEnabled = false; - - private SubscriptionType subscriptionType = SubscriptionType.Shared; - private boolean autoUnsubscribe = false; - private boolean durableSubscription = true; - // read position if non-durable subscription is enabled : default oldest message available in topic - private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; - - - /** - * Return subscription name. - * @return the subscription name for the consumer in the spout - */ - public String getSubscriptionName() { - return subscriptionName; - } - - /** - * Sets the subscription name for the consumer in the spout. - * - * @param subscriptionName - subscription name - */ - public void setSubscriptionName(String subscriptionName) { - this.subscriptionName = subscriptionName; - } - - public SubscriptionType getSubscriptionType() { - return subscriptionType; - } - - public void setSubscriptionType(SubscriptionType subscriptionType) { - this.subscriptionType = subscriptionType; - } - - /** - * Get value mapper. - * - * @return the mapper to convert pulsar message to a storm tuple - */ - public MessageToValuesMapper getMessageToValuesMapper() { - return messageToValuesMapper; - } - - /** - * Sets the mapper to convert pulsar message to a storm tuple. - * <p> - * Note: If the mapper returns null, the message is not emitted to the collector and is acked immediately - * </p> - * - * @param mapper - MessageToValuesMapper - */ - public void setMessageToValuesMapper(MessageToValuesMapper mapper) { - this.messageToValuesMapper = Objects.requireNonNull(mapper); - } - - /** - * Get retries timeout. - * - * @param unit - TimeUnit - * @return the timeout for retrying failed messages - */ - public long getFailedRetriesTimeout(TimeUnit unit) { - return unit.convert(failedRetriesTimeoutNano, TimeUnit.NANOSECONDS); - } - - /** - * Sets the timeout within which the spout will re-inject failed messages with an exponential backoff <i>(default: - * 60 seconds)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message will - * be retried forever till it is successfully processed or max message retry count is reached, whichever comes - * first. - * - * @param failedRetriesTimeout - timeout value - * @param unit - time unit - */ - public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit unit) { - this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout); - } - - /** - * Get max retries. - * @return the maximum number of times a failed message will be retried - */ - public int getMaxFailedRetries() { - return maxFailedRetries; - } - - /** - * Sets the maximum number of times the spout will re-inject failed messages with an exponential backoff - * <i>(default: -1)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message - * will be retried forever till it is successfully processed or configured timeout expires, whichever comes first. - * - * @param maxFailedRetries - max number of retries - */ - public void setMaxFailedRetries(int maxFailedRetries) { - this.maxFailedRetries = maxFailedRetries; - } - - /** - * Is shared consumer enabled. - * @return if the consumer is shared across different executors of a spout - */ - public boolean isSharedConsumerEnabled() { - return sharedConsumerEnabled; - } - - /** - * Sets whether the consumer will be shared across different executors of a spout. <i>(default: false)</i> - * - * @param sharedConsumerEnabled - is shared consumer enabled - */ - public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) { - this.sharedConsumerEnabled = sharedConsumerEnabled; - } - - public boolean isAutoUnsubscribe() { - return autoUnsubscribe; - } - - /** - * It unsubscribes the subscription when spout gets closed in the topology. - * - * @param autoUnsubscribe - should auto unsubscribe - */ - public void setAutoUnsubscribe(boolean autoUnsubscribe) { - this.autoUnsubscribe = autoUnsubscribe; - } - - public boolean isDurableSubscription() { - return durableSubscription; - } - - /** - * if subscription is not durable then it creates non-durable reader to start reading from the - * {@link #setNonDurableSubscriptionReadPosition(MessagePosition)} in topic. - * - * @param durableSubscription - is subscription durable - */ - public void setDurableSubscription(boolean durableSubscription) { - this.durableSubscription = durableSubscription; - } - - public MessageId getNonDurableSubscriptionReadPosition() { - return nonDurableSubscriptionReadPosition; - } - - /** - * Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest. - * - * @param nonDurableSubscriptionReadPosition - position for the non-durable subcription - */ - public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) { - this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition; - } -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java deleted file mode 100644 index f9a9f483e..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.util.concurrent.TimeUnit; - -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; - -public interface PulsarSpoutConsumer { - - /** - * Receives a single message. - * - * @param waitTime - time - * @param unit - time unit - * @return message - * @throws PulsarClientException in case of error - */ - Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException; - - /** - * Ack the message async. - * - * @param msg - message to ack - */ - void acknowledgeAsync(Message<?> msg); - - /** - * unsubscribe the consumer. - * @throws PulsarClientException in case of error - */ - void unsubscribe() throws PulsarClientException; - - /** - * Close the consumer. - * - * @throws PulsarClientException in case of error - */ - void close() throws PulsarClientException; - -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java deleted file mode 100644 index 81a154900..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarStormConfiguration.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.io.Serializable; - -/** - * Class used to specify pulsar storm configurations like service url and topic. - */ -public class PulsarStormConfiguration implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final int DEFAULT_METRICS_TIME_INTERVAL_IN_SECS = 60; - - private String serviceUrl = null; - private String topic = null; - private int metricsTimeIntervalInSecs = DEFAULT_METRICS_TIME_INTERVAL_IN_SECS; - - /** - * Get service url. - * @return the service URL to connect to from the client. - */ - public String getServiceUrl() { - return serviceUrl; - } - - /** - * Sets the service URL to connect to from the client. - * - * @param serviceUrl - service url - */ - public void setServiceUrl(String serviceUrl) { - this.serviceUrl = serviceUrl; - } - - /** - * Get topic. - * @return the topic name for the producer/consumer. - */ - public String getTopic() { - return topic; - } - - /** - * Sets the topic name for the producer/consumer. It should be of the format - * {persistent|non-persistent}://{property}/{cluster}/{namespace}/{topic}. - * - * @param topic - topic name - */ - public void setTopic(String topic) { - this.topic = topic; - } - - /** - * Get metrics interval. - * @return the time interval in seconds for metrics generation. - */ - public int getMetricsTimeIntervalInSecs() { - return metricsTimeIntervalInSecs; - } - - /** - * Sets the time interval in seconds for metrics generation <i>(default: 60 seconds)</i>. - * - * @param metricsTimeIntervalInSecs - metrics interval in sec. - */ - public void setMetricsTimeIntervalInSecs(int metricsTimeIntervalInSecs) { - this.metricsTimeIntervalInSecs = metricsTimeIntervalInSecs; - } - -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarTuple.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarTuple.java deleted file mode 100644 index 429af33f2..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarTuple.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import org.apache.storm.tuple.Values; - -/** - * Returned by MessageToValuesMapper, this specifies the Values - * for an output tuple and the stream it should be sent to. - */ -public class PulsarTuple extends Values { - - protected final String outputStream; - - public PulsarTuple(String outStream, Object ... values) { - super(values); - outputStream = outStream; - } - - /** - * Return stream the tuple should be emitted on. - * - * @return String - */ - public String getOutputStream() { - return outputStream; - } -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java deleted file mode 100644 index 4e58c2d7e..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SharedPulsarClient { - private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class); - private static final ConcurrentMap<String, SharedPulsarClient> instances = new ConcurrentHashMap<>(); - - private final String componentId; - private final PulsarClientImpl client; - private final AtomicInteger counter = new AtomicInteger(); - - private Consumer<byte[]> consumer; - private Reader<byte[]> reader; - private Producer<byte[]> producer; - - private SharedPulsarClient(String componentId, ClientConfigurationData clientConf) - throws PulsarClientException { - this.client = new PulsarClientImpl(clientConf); - this.componentId = componentId; - } - - /** - * Provides a shared pulsar client that is shared across all different tasks in the same component. Different - * components will not share the pulsar client since they can have different configurations. - * - * @param componentId - the id of the spout/bolt - * @param clientConf - client config - * @return SharedPulsarClient - * @throws PulsarClientException in case of an error - */ - public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf) - throws PulsarClientException { - AtomicReference<PulsarClientException> exception = new AtomicReference<PulsarClientException>(); - instances.computeIfAbsent(componentId, pulsarClient -> { - SharedPulsarClient sharedPulsarClient = null; - try { - sharedPulsarClient = new SharedPulsarClient(componentId, clientConf); - LOG.info("[{}] Created a new Pulsar Client.", componentId); - } catch (PulsarClientException e) { - exception.set(e); - } - return sharedPulsarClient; - }); - if (exception.get() != null) { - throw exception.get(); - } - return instances.get(componentId); - } - - public PulsarClientImpl getClient() { - counter.incrementAndGet(); - return client; - } - - public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> consumerConf) - throws PulsarClientException { - counter.incrementAndGet(); - synchronized (this) { - if (consumer == null) { - try { - consumer = client.subscribeAsync(consumerConf).join(); - } catch (CompletionException e) { - throw (PulsarClientException) e.getCause(); - } - LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic()); - } else { - LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic()); - } - } - return consumer; - } - - public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException { - counter.incrementAndGet(); - synchronized (this) { - if (reader == null) { - try { - reader = client.createReaderAsync(readerConf).join(); - } catch (CompletionException e) { - throw (PulsarClientException) e.getCause(); - } - LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName()); - } else { - LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName()); - } - } - return reader; - } - - public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException { - counter.incrementAndGet(); - synchronized (this) { - if (producer == null) { - try { - producer = client.createProducerAsync(producerConf).join(); - } catch (CompletionException e) { - throw (PulsarClientException) e.getCause(); - } - LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName()); - } else { - LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName()); - } - } - return producer; - } - - public void close() throws PulsarClientException { - if (counter.decrementAndGet() <= 0) { - if (client != null) { - client.close(); - instances.remove(componentId); - LOG.info("[{}] Closed Pulsar Client", componentId); - } - } - } - -} diff --git a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java b/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java deleted file mode 100644 index d90591eeb..000000000 --- a/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.storm; - -import java.io.Serializable; - -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; - -public interface TupleToMessageMapper extends Serializable { - - /** - * Convert tuple to {@link org.apache.pulsar.client.api.Message}. - * - * @param tuple - tuple - * @return message - * @deprecated use {@link #toMessage(TypedMessageBuilder, Tuple)} - */ - @Deprecated - default Message<byte[]> toMessage(Tuple tuple) { - return null; - } - - /** - * Set the value on a message builder to prepare the message to be published from the Bolt. - * - * @param msgBuilder - message builder - * @param tuple - tuple - * @return message builder - */ - default TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) { - // Default implementation provided for backward compatibility - Message<byte[]> msg = toMessage(tuple); - msgBuilder.value(msg.getData()) - .properties(msg.getProperties()); - if (msg.hasKey()) { - msgBuilder.key(msg.getKey()); - } - return msgBuilder; - } - - - /** - * Declare the output schema for the bolt. - * - * @param declarer - output field declarer - */ - void declareOutputFields(OutputFieldsDeclarer declarer); -} diff --git a/external/storm-pulsar/src/main/javadoc/overview.html b/external/storm-pulsar/src/main/javadoc/overview.html deleted file mode 100644 index a1595eb33..000000000 --- a/external/storm-pulsar/src/main/javadoc/overview.html +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN"> -<HTML> - <HEAD> - <TITLE>Pulsar Storm API Overview</TITLE> - </HEAD> - <BODY> - The Pulsar Storm API is a proprietary messaging API. - </BODY> -</HTML> diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java deleted file mode 100644 index 4355ad622..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm; - -import java.util.Collection; -import java.util.List; - -import org.apache.storm.task.IOutputCollector; -import org.apache.storm.tuple.Tuple; - -public class MockOutputCollector implements IOutputCollector { - - private boolean acked = false; - private boolean failed = false; - private Throwable lastError = null; - private Tuple ackedTuple = null; - private int numTuplesAcked = 0; - - @Override - public void reportError(Throwable error) { - lastError = error; - } - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - acked = true; - failed = false; - ackedTuple = input; - ++numTuplesAcked; - } - - @Override - public void fail(Tuple input) { - failed = true; - acked = false; - } - - @Override - public void resetTimeout(Tuple tuple) { - - } - - public boolean acked() { - return acked; - } - - public boolean failed() { - return failed; - } - - public Throwable getLastError() { - return lastError; - } - - public Tuple getAckedTuple() { - return ackedTuple; - } - - public int getNumTuplesAcked() { - return numTuplesAcked; - } - - public void reset() { - acked = false; - failed = false; - lastError = null; - ackedTuple = null; - numTuplesAcked = 0; - } - - @Override - public void flush() { - // Nothing to flush from buffer - } - -} diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java deleted file mode 100644 index 98c8d2038..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.pulsar.client.api.Message; - -import org.apache.storm.spout.ISpoutOutputCollector; - -public class MockSpoutOutputCollector implements ISpoutOutputCollector { - - private boolean emitted = false; - private Message lastMessage = null; - private String data = null; - - @Override - public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { - emitted = true; - data = (String) tuple.get(0); - lastMessage = (Message) messageId; - return new ArrayList<Integer>(); - } - - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { - emitted = true; - data = (String) tuple.get(0); - lastMessage = (Message) messageId; - } - - @Override - public long getPendingCount() { - return 0; - } - - @Override - public void reportError(Throwable error) { - } - - public boolean emitted() { - return emitted; - } - - public String getTupleData() { - return data; - } - - public Message getLastMessage() { - return lastMessage; - } - - public void reset() { - emitted = false; - data = null; - lastMessage = null; - } - - @Override - public void flush() { - // Nothing to flush from buffer - } -} diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarBoltIntegrationTest.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarBoltIntegrationTest.java deleted file mode 100644 index 97cef7b7f..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarBoltIntegrationTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Maps; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.powermock.core.classloader.annotations.PowerMockIgnore; - -@PowerMockIgnore({"javax.management.*", - "org.apache.http.conn.ssl.*", - "com.sun.management.internal.*", - "javax.net.ssl.*", - "com.sun.management.*", - "io.netty.*", - "org.slf4j.*", - "com.sun.org.apache.xerces.*"}) -public class PulsarBoltIntegrationTest extends ProducerConsumerBase { - - private static final int NO_OF_RETRIES = 10; - - public String serviceUrl; - public final String topic = "persistent://my-property/my-ns/my-topic1"; - public final String subscriptionName = "my-subscriber-name"; - - protected PulsarBoltConfiguration pulsarBoltConf; - protected PulsarBolt bolt; - protected MockOutputCollector mockCollector; - protected Consumer consumer; - - @Override - @Before - public void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - - serviceUrl = pulsar.getWebServiceAddress(); - - pulsarBoltConf = new PulsarBoltConfiguration(); - pulsarBoltConf.setServiceUrl(serviceUrl); - pulsarBoltConf.setTopic(topic); - pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper); - pulsarBoltConf.setMetricsTimeIntervalInSecs(60); - bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); - mockCollector = new MockOutputCollector(); - OutputCollector collector = new OutputCollector(mockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName); - when(context.getThisTaskId()).thenReturn(0); - bolt.prepare(Maps.newHashMap(), context, collector); - consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe(); - } - - @Override - @After - public void cleanup() throws Exception { - bolt.close(); - consumer.close(); - super.internalCleanup(); - } - - @SuppressWarnings("serial") - static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() { - - @Override - public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) { - if ("message to be dropped".equals(new String(tuple.getBinary(0)))) { - return null; - } - if ("throw exception".equals(new String(tuple.getBinary(0)))) { - throw new RuntimeException(); - } - return msgBuilder.value(tuple.getBinary(0)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - }; - - private Tuple getMockTuple(String msgContent) { - Tuple mockTuple = mock(Tuple.class); - when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes()); - when(mockTuple.getSourceComponent()).thenReturn(""); - when(mockTuple.getSourceStreamId()).thenReturn(""); - return mockTuple; - } - - @Test - public void testBasic() throws Exception { - String msgContent = "hello world"; - Tuple tuple = getMockTuple(msgContent); - bolt.execute(tuple); - for (int i = 0; i < NO_OF_RETRIES; i++) { - Thread.sleep(1000); - if (mockCollector.acked()) { - break; - } - } - Assert.assertTrue(mockCollector.acked()); - Assert.assertFalse(mockCollector.failed()); - Assert.assertNull(mockCollector.getLastError()); - Assert.assertEquals(tuple, mockCollector.getAckedTuple()); - Message msg = consumer.receive(5, TimeUnit.SECONDS); - consumer.acknowledge(msg); - Assert.assertEquals(msgContent, new String(msg.getData())); - } - - @Test - public void testExecuteFailure() throws Exception { - String msgContent = "throw exception"; - Tuple tuple = getMockTuple(msgContent); - bolt.execute(tuple); - Assert.assertFalse(mockCollector.acked()); - Assert.assertTrue(mockCollector.failed()); - Assert.assertNotNull(mockCollector.getLastError()); - } - - @Test - public void testNoMessageSend() throws Exception { - String msgContent = "message to be dropped"; - Tuple tuple = getMockTuple(msgContent); - bolt.execute(tuple); - Assert.assertTrue(mockCollector.acked()); - Message msg = consumer.receive(5, TimeUnit.SECONDS); - Assert.assertNull(msg); - } - - @Test - public void testMetrics() throws Exception { - bolt.resetMetrics(); - String msgContent = "hello world"; - Tuple tuple = getMockTuple(msgContent); - for (int i = 0; i < 10; i++) { - bolt.execute(tuple); - } - for (int i = 0; i < NO_OF_RETRIES; i++) { - Thread.sleep(1000); - if (mockCollector.getNumTuplesAcked() == 10) { - break; - } - } - @SuppressWarnings("rawtypes") - Map metrics = (Map) bolt.getValueAndReset(); - Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10); - Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(), - 10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs(), - 0.01d); - Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(), - ((double) msgContent.getBytes().length * 10) / pulsarBoltConf.getMetricsTimeIntervalInSecs(), - 0.01d); - metrics = bolt.getMetrics(); - Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0); - for (int i = 0; i < 10; i++) { - Message msg = consumer.receive(5, TimeUnit.SECONDS); - consumer.acknowledge(msg); - } - } - - @Test - public void testSharedProducer() throws Exception { - TopicStats topicStats = admin.topics().getStats(topic); - Assert.assertEquals(topicStats.getPublishers().size(), 1); - PulsarBolt otherBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); - MockOutputCollector otherMockCollector = new MockOutputCollector(); - OutputCollector collector = new OutputCollector(otherMockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName); - when(context.getThisTaskId()).thenReturn(1); - otherBolt.prepare(Maps.newHashMap(), context, collector); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(1, topicStats.getPublishers().size()); - - otherBolt.close(); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(1, topicStats.getPublishers().size()); - } - - @Test - public void testSerializability() throws Exception { - // test serializability with no auth - PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); - TestUtil.testSerializability(boltWithNoAuth); - } - - @Test - public void testFailedProducer() { - PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration(); - pulsarBoltConf.setServiceUrl(serviceUrl); - pulsarBoltConf.setTopic("persistent://invalid"); - pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper); - pulsarBoltConf.setMetricsTimeIntervalInSecs(60); - PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder()); - MockOutputCollector mockCollector = new MockOutputCollector(); - OutputCollector collector = new OutputCollector(mockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("new" + methodName); - when(context.getThisTaskId()).thenReturn(0); - try { - bolt.prepare(Maps.newHashMap(), context, collector); - Assert.fail("should have failed as producer creation failed"); - } catch (IllegalStateException ie) { - // Ok. - } - } - -} diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarSpoutIntegrationTest.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarSpoutIntegrationTest.java deleted file mode 100644 index 153c3f405..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarSpoutIntegrationTest.java +++ /dev/null @@ -1,354 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Maps; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Values; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.powermock.core.classloader.annotations.PowerMockIgnore; - -@PowerMockIgnore({"javax.management.*", - "org.apache.http.conn.ssl.*", - "com.sun.management.internal.*", - "javax.net.ssl.*", - "com.sun.management.*", - "io.netty.*", - "org.slf4j.*", - "com.sun.org.apache.xerces.*"}) -public class PulsarSpoutIntegrationTest extends ProducerConsumerBase { - - public String serviceUrl; - public final String topic = "persistent://my-property/my-ns/my-topic1"; - public final String subscriptionName = "my-subscriber-name"; - - protected PulsarSpoutConfiguration pulsarSpoutConf; - protected PulsarSpout spout; - protected MockSpoutOutputCollector mockCollector; - protected Producer producer; - - @Override - @Before - public void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - - serviceUrl = pulsar.getWebServiceAddress(); - - pulsarSpoutConf = new PulsarSpoutConfiguration(); - pulsarSpoutConf.setServiceUrl(serviceUrl); - pulsarSpoutConf.setTopic(topic); - pulsarSpoutConf.setSubscriptionName(subscriptionName); - pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper); - pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS); - pulsarSpoutConf.setMaxFailedRetries(2); - pulsarSpoutConf.setSharedConsumerEnabled(true); - pulsarSpoutConf.setMetricsTimeIntervalInSecs(60); - pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared); - spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); - mockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("test-spout-" + methodName); - when(context.getThisTaskId()).thenReturn(0); - spout.open(Maps.newHashMap(), context, collector); - producer = pulsarClient.newProducer().topic(topic).create(); - } - - @Override - @After - public void cleanup() throws Exception { - producer.close(); - spout.close(); - super.internalCleanup(); - } - - @SuppressWarnings("serial") - public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() { - - @Override - public Values toValues(Message msg) { - if ("message to be dropped".equals(new String(msg.getData()))) { - return null; - } - return new Values(new String(msg.getData())); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - }; - - @Test - public void testBasic() throws Exception { - String msgContent = "hello world"; - producer.send(msgContent.getBytes()); - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.ack(mockCollector.getLastMessage()); - } - - @Test - public void testRedeliverOnFail() throws Exception { - String msgContent = "hello world"; - producer.send(msgContent.getBytes()); - spout.nextTuple(); - spout.fail(mockCollector.getLastMessage()); - mockCollector.reset(); - Thread.sleep(150); - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.ack(mockCollector.getLastMessage()); - } - - @Test - public void testNoRedeliverOnAck() throws Exception { - String msgContent = "hello world"; - producer.send(msgContent.getBytes()); - spout.nextTuple(); - spout.ack(mockCollector.getLastMessage()); - mockCollector.reset(); - spout.nextTuple(); - Assert.assertFalse(mockCollector.emitted()); - Assert.assertNull(mockCollector.getTupleData()); - } - - @Test - public void testLimitedRedeliveriesOnTimeout() throws Exception { - String msgContent = "chuck norris"; - producer.send(msgContent.getBytes()); - - long startTime = System.currentTimeMillis(); - while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System - .currentTimeMillis()) { - mockCollector.reset(); - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.fail(mockCollector.getLastMessage()); - // wait to avoid backoff - Thread.sleep(500); - } - spout.nextTuple(); - spout.fail(mockCollector.getLastMessage()); - mockCollector.reset(); - Thread.sleep(500); - spout.nextTuple(); - Assert.assertFalse(mockCollector.emitted()); - Assert.assertNull(mockCollector.getTupleData()); - } - - @Test - public void testLimitedRedeliveriesOnCount() throws Exception { - String msgContent = "hello world"; - producer.send(msgContent.getBytes()); - - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.fail(mockCollector.getLastMessage()); - - mockCollector.reset(); - Thread.sleep(150); - - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.fail(mockCollector.getLastMessage()); - - mockCollector.reset(); - Thread.sleep(300); - - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.fail(mockCollector.getLastMessage()); - - mockCollector.reset(); - Thread.sleep(500); - spout.nextTuple(); - Assert.assertFalse(mockCollector.emitted()); - Assert.assertNull(mockCollector.getTupleData()); - } - - @Test - public void testBackoffOnRetry() throws Exception { - String msgContent = "chuck norris"; - producer.send(msgContent.getBytes()); - spout.nextTuple(); - spout.fail(mockCollector.getLastMessage()); - mockCollector.reset(); - // due to backoff we should not get the message again immediately - spout.nextTuple(); - Assert.assertFalse(mockCollector.emitted()); - Assert.assertNull(mockCollector.getTupleData()); - Thread.sleep(100); - spout.nextTuple(); - Assert.assertTrue(mockCollector.emitted()); - Assert.assertEquals(mockCollector.getTupleData(), msgContent); - spout.ack(mockCollector.getLastMessage()); - } - - @Test - public void testMessageDrop() throws Exception { - String msgContent = "message to be dropped"; - producer.send(msgContent.getBytes()); - spout.nextTuple(); - Assert.assertFalse(mockCollector.emitted()); - Assert.assertNull(mockCollector.getTupleData()); - } - - @SuppressWarnings({ "rawtypes" }) - @Test - public void testMetrics() throws Exception { - spout.resetMetrics(); - String msgContent = "hello world"; - producer.send(msgContent.getBytes()); - spout.nextTuple(); - Map metrics = spout.getMetrics(); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1); - Assert.assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(), - 1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs(), - 0.01d); - Assert.assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(), - ((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs(), - 0.01d); - spout.fail(mockCollector.getLastMessage()); - metrics = spout.getMetrics(); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0); - Thread.sleep(150); - spout.nextTuple(); - metrics = spout.getMetrics(); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1); - spout.ack(mockCollector.getLastMessage()); - metrics = (Map) spout.getValueAndReset(); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0); - Assert.assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0); - } - - @Test - public void testSharedConsumer() throws Exception { - TopicStats topicStats = admin.topics().getStats(topic); - Assert.assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); - PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); - MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("test-spout-" + methodName); - when(context.getThisTaskId()).thenReturn(1); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(1, topicStats.getSubscriptions().get(subscriptionName).getConsumers().size()); - - otherSpout.open(Maps.newHashMap(), context, collector); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(1, topicStats.getSubscriptions().get(subscriptionName).getConsumers().size()); - - otherSpout.close(); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(1, topicStats.getSubscriptions().get(subscriptionName).getConsumers().size()); - } - - @Test - public void testNoSharedConsumer() throws Exception { - TopicStats topicStats = admin.topics().getStats(topic); - Assert.assertEquals(topicStats.getSubscriptions().get(subscriptionName).getConsumers().size(), 1); - pulsarSpoutConf.setSharedConsumerEnabled(false); - PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); - MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("test-spout-" + methodName); - when(context.getThisTaskId()).thenReturn(1); - otherSpout.open(Maps.newHashMap(), context, collector); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(2, topicStats.getSubscriptions().get(subscriptionName).getConsumers().size()); - - otherSpout.close(); - - topicStats = admin.topics().getStats(topic); - Assert.assertEquals(1, topicStats.getSubscriptions().get(subscriptionName).getConsumers().size()); - } - - @Test - public void testSerializability() throws Exception { - // test serializability with no auth - PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); - TestUtil.testSerializability(spoutWithNoAuth); - } - - @Test - public void testFailedConsumer() { - PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration(); - pulsarSpoutConf.setServiceUrl(serviceUrl); - pulsarSpoutConf.setTopic("persistent://invalidTopic"); - pulsarSpoutConf.setSubscriptionName(subscriptionName); - pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper); - pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS); - pulsarSpoutConf.setMaxFailedRetries(2); - pulsarSpoutConf.setSharedConsumerEnabled(false); - pulsarSpoutConf.setMetricsTimeIntervalInSecs(60); - pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared); - PulsarSpout spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder()); - MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); - TopologyContext context = mock(TopologyContext.class); - when(context.getThisComponentId()).thenReturn("new-test" + methodName); - when(context.getThisTaskId()).thenReturn(0); - try { - spout.open(Maps.newHashMap(), context, collector); - Assert.fail("should have failed as consumer creation failed"); - } catch (IllegalStateException e) { - // Ok - } - } -} diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java deleted file mode 100644 index 0f343f92c..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Values; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import com.google.common.collect.Maps; - -public class PulsarSpoutTest { - - @Test - public void testAckFailedMessage() throws Exception { - - PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); - conf.setServiceUrl("http://localhost:8080"); - conf.setSubscriptionName("sub1"); - conf.setTopic("persistent://prop/ns1/topic1"); - conf.setSubscriptionType(SubscriptionType.Exclusive); - conf.setMessageToValuesMapper(new MessageToValuesMapper() { - @Override - public Values toValues(Message<byte[]> msg) { - return null; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - }); - - ClientBuilder builder = spy(new ClientBuilderImpl()); - PulsarSpout spout = spy(new PulsarSpout(conf, builder)); - - Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), - new byte[0], Schema.BYTES, new MessageMetadata()); - Consumer<byte[]> consumer = mock(Consumer.class); - SpoutConsumer spoutConsumer = new SpoutConsumer(consumer); - CompletableFuture<Void> future = new CompletableFuture<>(); - future.complete(null); - doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId()); - Field consField = PulsarSpout.class.getDeclaredField("consumer"); - consField.setAccessible(true); - consField.set(spout, spoutConsumer); - - spout.fail(msg); - spout.ack(msg); - spout.emitNextAvailableTuple(); - verify(consumer, atLeast(1)).receive(anyInt(), any()); - } - - @Test - public void testPulsarTuple() throws Exception { - testPulsarSpout(true); - } - - @Test - public void testPulsarSpout() throws Exception { - testPulsarSpout(false); - } - - public void testPulsarSpout(boolean pulsarTuple) throws Exception { - PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); - conf.setServiceUrl("http://localhost:8080"); - conf.setSubscriptionName("sub1"); - conf.setTopic("persistent://prop/ns1/topic1"); - conf.setSubscriptionType(SubscriptionType.Exclusive); - conf.setSharedConsumerEnabled(true); - AtomicBoolean called = new AtomicBoolean(false); - conf.setMessageToValuesMapper(new MessageToValuesMapper() { - @Override - public Values toValues(Message<byte[]> msg) { - called.set(true); - if ("message to be dropped".equals(new String(msg.getData()))) { - return null; - } - String val = new String(msg.getData()); - if (val.startsWith("stream:")) { - String stream = val.split(":")[1]; - return new PulsarTuple(stream, val); - } - return new Values(val); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - }); - - String msgContent = pulsarTuple ? "stream:pstream" : "test"; - - ClientBuilder builder = spy(new ClientBuilderImpl()); - PulsarSpout spout = spy(new PulsarSpout(conf, builder)); - TopologyContext context = mock(TopologyContext.class); - final String componentId = "test-component-id"; - doReturn(componentId).when(context).getThisComponentId(); - SpoutOutputCollector collector = mock(SpoutOutputCollector.class); - Map config = new HashMap<>(); - Field field = SharedPulsarClient.class.getDeclaredField("instances"); - field.setAccessible(true); - ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field - .get(SharedPulsarClient.class); - - SharedPulsarClient client = mock(SharedPulsarClient.class); - Consumer<byte[]> consumer = mock(Consumer.class); - when(client.getSharedConsumer(any())).thenReturn(consumer); - instances.put(componentId, client); - - Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), - msgContent.getBytes(), Schema.BYTES, new MessageMetadata()); - when(consumer.receive(anyInt(), any())).thenReturn(msg); - - spout.open(config, context, collector); - spout.emitNextAvailableTuple(); - - assertTrue(called.get()); - verify(consumer, atLeast(1)).receive(anyInt(), any()); - ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class); - if (pulsarTuple) { - verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg)); - } else { - verify(collector, times(1)).emit(capt.capture(), eq(msg)); - } - Values vals = capt.getValue(); - assertEquals(msgContent, vals.get(0)); - } - -} diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/TestUtil.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/TestUtil.java deleted file mode 100644 index e01f96968..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/TestUtil.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm; - -import org.junit.Assert; - -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; - -public class TestUtil { - - public static void testSerializability(Object object) throws Exception { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(out); - oos.writeObject(object); - oos.close(); - Assert.assertTrue(out.toByteArray().length > 0); - } -} diff --git a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/example/StormExample.java deleted file mode 100644 index 93404ea77..000000000 --- a/external/storm-pulsar/src/test/java/org/apache/pulsar/storm/example/StormExample.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.storm.example; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.storm.MessageToValuesMapper; -import org.apache.pulsar.storm.PulsarBolt; -import org.apache.pulsar.storm.PulsarBoltConfiguration; -import org.apache.pulsar.storm.PulsarSpout; -import org.apache.pulsar.storm.PulsarSpoutConfiguration; -import org.apache.pulsar.storm.TupleToMessageMapper; -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.metric.api.IMetricsConsumer; -import org.apache.storm.task.IErrorReporter; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class StormExample { - private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class); - private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080"; - - @SuppressWarnings("serial") - static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() { - - @Override - public Values toValues(Message msg) { - return new Values(new String(msg.getData())); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // declare the output fields - declarer.declare(new Fields("string")); - } - }; - - @SuppressWarnings("serial") - static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() { - - @Override - public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) { - String receivedMessage = tuple.getString(0); - // message processing - String processedMsg = receivedMessage + "-processed"; - return msgBuilder.value(processedMsg.getBytes()); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // declare the output fields - } - }; - - public static void main(String[] args) throws Exception { - // String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication"; - // String authParams = "key1:val1,key2:val2"; - // clientConf.setAuthentication(authPluginClassName, authParams); - - String topic1 = "persistent://my-property/use/my-ns/my-topic1"; - String topic2 = "persistent://my-property/use/my-ns/my-topic2"; - String subscriptionName1 = "my-subscriber-name1"; - String subscriptionName2 = "my-subscriber-name2"; - - // create spout - PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration(); - spoutConf.setServiceUrl(serviceUrl); - spoutConf.setTopic(topic1); - spoutConf.setSubscriptionName(subscriptionName1); - spoutConf.setMessageToValuesMapper(messageToValuesMapper); - PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder()); - - // create bolt - PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration(); - boltConf.setServiceUrl(serviceUrl); - boltConf.setTopic(topic2); - boltConf.setTupleToMessageMapper(tupleToMessageMapper); - PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder()); - - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("testSpout", spout); - builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout"); - - Config conf = new Config(); - conf.setNumWorkers(2); - conf.setDebug(true); - conf.registerMetricsConsumer(PulsarMetricsConsumer.class); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(10000); - - PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); - // create a consumer on topic2 to receive messages from the bolt when the processing is done - Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe(); - // create a producer on topic1 to send messages that will be received by the spout - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic1).create(); - - for (int i = 0; i < 10; i++) { - String msg = "msg-" + i; - producer.send(msg.getBytes()); - LOG.info("Message {} sent", msg); - } - Message<byte[]> msg = null; - for (int i = 0; i < 10; i++) { - msg = consumer.receive(1, TimeUnit.SECONDS); - LOG.info("Message {} received", new String(msg.getData())); - } - cluster.killTopology("test"); - cluster.shutdown(); - - } - - class PulsarMetricsConsumer implements IMetricsConsumer { - - @Override - public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, - IErrorReporter errorReporter) { - } - - @Override - public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { - // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology. - // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt - // is - // "PulsarBoltMetrics-{componentId}-{taskIndex}". - } - - @Override - public void cleanup() { - } - - } -}
