This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c2888f7df738f579a9f295b79297ebe325c85ae Author: lipenghui <[email protected]> AuthorDate: Tue Aug 24 02:16:00 2021 +0800 Avoid duplicated disconnecting producer when after add entry failed. (#11741) Currently, when an add-entry failure is encountered, producer.disconnect() is called multiple times while the producer is disconnecting, which adds many disconnect-producer tasks to the EventLoop. 1. Added an isDisconnecting state to the producer; if the producer is already in the isDisconnecting state, the disconnect operation is skipped. 2. Create the new future list only when the topic has producers, to reduce heap allocation. Added a test that disconnects the producer multiple times and verifies that the EventLoop executes the disconnect task only once. (cherry picked from commit 49c0796e8279442cc9162387a9db3e24415f9bbc) --- .../org/apache/pulsar/broker/service/Producer.java | 9 +++++++- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../broker/service/persistent/PersistentTopic.java | 12 ++++++++--- .../pulsar/broker/service/PersistentTopicTest.java | 25 ++++++++++++++++++++-- 4 files changed, 41 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 15076f9..8c35e66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; @@ -90,6 +91,7 @@ public class Producer { private final SchemaVersion schemaVersion; private final 
String clientAddress; // IP address only, no port number included + private final AtomicBoolean isDisconnecting = new AtomicBoolean(false); public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch, @@ -552,6 +554,7 @@ public class Producer { log.debug("Removed producer: {}", this); } closeFuture.complete(null); + isDisconnecting.set(false); } /** @@ -561,7 +564,7 @@ public class Producer { * @return Completable future indicating completion of producer close */ public CompletableFuture<Void> disconnect() { - if (!closeFuture.isDone()) { + if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) { log.info("Disconnecting producer: {}", this); cnx.execute(() -> { cnx.closeProducer(this); @@ -669,6 +672,10 @@ public class Producer { return clientAddress; } + public boolean isDisconnecting() { + return isDisconnecting.get(); + } + private static final Logger log = LoggerFactory.getLogger(Producer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 511d5a2..6a5ee25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2572,7 +2572,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { @Override public void execute(Runnable runnable) { - ctx.channel().eventLoop().execute(runnable); + ctx().channel().eventLoop().execute(runnable); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 285dff4..2808021 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -503,9 +503,15 @@ public class PersistentTopic extends AbstractTopic // fence topic when failed to write a message to BK fence(); // close all producers - List<CompletableFuture<Void>> futures = Lists.newArrayList(); - producers.values().forEach(producer -> futures.add(producer.disconnect())); - FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> { + CompletableFuture<Void> disconnectProducersFuture; + if (producers.size() > 0) { + List<CompletableFuture<Void>> futures = Lists.newArrayList(); + producers.forEach((__, producer) -> futures.add(producer.disconnect())); + disconnectProducersFuture = FutureUtil.waitForAll(futures); + } else { + disconnectProducersFuture = CompletableFuture.completedFuture(null); + } + disconnectProducersFuture.handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> { decrementPendingWriteOpsAndCheck(); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index b53404a..2998345 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -56,7 +56,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -64,6 +63,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.function.Supplier; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import lombok.Cleanup; @@ -93,7 +95,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; +import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -216,6 +218,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); doReturn(new PulsarCommandSenderImpl(null, serverCnx)) .when(serverCnx).getCommandSender(); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop(); + doReturn(channel).when(ctx).channel(); + doReturn(ctx).when(serverCnx).ctx(); NamespaceService nsSvc = mock(NamespaceService.class); NamespaceBundle bundle = mock(NamespaceBundle.class); @@ -2166,6 +2173,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { f2.get(); } + @Test + public void testDisconnectProducer() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + String role = "appid1"; + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + 
ProducerAccessMode.Shared, Optional.empty()); + assertFalse(producer.isDisconnecting()); + // Disconnect the producer multiple times. + producer.disconnect(); + producer.disconnect(); + verify(serverCnx).execute(any()); + }; + private ByteBuf getMessageWithMetadata(byte[] data) { MessageMetadata messageData = new MessageMetadata() .setPublishTime(System.currentTimeMillis())
