This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 06a9fec57d1d8b630dc09094ad06956fcdb3fb3b Author: fengyubiao <[email protected]> AuthorDate: Mon Apr 29 13:40:18 2024 +0800 [fix][broker] One topic can be closed multiple times concurrently (#17524) (cherry picked from commit 93afd89b047ac56d3b7e476f578993197cf41935) --- .../broker/service/persistent/PersistentTopic.java | 200 +++++++++++++++++---- .../broker/service/OneWayReplicatorTest.java | 21 ++- .../service/persistent/PersistentTopicTest.java | 81 ++++++++- .../org/apache/pulsar/common/util/FutureUtil.java | 33 +++- 4 files changed, 291 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fa049e1a5bc..e99bd1425f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -48,6 +48,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -270,6 +272,52 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Getter private final ExecutorService orderedExecutor; + private volatile CloseFutures closeFutures; + + /*** + * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return + * the in-progress one when it is called the second time. + * + * The topic closing will be called the below scenarios: + * 1. Calling "pulsar-admin topics unload". Relate to {@link CloseFutures#waitDisconnectClients}. 
+ * 2. Namespace bundle transfer or unloading. + * a. The unloading topic triggered by unloading namespace bundles will not wait for clients disconnect. Related + * to {@link CloseFutures#notWaitDisconnectClients}. + * b. The unloading topic triggered by unloading namespace bundles is separated into two steps when using + * {@link ExtensibleLoadManagerImpl}. + * b-1. step-1: fence the topic on the original Broker, and do not trigger reconnections of clients. Related + * to {@link CloseFutures#transferring}. This step is a half closing. + * b-2. step-2: send the owner broker information to clients and disconnect clients. Related + * to {@link CloseFutures#notWaitDisconnectClients}. + * + * The three futures are set according to the rules below: + * Event: Topic close. + * - If the first closing is called by "close and not disconnect clients": + * - {@link CloseFutures#transferring} will be initialized as "close and not disconnect clients". + * - {@link CloseFutures#waitDisconnectClients} and {@link CloseFutures#notWaitDisconnectClients} will be null, + * so a second closing will do a new close after {@link CloseFutures#transferring} is completed. + * - If the first closing is called by "close and not wait for clients disconnect": + * - {@link CloseFutures#waitDisconnectClients} will be initialized as "waiting for clients disconnect". + * - {@link CloseFutures#notWaitDisconnectClients} and {@link CloseFutures#transferring} will be + * initialized as "not waiting for clients disconnect". + * - If the first closing is called by "close and wait for clients disconnect", the three futures will be + * initialized as "waiting for clients disconnect". + * Event: Topic delete. + * The three futures will be initialized as "waiting for clients disconnect".
+ */ + private class CloseFutures { + private final CompletableFuture<Void> transferring; + private final CompletableFuture<Void> notWaitDisconnectClients; + private final CompletableFuture<Void> waitDisconnectClients; + + public CloseFutures(CompletableFuture<Void> transferring, CompletableFuture<Void> waitDisconnectClients, + CompletableFuture<Void> notWaitDisconnectClients) { + this.transferring = transferring; + this.waitDisconnectClients = waitDisconnectClients; + this.notWaitDisconnectClients = notWaitDisconnectClients; + } + } + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -1380,8 +1428,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting + // Mark the progress of close to prevent close calling concurrently. + this.closeFutures = + new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture()); - return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() + CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> { CompletableFuture<Void> deleteFuture = new CompletableFuture<>(); @@ -1484,6 +1535,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal unfenceTopicToResume(); } }); + + FutureUtil.completeAfter(closeFutures.transferring, res); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res); + FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res); + return res; } finally { lock.writeLock().unlock(); } @@ -1499,6 +1555,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return close(true, closeWithoutWaitingClientDisconnect); } + private enum CloseTypes { + transferring, + notWaitDisconnectClients, + 
waitDisconnectClients; + } + /** * Close this topic - close all producers and subscriptions associated with this topic. * @@ -1509,32 +1571,57 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture<Void> close( boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { - CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - lock.writeLock().lock(); - try { - if (!disconnectClients) { - transferring = true; - } + // Choose the close type. + CloseTypes closeType; + if (!disconnectClients) { + closeType = CloseTypes.transferring; + } else if (closeWithoutWaitingClientDisconnect) { + closeType = CloseTypes.notWaitDisconnectClients; + } else { // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker // forcefully wants to close managed-ledger without waiting all resources to be closed. - if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) { - fenceTopicToCloseOrDelete(); + closeType = CloseTypes.waitDisconnectClients; + } + /** Maybe there is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/ + CompletableFuture<Void> inProgressTransferCloseTask = null; + try { + // Return in-progress future if exists. 
+ if (isClosingOrDeleting) { + if (closeType == CloseTypes.transferring) { + return closeFutures.transferring; + } + if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) { + return closeFutures.notWaitDisconnectClients; + } + if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) { + return closeFutures.waitDisconnectClients; + } + if (transferring) { + inProgressTransferCloseTask = closeFutures.transferring; + } + } + fenceTopicToCloseOrDelete(); + if (closeType == CloseTypes.transferring) { + transferring = true; + this.closeFutures = new CloseFutures(new CompletableFuture(), null, null); } else { - log.warn("[{}] Topic is already being closed or deleted", topic); - closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced")); - return closeFuture; + this.closeFutures = + new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture()); } } finally { lock.writeLock().unlock(); } List<CompletableFuture<Void>> futures = new ArrayList<>(); + if (inProgressTransferCloseTask != null) { + futures.add(inProgressTransferCloseTask); + } futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); - if (disconnectClients) { + if (closeType != CloseTypes.transferring) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData))); @@ -1572,40 +1659,79 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } - CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect - ? 
CompletableFuture.completedFuture(null) - : FutureUtil.waitForAll(futures); + CompletableFuture<Void> disconnectClientsInCurrentCall = null; + // Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring. + AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>(); + switch (closeType) { + case transferring -> { + disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); + break; + } + case notWaitDisconnectClients -> { + disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null); + disconnectClientsToCache.set(FutureUtil.waitForAll(futures)); + break; + } + case waitDisconnectClients -> { + disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures); + disconnectClientsToCache.set(disconnectClientsInCurrentCall); + } + } - clientCloseFuture.thenRun(() -> { - // After having disconnected all producers/consumers, close the managed ledger - ledger.asyncClose(new CloseCallback() { - @Override - public void closeComplete(Object ctx) { - if (disconnectClients) { - // Everything is now closed, remove the topic from map - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } + CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() { + @Override + public void closeComplete(Object ctx) { + if (closeType != CloseTypes.transferring) { + // Everything is now closed, remove the topic from map + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); } + } - @Override - public void closeFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - if (disconnectClients) { - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}] Failed to close managed 
ledger, proceeding anyway.", topic, exception); + if (closeType != CloseTypes.transferring) { + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); } - }, null); - }).exceptionally(exception -> { + } + }, null)); + + disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); unfenceTopicToResume(); closeFuture.completeExceptionally(exception); return null; }); + switch (closeType) { + case transferring -> { + FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); + break; + } + case notWaitDisconnectClients -> { + FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, + closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> { + // Since the managed ledger has been closed, eat the error of clients disconnection. 
+ log.error("[{}] Closed managed ledger, but disconnect clients failed," + + " this topic will be marked closed", topic, ex); + return null; + }))); + break; + } + case waitDisconnectClients -> { + FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture); + FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture); + FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture); + } + } + return closeFuture; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 9b8b567af08..eb31c13b0d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; @@ -226,7 +227,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { }); } - private void injectMockReplicatorProducerBuilder( + private Runnable injectMockReplicatorProducerBuilder( BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator) throws Exception { String cluster2 = pulsar2.getConfig().getClusterName(); @@ -246,7 +247,8 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); PulsarClient spyClient = spy(internalClient); - replicationClients.put(cluster2, spyClient); + 
assertTrue(replicationClients.remove(cluster2, internalClient)); + assertNull(replicationClients.putIfAbsent(cluster2, spyClient)); // Inject producer decorator. doAnswer(invocation -> { @@ -275,6 +277,12 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { }).when(spyProducerBuilder).createAsync(); return spyProducerBuilder; }).when(spyClient).newProducer(any(Schema.class)); + + // Return a cleanup injection task; + return () -> { + assertTrue(replicationClients.remove(cluster2, spyClient)); + assertNull(replicationClients.putIfAbsent(cluster2, internalClient)); + }; } private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception { @@ -368,7 +376,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { @@ -427,6 +435,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { }); // cleanup. + taskToClearInjection.run(); cleanupTopics(() -> { admin1.topics().delete(topicName); admin2.topics().delete(topicName); @@ -531,7 +540,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. 
final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { @@ -593,6 +602,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { }); // cleanup. + taskToClearInjection.run(); cleanupTopics(namespaceName, () -> { admin1.topics().delete(topicName); admin2.topics().delete(topicName); @@ -644,8 +654,9 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { assertTrue(replicator2.producer != null && replicator2.producer.isConnected()); }); - // cleanup. + // cleanup the injection. persistentTopic.getProducers().remove(mockProducerName, mockProducer); + // cleanup. producer1.close(); cleanupTopics(() -> { admin1.topics().delete(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 5c49b472303..8130c818e3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -47,6 +47,8 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -54,6 +56,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; 
+import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; @@ -321,6 +324,83 @@ public class PersistentTopicTest extends BrokerTestBase { } } + @DataProvider(name = "closeWithoutWaitingClientDisconnectInFirstBatch") + public Object[][] closeWithoutWaitingClientDisconnectInFirstBatch() { + return new Object[][]{ + new Object[] {true}, + new Object[] {false}, + }; + } + + @Test(dataProvider = "closeWithoutWaitingClientDisconnectInFirstBatch") + public void testConcurrentClose(boolean closeWithoutWaitingClientDisconnectInFirstBatch) throws Exception { + final String topicName = "persistent://prop/ns/concurrentClose"; + final String ns = "prop/ns"; + admin.namespaces().createNamespace(ns, 1); + admin.topics().createNonPartitionedTopic(topicName); + final Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); + List<CompletableFuture<Void>> futureList = + make2ConcurrentBatchesOfClose(topic, 10, closeWithoutWaitingClientDisconnectInFirstBatch); + Map<Integer, List<CompletableFuture<Void>>> futureMap = + futureList.stream().collect(Collectors.groupingBy(Objects::hashCode)); + /** + * The first call: get the return value of "topic.close". + * The other 19 calls: get the cached value which related {@link PersistentTopic#closeFutures}. + */ + assertTrue(futureMap.size() <= 3); + for (List list : futureMap.values()){ + if (list.size() == 1){ + // This is the first call, the future is the return value of `topic.close`. + } else { + // Two types future list: wait client close or not. 
+ assertTrue(list.size() >= 9 && list.size() <= 10); + } + } + } + + private List<CompletableFuture<Void>> make2ConcurrentBatchesOfClose(Topic topic, int tryTimes, + boolean closeWithoutWaitingClientDisconnectInFirstBatch){ + final List<CompletableFuture<Void>> futureList = Collections.synchronizedList(new ArrayList<>()); + final List<Thread> taskList = new ArrayList<>(); + CountDownLatch allTaskBeginLatch = new CountDownLatch(1); + // Call a batch of close. + for (int i = 0; i < tryTimes; i++) { + Thread thread = new Thread(() -> { + try { + allTaskBeginLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + futureList.add(topic.close(closeWithoutWaitingClientDisconnectInFirstBatch)); + }); + thread.start(); + taskList.add(thread); + } + // Call another batch of close. + for (int i = 0; i < tryTimes; i++) { + Thread thread = new Thread(() -> { + try { + allTaskBeginLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + futureList.add(topic.close(!closeWithoutWaitingClientDisconnectInFirstBatch)); + }); + thread.start(); + taskList.add(thread); + } + // Wait close task executed. 
+ allTaskBeginLatch.countDown(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()->{ + for (Thread thread : taskList){ + if (thread.isAlive()){ + return false; + } + } + return true; + }); + return futureList; + } @DataProvider(name = "topicAndMetricsLevel") public Object[][] indexPatternTestData() { @@ -330,7 +410,6 @@ public class PersistentTopicTest extends BrokerTestBase { }; } - @Test(dataProvider = "topicAndMetricsLevel") public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception { PulsarClient client = pulsar.getClient(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 2b082b4a789..1ccd589a4fa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -20,6 +20,7 @@ package org.apache.pulsar.common.util; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -63,6 +64,36 @@ public class FutureUtil { }))); } + /** + * Make the dest future complete after another one. {@code dest} will be completed with the same value as + * {@code src}, or be completed exceptionally with the same error as {@code src}. + */ + public static <T> void completeAfter(final CompletableFuture<T> dest, CompletableFuture<T> src) { + src.whenComplete((v, ex) -> { + if (ex != null) { + dest.completeExceptionally(ex); + } else { + dest.complete(v); + } + }); + } + + /** + * Make the dest future complete after others. {@code dest} will be completed with a {@code null} value + * once all the futures of {@code src} are completed, or be completed exceptionally with the error + * propagated by {@link #waitForAll} if any future of {@code src} completes exceptionally.
+ */ + public static void completeAfterAll(final CompletableFuture<Void> dest, + CompletableFuture<? extends Object>... src) { + FutureUtil.waitForAll(Arrays.asList(src)).whenComplete((ignore, ex) -> { + if (ex != null) { + dest.completeExceptionally(ex); + } else { + dest.complete(null); + } + }); + } + /** * Return a future that represents the completion of any future in the provided Collection. * @@ -125,7 +156,7 @@ public class FutureUtil { * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete */ public static CompletableFuture<Void> waitForAllAndSupportCancel( - Collection<? extends CompletableFuture<?>> futures) { + Collection<? extends CompletableFuture<?>> futures) { CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futuresArray); whenCancelledOrTimedOut(combinedFuture, () -> {
