This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6fd7406 Remove guava usage in pulsar-storm (#2898)
6fd7406 is described below
commit 6fd7406a4de061235ac718bb199f5e2f36fb2cc9
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 7 13:34:12 2018 -0800
Remove guava usage in pulsar-storm (#2898)
---
pulsar-storm/pom.xml | 34 ---------------------
.../java/org/apache/pulsar/storm/PulsarBolt.java | 19 ++++++------
.../pulsar/storm/PulsarBoltConfiguration.java | 4 +--
.../java/org/apache/pulsar/storm/PulsarSpout.java | 35 +++++++++++-----------
.../pulsar/storm/PulsarSpoutConfiguration.java | 5 ++--
.../apache/pulsar/storm/SharedPulsarClient.java | 5 ++--
6 files changed, 32 insertions(+), 70 deletions(-)
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
index a025fdf..93a58cc 100644
--- a/pulsar-storm/pom.xml
+++ b/pulsar-storm/pom.xml
@@ -63,11 +63,6 @@
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -109,34 +104,5 @@
<filtering>true</filtering>
</resource>
</resources>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
- <artifactSet>
- <includes>
- <include>com.google.guava:guava</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>pulsar-storm-shade.com.google</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
</build>
</project>
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 0aa1ee3..bc95e31 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.storm;
import static java.lang.String.format;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -42,9 +44,6 @@ import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
@SuppressWarnings("deprecation")
public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
@@ -60,7 +59,7 @@ public class PulsarBolt extends BaseRichBolt implements
IMetric {
private final ClientConfigurationData clientConf;
private final ProducerConfigurationData producerConf;
private final PulsarBoltConfiguration pulsarBoltConf;
- private final ConcurrentMap<String, Object> metricsMap =
Maps.newConcurrentMap();
+ private final ConcurrentMap<String, Object> metricsMap = new
ConcurrentHashMap<>();
private SharedPulsarClient sharedPulsarClient;
private String componentId;
@@ -73,9 +72,9 @@ public class PulsarBolt extends BaseRichBolt implements
IMetric {
public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder
clientBuilder) {
this.clientConf = ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData().clone();
this.producerConf = new ProducerConfigurationData();
- Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarBoltConf.getTopic());
- Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+ Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+ Objects.requireNonNull(pulsarBoltConf.getTopic());
+ Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
@@ -98,9 +97,9 @@ public class PulsarBolt extends BaseRichBolt implements
IMetric {
ProducerConfiguration producerConf) {
this.clientConf = clientConf.getConfigurationData().clone();
this.producerConf =
producerConf.getProducerConfigurationData().clone();
- Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarBoltConf.getTopic());
- Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+ Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+ Objects.requireNonNull(pulsarBoltConf.getTopic());
+ Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
this.pulsarBoltConf = pulsarBoltConf;
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
index a67ac2c..714e435 100644
---
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.storm;
-import com.google.common.base.Preconditions;
+import java.util.Objects;
/**
* Class used to specify Pulsar bolt configuration
@@ -51,7 +51,7 @@ public class PulsarBoltConfiguration extends
PulsarStormConfiguration {
* @param mapper
*/
public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
- this.tupleToMessageMapper = Preconditions.checkNotNull(mapper);
+ this.tupleToMessageMapper = Objects.requireNonNull(mapper);
}
}
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index af26035..5df0804 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -20,9 +20,13 @@ package org.apache.pulsar.storm;
import static java.lang.String.format;
+import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -38,7 +42,6 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -48,10 +51,6 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-
@SuppressWarnings("deprecation")
public class PulsarSpout extends BaseRichSpout implements IMetric {
@@ -70,9 +69,9 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
private final PulsarSpoutConfiguration pulsarSpoutConf;
private final long failedRetriesTimeoutNano;
private final int maxFailedRetries;
- private final ConcurrentMap<MessageId, MessageRetries>
pendingMessageRetries = Maps.newConcurrentMap();
- private final Queue<Message<byte[]>> failedMessages =
Queues.newConcurrentLinkedQueue();
- private final ConcurrentMap<String, Object> metricsMap =
Maps.newConcurrentMap();
+ private final ConcurrentMap<MessageId, MessageRetries>
pendingMessageRetries = new ConcurrentHashMap<>();
+ private final Queue<Message<byte[]>> failedMessages = new
ConcurrentLinkedQueue<>();
+ private final ConcurrentMap<String, Object> metricsMap = new
ConcurrentHashMap<>();
private SharedPulsarClient sharedPulsarClient;
private String componentId;
@@ -85,15 +84,15 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
private volatile long messageSizeReceived = 0;
public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder
clientBuilder) {
- Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
- Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
- Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+ Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+ Objects.requireNonNull(pulsarSpoutConf.getTopic());
+ Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+ Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
this.clientConf = ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData().clone();
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
this.consumerConf = new ConsumerConfigurationData<>();
- this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+ this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
this.pulsarSpoutConf = pulsarSpoutConf;
@@ -111,13 +110,13 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
ConsumerConfiguration consumerConf) {
this.clientConf = clientConf.getConfigurationData().clone();
this.consumerConf = consumerConf.getConfigurationData().clone();
- Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
- Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
- Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+ Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+ Objects.requireNonNull(pulsarSpoutConf.getTopic());
+ Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+ Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
- this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+ this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
this.pulsarSpoutConf = pulsarSpoutConf;
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 79e15d2..7582d74 100644
---
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -18,10 +18,9 @@
*/
package org.apache.pulsar.storm;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-
/**
* Class used to specify pulsar spout configuration
*
@@ -75,7 +74,7 @@ public class PulsarSpoutConfiguration extends
PulsarStormConfiguration {
* @param mapper
*/
public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
- this.messageToValuesMapper = Preconditions.checkNotNull(mapper);
+ this.messageToValuesMapper = Objects.requireNonNull(mapper);
}
/**
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index 4506e11..d07903e 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.storm;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,11 +34,9 @@ import
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
public class SharedPulsarClient {
private static final Logger LOG =
LoggerFactory.getLogger(SharedPulsarClient.class);
- private static final ConcurrentMap<String, SharedPulsarClient> instances =
Maps.newConcurrentMap();
+ private static final ConcurrentMap<String, SharedPulsarClient> instances =
new ConcurrentHashMap<>();
private final String componentId;
private final PulsarClientImpl client;