merlimat closed pull request #2898: Remove guava usage in pulsar-storm
URL: https://github.com/apache/pulsar/pull/2898
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
index a025fdf4ad..93a58cc95f 100644
--- a/pulsar-storm/pom.xml
+++ b/pulsar-storm/pom.xml
@@ -62,11 +62,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -109,34 +104,5 @@
<filtering>true</filtering>
</resource>
</resources>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <createDependencyReducedPom>true</createDependencyReducedPom>
-
<promoteTransitiveDependencies>false</promoteTransitiveDependencies>
- <artifactSet>
- <includes>
- <include>com.google.guava:guava</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>pulsar-storm-shade.com.google</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
</build>
</project>
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 0aa1ee35fc..bc95e31261 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -21,6 +21,8 @@
import static java.lang.String.format;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -42,9 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
@SuppressWarnings("deprecation")
public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
@@ -60,7 +59,7 @@
private final ClientConfigurationData clientConf;
private final ProducerConfigurationData producerConf;
private final PulsarBoltConfiguration pulsarBoltConf;
- private final ConcurrentMap<String, Object> metricsMap =
Maps.newConcurrentMap();
+ private final ConcurrentMap<String, Object> metricsMap = new
ConcurrentHashMap<>();
private SharedPulsarClient sharedPulsarClient;
private String componentId;
@@ -73,9 +72,9 @@
public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder
clientBuilder) {
this.clientConf = ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData().clone();
this.producerConf = new ProducerConfigurationData();
- Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarBoltConf.getTopic());
- Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+ Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+ Objects.requireNonNull(pulsarBoltConf.getTopic());
+ Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
@@ -98,9 +97,9 @@ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf,
ClientConfiguration cl
ProducerConfiguration producerConf) {
this.clientConf = clientConf.getConfigurationData().clone();
this.producerConf =
producerConf.getProducerConfigurationData().clone();
- Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarBoltConf.getTopic());
- Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
+ Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
+ Objects.requireNonNull(pulsarBoltConf.getTopic());
+ Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
this.pulsarBoltConf = pulsarBoltConf;
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
index a67ac2cf50..714e4351cd 100644
---
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBoltConfiguration.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.storm;
-import com.google.common.base.Preconditions;
+import java.util.Objects;
/**
* Class used to specify Pulsar bolt configuration
@@ -51,7 +51,7 @@ public TupleToMessageMapper getTupleToMessageMapper() {
* @param mapper
*/
public void setTupleToMessageMapper(TupleToMessageMapper mapper) {
- this.tupleToMessageMapper = Preconditions.checkNotNull(mapper);
+ this.tupleToMessageMapper = Objects.requireNonNull(mapper);
}
}
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index af26035ed8..5df0804233 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -20,9 +20,13 @@
import static java.lang.String.format;
+import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -38,7 +42,6 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -48,10 +51,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-
@SuppressWarnings("deprecation")
public class PulsarSpout extends BaseRichSpout implements IMetric {
@@ -70,9 +69,9 @@
private final PulsarSpoutConfiguration pulsarSpoutConf;
private final long failedRetriesTimeoutNano;
private final int maxFailedRetries;
- private final ConcurrentMap<MessageId, MessageRetries>
pendingMessageRetries = Maps.newConcurrentMap();
- private final Queue<Message<byte[]>> failedMessages =
Queues.newConcurrentLinkedQueue();
- private final ConcurrentMap<String, Object> metricsMap =
Maps.newConcurrentMap();
+ private final ConcurrentMap<MessageId, MessageRetries>
pendingMessageRetries = new ConcurrentHashMap<>();
+ private final Queue<Message<byte[]>> failedMessages = new
ConcurrentLinkedQueue<>();
+ private final ConcurrentMap<String, Object> metricsMap = new
ConcurrentHashMap<>();
private SharedPulsarClient sharedPulsarClient;
private String componentId;
@@ -85,15 +84,15 @@
private volatile long messageSizeReceived = 0;
public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder
clientBuilder) {
- Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
- Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
- Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+ Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+ Objects.requireNonNull(pulsarSpoutConf.getTopic());
+ Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+ Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
this.clientConf = ((ClientBuilderImpl)
clientBuilder).getClientConfigurationData().clone();
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
this.consumerConf = new ConsumerConfigurationData<>();
-
this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+
this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
this.pulsarSpoutConf = pulsarSpoutConf;
@@ -111,13 +110,13 @@ public PulsarSpout(PulsarSpoutConfiguration
pulsarSpoutConf, ClientConfiguration
ConsumerConfiguration consumerConf) {
this.clientConf = clientConf.getConfigurationData().clone();
this.consumerConf = consumerConf.getConfigurationData().clone();
- Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
- Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
- Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
- Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
+ Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
+ Objects.requireNonNull(pulsarSpoutConf.getTopic());
+ Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
+ Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
-
this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic()));
+
this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
this.pulsarSpoutConf = pulsarSpoutConf;
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 79e15d2d7f..7582d74c0e 100644
---
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -18,10 +18,9 @@
*/
package org.apache.pulsar.storm;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-
/**
* Class used to specify pulsar spout configuration
*
@@ -75,7 +74,7 @@ public MessageToValuesMapper getMessageToValuesMapper() {
* @param mapper
*/
public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
- this.messageToValuesMapper = Preconditions.checkNotNull(mapper);
+ this.messageToValuesMapper = Objects.requireNonNull(mapper);
}
/**
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index 4506e11a7b..d07903ef74 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.storm;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,11 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
public class SharedPulsarClient {
private static final Logger LOG =
LoggerFactory.getLogger(SharedPulsarClient.class);
- private static final ConcurrentMap<String, SharedPulsarClient> instances =
Maps.newConcurrentMap();
+ private static final ConcurrentMap<String, SharedPulsarClient> instances =
new ConcurrentHashMap<>();
private final String componentId;
private final PulsarClientImpl client;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services