This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e3074  Replace map with set (#8051)
01e3074 is described below

commit 01e30741ec05ac0c582f9bb025bd71ac3d1f32eb
Author: Tboy <[email protected]>
AuthorDate: Tue Sep 22 07:52:55 2020 +0800

    Replace map with set (#8051)
    
     Fix #8050 .
---
 .../client/impl/BrokerClientIntegrationTest.java   |  3 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 57 ++++++++--------------
 2 files changed, 20 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 528dee7..afee17b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -38,7 +38,6 @@ import static org.testng.Assert.fail;
 import java.lang.reflect.Field;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -767,7 +766,7 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         Field prodField = PulsarClientImpl.class.getDeclaredField("producers");
         prodField.setAccessible(true);
         @SuppressWarnings("unchecked")
-        IdentityHashMap<ProducerBase<byte[]>, Boolean> producers = 
(IdentityHashMap<ProducerBase<byte[]>, Boolean>) prodField
+        Set<ProducerBase<byte[]>> producers = (Set<ProducerBase<byte[]>>) 
prodField
                 .get(pulsarClient);
         assertTrue(producers.isEmpty());
         pulsarClient.close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c33eb9b..cbdcb27 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -25,7 +25,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
@@ -35,10 +34,11 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -102,8 +102,8 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private AtomicReference<State> state = new AtomicReference<>();
-    private final IdentityHashMap<ProducerBase<?>, Boolean> producers;
-    private final IdentityHashMap<ConsumerBase<?>, Boolean> consumers;
+    private final Set<ProducerBase<?>> producers;
+    private final Set<ConsumerBase<?>> consumers;
 
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private final AtomicLong consumerIdGenerator = new AtomicLong();
@@ -151,8 +151,8 @@ public class PulsarClientImpl implements PulsarClient {
             lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), 
conf.getListenerName(), conf.isUseTls(), 
externalExecutorProvider.getExecutor());
         }
         timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, 
TimeUnit.MILLISECONDS);
-        producers = Maps.newIdentityHashMap();
-        consumers = Maps.newIdentityHashMap();
+        producers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
         if (conf.isEnableTransaction()) {
             tcClient = new TransactionCoordinatorClientImpl(this);
@@ -292,10 +292,8 @@ public class PulsarClientImpl implements PulsarClient {
             } else {
                 producer = new ProducerImpl<>(PulsarClientImpl.this, topic, 
conf, producerCreatedFuture, -1, schema, interceptors);
             }
-
-            synchronized (producers) {
-                producers.put(producer, Boolean.TRUE);
-            }
+    
+            producers.add(producer);
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partitioned topic metadata: {}", 
topic, ex.getMessage());
             producerCreatedFuture.completeExceptionally(ex);
@@ -384,10 +382,8 @@ public class PulsarClientImpl implements PulsarClient {
                         consumerSubscribedFuture,null, schema, interceptors,
                         true /* createTopicIfDoesNotExist */);
             }
-
-            synchronized (consumers) {
-                consumers.put(consumer, Boolean.TRUE);
-            }
+            
+            consumers.add(consumer);
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partitioned topic metadata", topic, 
ex);
             consumerSubscribedFuture.completeExceptionally(ex);
@@ -403,10 +399,8 @@ public class PulsarClientImpl implements PulsarClient {
         ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
                 externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema, interceptors,
                 true /* createTopicIfDoesNotExist */);
-
-        synchronized (consumers) {
-            consumers.put(consumer, Boolean.TRUE);
-        }
+        
+        consumers.add(consumer);
 
         return consumerSubscribedFuture;
     }
@@ -439,10 +433,8 @@ public class PulsarClientImpl implements PulsarClient {
                     externalExecutorProvider.getExecutor(),
                     consumerSubscribedFuture,
                     schema, subscriptionMode, interceptors);
-
-                synchronized (consumers) {
-                    consumers.put(consumer, Boolean.TRUE);
-                }
+                
+                consumers.add(consumer);
             })
             .exceptionally(ex -> {
                 log.warn("[{}] Failed to get topics under namespace", 
namespaceName);
@@ -513,10 +505,8 @@ public class PulsarClientImpl implements PulsarClient {
             // gets the next single threaded executor from the list of 
executors
             ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this, 
conf, listenerThread, consumerSubscribedFuture, schema);
-
-            synchronized (consumers) {
-                consumers.put(reader.getConsumer(), Boolean.TRUE);
-            }
+            
+            consumers.add(reader.getConsumer());
 
             consumerSubscribedFuture.thenRun(() -> {
                 readerFuture.complete(reader);
@@ -569,18 +559,9 @@ public class PulsarClientImpl implements PulsarClient {
 
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
-        synchronized (producers) {
-            // Copy to a new list, because the closing will trigger a removal 
from the map
-            // and invalidate the iterator
-            List<ProducerBase<?>> producersToClose = 
Lists.newArrayList(producers.keySet());
-            producersToClose.forEach(p -> futures.add(p.closeAsync()));
-        }
-
-        synchronized (consumers) {
-            List<ConsumerBase<?>> consumersToClose = 
Lists.newArrayList(consumers.keySet());
-            consumersToClose.forEach(c -> futures.add(c.closeAsync()));
-        }
+    
+        producers.forEach(p -> futures.add(p.closeAsync()));
+        consumers.forEach(c -> futures.add(c.closeAsync()));
 
         // Need to run the shutdown sequence in a separate thread to prevent 
deadlocks
         // If there are consumers or producers that need to be shutdown we 
cannot use the same thread

Reply via email to