This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 01e3074 Replace map with set (#8051)
01e3074 is described below
commit 01e30741ec05ac0c582f9bb025bd71ac3d1f32eb
Author: Tboy <[email protected]>
AuthorDate: Tue Sep 22 07:52:55 2020 +0800
Replace map with set (#8051)
Fixes #8050.
---
.../client/impl/BrokerClientIntegrationTest.java | 3 +-
.../pulsar/client/impl/PulsarClientImpl.java | 57 ++++++++--------------
2 files changed, 20 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 528dee7..afee17b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -38,7 +38,6 @@ import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -767,7 +766,7 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
prodField.setAccessible(true);
@SuppressWarnings("unchecked")
- IdentityHashMap<ProducerBase<byte[]>, Boolean> producers =
(IdentityHashMap<ProducerBase<byte[]>, Boolean>) prodField
+ Set<ProducerBase<byte[]>> producers = (Set<ProducerBase<byte[]>>)
prodField
.get(pulsarClient);
assertTrue(producers.isEmpty());
pulsarClient.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c33eb9b..cbdcb27 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -25,7 +25,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
@@ -35,10 +34,11 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -102,8 +102,8 @@ public class PulsarClientImpl implements PulsarClient {
}
private AtomicReference<State> state = new AtomicReference<>();
- private final IdentityHashMap<ProducerBase<?>, Boolean> producers;
- private final IdentityHashMap<ConsumerBase<?>, Boolean> consumers;
+ private final Set<ProducerBase<?>> producers;
+ private final Set<ConsumerBase<?>> consumers;
private final AtomicLong producerIdGenerator = new AtomicLong();
private final AtomicLong consumerIdGenerator = new AtomicLong();
@@ -151,8 +151,8 @@ public class PulsarClientImpl implements PulsarClient {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(),
conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1,
TimeUnit.MILLISECONDS);
- producers = Maps.newIdentityHashMap();
- consumers = Maps.newIdentityHashMap();
+ producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
if (conf.isEnableTransaction()) {
tcClient = new TransactionCoordinatorClientImpl(this);
@@ -292,10 +292,8 @@ public class PulsarClientImpl implements PulsarClient {
} else {
producer = new ProducerImpl<>(PulsarClientImpl.this, topic,
conf, producerCreatedFuture, -1, schema, interceptors);
}
-
- synchronized (producers) {
- producers.put(producer, Boolean.TRUE);
- }
+
+ producers.add(producer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}",
topic, ex.getMessage());
producerCreatedFuture.completeExceptionally(ex);
@@ -384,10 +382,8 @@ public class PulsarClientImpl implements PulsarClient {
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
-
- synchronized (consumers) {
- consumers.put(consumer, Boolean.TRUE);
- }
+
+ consumers.add(consumer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata", topic,
ex);
consumerSubscribedFuture.completeExceptionally(ex);
@@ -403,10 +399,8 @@ public class PulsarClientImpl implements PulsarClient {
ConsumerBase<T> consumer = new
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture, schema, interceptors,
true /* createTopicIfDoesNotExist */);
-
- synchronized (consumers) {
- consumers.put(consumer, Boolean.TRUE);
- }
+
+ consumers.add(consumer);
return consumerSubscribedFuture;
}
@@ -439,10 +433,8 @@ public class PulsarClientImpl implements PulsarClient {
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture,
schema, subscriptionMode, interceptors);
-
- synchronized (consumers) {
- consumers.put(consumer, Boolean.TRUE);
- }
+
+ consumers.add(consumer);
})
.exceptionally(ex -> {
log.warn("[{}] Failed to get topics under namespace",
namespaceName);
@@ -513,10 +505,8 @@ public class PulsarClientImpl implements PulsarClient {
// gets the next single threaded executor from the list of
executors
ExecutorService listenerThread =
externalExecutorProvider.getExecutor();
ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this,
conf, listenerThread, consumerSubscribedFuture, schema);
-
- synchronized (consumers) {
- consumers.put(reader.getConsumer(), Boolean.TRUE);
- }
+
+ consumers.add(reader.getConsumer());
consumerSubscribedFuture.thenRun(() -> {
readerFuture.complete(reader);
@@ -569,18 +559,9 @@ public class PulsarClientImpl implements PulsarClient {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
- synchronized (producers) {
- // Copy to a new list, because the closing will trigger a removal
from the map
- // and invalidate the iterator
- List<ProducerBase<?>> producersToClose =
Lists.newArrayList(producers.keySet());
- producersToClose.forEach(p -> futures.add(p.closeAsync()));
- }
-
- synchronized (consumers) {
- List<ConsumerBase<?>> consumersToClose =
Lists.newArrayList(consumers.keySet());
- consumersToClose.forEach(c -> futures.add(c.closeAsync()));
- }
+
+ producers.forEach(p -> futures.add(p.closeAsync()));
+ consumers.forEach(c -> futures.add(c.closeAsync()));
// Need to run the shutdown sequence in a separate thread to prevent
deadlocks
// If there are consumers or producers that need to be shutdown we
cannot use the same thread