This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c2888f7df738f579a9f295b79297ebe325c85ae
Author: lipenghui <[email protected]>
AuthorDate: Tue Aug 24 02:16:00 2021 +0800

    Avoid disconnecting the producer multiple times after an add-entry failure.
(#11741)
    
    Currently, when an add-entry failure is encountered, producer.disconnect()
    is called multiple times while the producer is being disconnected,
    which adds many disconnect-producer tasks to the EventLoop.
    
    1. Added an isDisconnecting state to the producer; if the producer is
       already in the isDisconnecting state, the disconnect operation is skipped.
    2. Create the new future list only when the topic has producers, to reduce
       heap allocation.
    
    Added a test that disconnects the producer multiple times and verifies
    that the EventLoop executes the disconnect task only once.
    
    (cherry picked from commit 49c0796e8279442cc9162387a9db3e24415f9bbc)
---
 .../org/apache/pulsar/broker/service/Producer.java |  9 +++++++-
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../broker/service/persistent/PersistentTopic.java | 12 ++++++++---
 .../pulsar/broker/service/PersistentTopicTest.java | 25 ++++++++++++++++++++--
 4 files changed, 41 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 15076f9..8c35e66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -34,6 +34,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
@@ -90,6 +91,7 @@ public class Producer {
 
     private final SchemaVersion schemaVersion;
     private final String clientAddress; // IP address only, no port number 
included
+    private final AtomicBoolean isDisconnecting = new AtomicBoolean(false);
 
     public Producer(Topic topic, TransportCnx cnx, long producerId, String 
producerName, String appId,
             boolean isEncrypted, Map<String, String> metadata, SchemaVersion 
schemaVersion, long epoch,
@@ -552,6 +554,7 @@ public class Producer {
             log.debug("Removed producer: {}", this);
         }
         closeFuture.complete(null);
+        isDisconnecting.set(false);
     }
 
     /**
@@ -561,7 +564,7 @@ public class Producer {
      * @return Completable future indicating completion of producer close
      */
     public CompletableFuture<Void> disconnect() {
-        if (!closeFuture.isDone()) {
+        if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, 
true)) {
             log.info("Disconnecting producer: {}", this);
             cnx.execute(() -> {
                 cnx.closeProducer(this);
@@ -669,6 +672,10 @@ public class Producer {
         return clientAddress;
     }
 
+    public boolean isDisconnecting() {
+        return isDisconnecting.get();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Producer.class);
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 511d5a2..6a5ee25 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2572,7 +2572,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     public void execute(Runnable runnable) {
-        ctx.channel().eventLoop().execute(runnable);
+        ctx().channel().eventLoop().execute(runnable);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 285dff4..2808021 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -503,9 +503,15 @@ public class PersistentTopic extends AbstractTopic
             // fence topic when failed to write a message to BK
             fence();
             // close all producers
-            List<CompletableFuture<Void>> futures = Lists.newArrayList();
-            producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
-            FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, 
Void>) (aVoid, throwable) -> {
+            CompletableFuture<Void> disconnectProducersFuture;
+            if (producers.size() > 0) {
+                List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                producers.forEach((__, producer) -> 
futures.add(producer.disconnect()));
+                disconnectProducersFuture = FutureUtil.waitForAll(futures);
+            } else {
+                disconnectProducersFuture = 
CompletableFuture.completedFuture(null);
+            }
+            disconnectProducersFuture.handle((BiFunction<Void, Throwable, 
Void>) (aVoid, throwable) -> {
                 decrementPendingWriteOpsAndCheck();
                 return null;
             });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b53404a..2998345 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -56,7 +56,6 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +63,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import lombok.Cleanup;
@@ -93,7 +95,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -216,6 +218,11 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
                 .when(serverCnx).getCommandSender();
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
+        doReturn(channel).when(ctx).channel();
+        doReturn(ctx).when(serverCnx).ctx();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
@@ -2166,6 +2173,20 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         f2.get();
     }
 
+    @Test
+    public void testDisconnectProducer() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        String role = "appid1";
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
+                role, false, null, SchemaVersion.Latest, 0, false,
+                ProducerAccessMode.Shared, Optional.empty());
+        assertFalse(producer.isDisconnecting());
+        // Disconnect the producer multiple times.
+        producer.disconnect();
+        producer.disconnect();
+        verify(serverCnx).execute(any());
+    };
+
     private ByteBuf getMessageWithMetadata(byte[] data) {
         MessageMetadata messageData = new MessageMetadata()
                 .setPublishTime(System.currentTimeMillis())

Reply via email to