This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 620fe9be239fbb35c2db279660a8634410f9f357
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 13:40:18 2024 +0800

    [fix][broker] One topic can be closed multiple times concurrently (#17524)
    
    (cherry picked from commit 93afd89b047ac56d3b7e476f578993197cf41935)
---
 .../broker/service/persistent/PersistentTopic.java | 122 ++++++++++++++++++---
 .../broker/service/OneWayReplicatorTest.java       |  25 +++--
 .../service/persistent/PersistentTopicTest.java    |  81 +++++++++++++-
 .../org/apache/pulsar/common/util/FutureUtil.java  |  33 +++++-
 4 files changed, 233 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3933706da82..904e1ed670e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -258,6 +259,37 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @Getter
     private final ExecutorService orderedExecutor;
 
+    private volatile CloseFutures closeFutures;
+
+    /**
+     * Futures of the in-progress close or delete. While one is in progress ({@code isClosingOrDeleting}), a new
+     * {@code close} call returns the cached future for its close type instead of starting a second close.
+     *
+     * The topic is closed in the scenarios below:
+     * 1. Calling "pulsar-admin topics unload". Relates to {@link CloseFutures#waitDisconnectClients}.
+     * 2. Namespace bundle unloading. Topics unloaded along with a namespace bundle do not wait for clients to
+     *    disconnect. See {@link CloseFutures#notWaitDisconnectClients}.
+     *
+     * The two futures are completed according to the rules below:
+     * Event: topic close.
+     * - If the first close does not wait for clients to disconnect, {@link CloseFutures#notWaitDisconnectClients}
+     *   completes with that close; {@link CloseFutures#waitDisconnectClients} also waits for clients to disconnect.
+     * - If the first close waits for clients to disconnect, both futures complete with that close, which has
+     *   already waited for clients to disconnect.
+     * Event: topic delete.
+     *   Both futures complete with the result of the deletion.
+     */
+    private class CloseFutures {
+        private final CompletableFuture<Void> notWaitDisconnectClients;
+        private final CompletableFuture<Void> waitDisconnectClients;
+
+        public CloseFutures(CompletableFuture<Void> waitDisconnectClients,
+                            CompletableFuture<Void> notWaitDisconnectClients) {
+            this.waitDisconnectClients = waitDisconnectClients;
+            this.notWaitDisconnectClients = notWaitDisconnectClients;
+        }
+    }
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -1349,8 +1381,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
 
             fenceTopicToCloseOrDelete(); // Avoid clients reconnections while 
deleting
+            // Mark the progress of close to prevent close calling 
concurrently.
+            this.closeFutures = new CloseFutures(new CompletableFuture(), new 
CompletableFuture());
 
-            return 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
+            CompletableFuture<Void> res = 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                         
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () 
-> {
                 CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
 
@@ -1453,6 +1487,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         unfenceTopicToResume();
                     }
                 });
+
+            FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, 
res);
+            FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
+            return res;
         } finally {
             lock.writeLock().unlock();
         }
@@ -1463,6 +1501,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return close(false);
     }
 
+    private enum CloseTypes {
+        notWaitDisconnectClients, // close(true): do not wait for clients to disconnect.
+        waitDisconnectClients; // close(false): wait for clients to disconnect before closing the ledger.
+    }
+
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic.
      *
@@ -1471,19 +1514,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
      */
     @Override
     public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
-        lock.writeLock().lock();
-        try {
+        CloseTypes closeType;
+        if (closeWithoutWaitingClientDisconnect) {
+            closeType = CloseTypes.notWaitDisconnectClients;
+        } else {
             // closing managed-ledger waits until all 
producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all 
resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
-            } else {
-                log.warn("[{}] Topic is already being closed or deleted", 
topic);
-                closeFuture.completeExceptionally(new 
TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+            closeType = CloseTypes.waitDisconnectClients;
+        }
+
+        lock.writeLock().lock();
+        try {
+            // Return in-progress future if exists.
+            if (isClosingOrDeleting) {
+                switch (closeType) {
+                    case notWaitDisconnectClients -> {
+                        return closeFutures.notWaitDisconnectClients;
+                    }
+                    case waitDisconnectClients -> {
+                        return closeFutures.waitDisconnectClients;
+                    }
+                }
             }
+            // No in-progress closing.
+            fenceTopicToCloseOrDelete();
+            this.closeFutures = new CloseFutures(new CompletableFuture(), new 
CompletableFuture());
         } finally {
             lock.writeLock().unlock();
         }
@@ -1513,11 +1569,22 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             });
         }
 
-        CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect
-                ? CompletableFuture.completedFuture(null)
-                : FutureUtil.waitForAll(futures);
+        CompletableFuture<Void> disconnectClientsInCurrentCall = null;
+        AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = 
new AtomicReference<>();
+        switch (closeType) {
+            case notWaitDisconnectClients -> {
+                disconnectClientsInCurrentCall = 
CompletableFuture.completedFuture(null);
+                disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
+                break;
+            }
+            case waitDisconnectClients -> {
+                disconnectClientsInCurrentCall = 
FutureUtil.waitForAll(futures);
+                disconnectClientsToCache.set(disconnectClientsInCurrentCall);
+            }
+        }
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
-        clientCloseFuture.thenRun(() -> {
+        Runnable closeLedgerAfterCloseClients = () -> {
             // After having disconnected all producers/consumers, close the 
managed ledger
             ledger.asyncClose(new CloseCallback() {
                 @Override
@@ -1532,13 +1599,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     disposeTopic(closeFuture);
                 }
             }, null);
-        }).exceptionally(exception -> {
+        };
+        
disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception
 -> {
             log.error("[{}] Error closing topic", topic, exception);
             unfenceTopicToResume();
             closeFuture.completeExceptionally(exception);
             return null;
         });
 
+        switch (closeType) {
+            case notWaitDisconnectClients -> {
+                
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+                FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
+                        closeFuture.thenCompose(ignore -> 
disconnectClientsToCache.get().exceptionally(ex -> {
+                            // Since the managed ledger has been closed, eat 
the error of clients disconnection.
+                            log.error("[{}] Closed managed ledger, but 
disconnect clients failed,"
+                                    + " this topic will be marked closed", 
topic, ex);
+                            return null;
+                        })));
+                break;
+            }
+            case waitDisconnectClients -> {
+                
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+                
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
+            }
+        }
+
         return closeFuture;
     }
 
@@ -1824,10 +1910,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     lock.readLock().lock();
                     try {
                         if (isClosingOrDeleting) {
-                            // Whether is "transferring" or not, do not create 
new replicator.
+                            // Do not create new replicator.
                             log.info("[{}] Skip to create replicator because 
this topic is closing."
-                                    + " remote cluster: {}. State of 
transferring : {}",
-                                    topic, remoteCluster, transferring);
+                                    + " remote cluster: {}.",
+                                    topic, remoteCluster);
                             return;
                         }
                         Replicator replicator = 
replicators.computeIfAbsent(remoteCluster, r -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index a1fbd8fad12..e422f3d8c6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
@@ -232,7 +233,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         });
     }
 
-    private void injectMockReplicatorProducerBuilder(
+    private Runnable injectMockReplicatorProducerBuilder(
                                 BiFunction<ProducerConfigurationData, 
ProducerImpl, ProducerImpl> producerDecorator)
             throws Exception {
         String cluster2 = pulsar2.getConfig().getClusterName();
@@ -252,7 +253,8 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
                 replicationClients = 
WhiteboxImpl.getInternalState(brokerService, "replicationClients");
         PulsarClientImpl internalClient = (PulsarClientImpl) 
replicationClients.get(cluster2);
         PulsarClient spyClient = spy(internalClient);
-        replicationClients.put(cluster2, spyClient);
+        assertTrue(replicationClients.remove(cluster2, internalClient));
+        assertNull(replicationClients.putIfAbsent(cluster2, spyClient));
 
         // Inject producer decorator.
         doAnswer(invocation -> {
@@ -281,6 +283,12 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
             }).when(spyProducerBuilder).createAsync();
             return spyProducerBuilder;
         }).when(spyClient).newProducer(any(Schema.class));
+
+        // Return a cleanup injection task;
+        return () -> {
+            assertTrue(replicationClients.remove(cluster2, spyClient));
+            assertNull(replicationClients.putIfAbsent(cluster2, 
internalClient));
+        };
     }
 
     private SpyCursor spyCursor(PersistentTopic persistentTopic, String 
cursorName) throws Exception {
@@ -374,7 +382,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         //   If the retry counter is larger than 6, the next creation will be 
slow enough to close Replicator.
         final AtomicInteger createProducerCounter = new AtomicInteger();
         final int failTimes = 6;
-        injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> 
{
+        Runnable taskToClearInjection = 
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
             if (topicName.equals(producerCnf.getTopicName())) {
                 // There is a switch to determine create producer successfully 
or not.
                 if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -433,6 +441,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         });
 
         // cleanup.
+        taskToClearInjection.run();
         cleanupTopics(() -> {
             admin1.topics().delete(topicName);
             admin2.topics().delete(topicName);
@@ -537,7 +546,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         //   If the retry counter is larger than 6, the next creation will be 
slow enough to close Replicator.
         final AtomicInteger createProducerCounter = new AtomicInteger();
         final int failTimes = 6;
-        injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> 
{
+        Runnable taskToClearInjection = 
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
             if (topicName.equals(producerCnf.getTopicName())) {
                 // There is a switch to determine create producer successfully 
or not.
                 if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -599,6 +608,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         });
 
         // cleanup.
+        taskToClearInjection.run();
         cleanupTopics(namespaceName, () -> {
             admin1.topics().delete(topicName);
             admin2.topics().delete(topicName);
@@ -619,8 +629,6 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         final String mockProducerName = UUID.randomUUID().toString();
         final org.apache.pulsar.broker.service.Producer mockProducer =
                 mock(org.apache.pulsar.broker.service.Producer.class);
-        doAnswer(invocation -> CompletableFuture.failedFuture(new 
RuntimeException("mocked error")))
-                .when(mockProducer).disconnect(any());
         doAnswer(invocation -> CompletableFuture.failedFuture(new 
RuntimeException("mocked error")))
                 .when(mockProducer).disconnect();
         PersistentTopic persistentTopic =
@@ -631,7 +639,7 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
         GeoPersistentReplicator replicator1 =
                 (GeoPersistentReplicator) 
persistentTopic.getReplicators().values().iterator().next();
         try {
-            persistentTopic.close(true, false).join();
+            persistentTopic.close(false).join();
             fail("Expected close fails due to a producer close fails");
         } catch (Exception ex) {
             log.info("Expected error: {}", ex.getMessage());
@@ -650,8 +658,9 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
             assertTrue(replicator2.producer != null && 
replicator2.producer.isConnected());
         });
 
-        // cleanup.
+        // cleanup the injection.
         persistentTopic.getProducers().remove(mockProducerName, mockProducer);
+        // cleanup.
         producer1.close();
         cleanupTopics(() -> {
             admin1.topics().delete(topicName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 464243f4bbb..7c62c115f07 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -47,6 +47,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -54,6 +56,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -321,6 +324,83 @@ public class PersistentTopicTest extends BrokerTestBase {
         }
     }
 
+    @DataProvider(name = "closeWithoutWaitingClientDisconnectInFirstBatch")
+    public Object[][] closeWithoutWaitingClientDisconnectInFirstBatch() {
+        return new Object[][]{
+                new Object[] {true}, // first batch calls close(true), second batch calls close(false)
+                new Object[] {false}, // first batch calls close(false), second batch calls close(true)
+        };
+    }
+
+    @Test(dataProvider = "closeWithoutWaitingClientDisconnectInFirstBatch")
+    public void testConcurrentClose(boolean closeWithoutWaitingClientDisconnectInFirstBatch) throws Exception {
+        final String topicName = "persistent://prop/ns/concurrentClose";
+        final String ns = "prop/ns";
+        admin.namespaces().createNamespace(ns, 1);
+        admin.topics().createNonPartitionedTopic(topicName);
+        final Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+        List<CompletableFuture<Void>> futureList =
+                make2ConcurrentBatchesOfClose(topic, 10, closeWithoutWaitingClientDisconnectInFirstBatch);
+        Map<Integer, List<CompletableFuture<Void>>> futureMap =
+                futureList.stream().collect(Collectors.groupingBy(Objects::hashCode));
+        /*
+         * Expect at most 3 distinct futures: the first call gets the future created by "topic.close"; the other 19
+         * get the cached future for their close type. NOTE(review): equal identity hash codes would merge groups.
+         */
+        assertTrue(futureMap.size() <= 3);
+        for (List list : futureMap.values()){
+            if (list.size() == 1){
+                // The first call: the future returned by "topic.close" itself, not a cached one.
+            } else {
+                // Cached futures of one close type: 9 or 10 of them, depending on which batch made the first call.
+                assertTrue(list.size() >= 9 && list.size() <= 10);
+            }
+        }
+    }
+
+    private List<CompletableFuture<Void>> make2ConcurrentBatchesOfClose(Topic topic, int tryTimes,
+                                                              boolean closeWithoutWaitingClientDisconnectInFirstBatch){
+        final List<CompletableFuture<Void>> futureList = Collections.synchronizedList(new ArrayList<>());
+        final List<Thread> taskList = new ArrayList<>();
+        CountDownLatch allTaskBeginLatch = new CountDownLatch(1);
+        // Calls topic.close() from 2 * tryTimes threads at once; the first batch passes the given flag.
+        for (int i = 0; i < tryTimes; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    allTaskBeginLatch.await(5, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                futureList.add(topic.close(closeWithoutWaitingClientDisconnectInFirstBatch));
+            });
+            thread.start();
+            taskList.add(thread);
+        }
+        // The second batch passes the opposite flag.
+        for (int i = 0; i < tryTimes; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    allTaskBeginLatch.await(5, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                futureList.add(topic.close(!closeWithoutWaitingClientDisconnectInFirstBatch));
+            });
+            thread.start();
+            taskList.add(thread);
+        }
+        // Release every thread at once, then wait up to 5 seconds for all of them to finish calling close.
+        allTaskBeginLatch.countDown();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()->{
+            for (Thread thread : taskList){
+                if (thread.isAlive()){
+                    return false;
+                }
+            }
+            return true;
+        });
+        return futureList;
+    }
 
     @DataProvider(name = "topicAndMetricsLevel")
     public Object[][] indexPatternTestData() {
@@ -330,7 +410,6 @@ public class PersistentTopicTest extends BrokerTestBase {
         };
     }
 
-
     @Test(dataProvider = "topicAndMetricsLevel")
     public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, 
boolean exposeTopicLevelMetrics) throws Exception {
         PulsarClient client = pulsar.getClient();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 6f625898535..f6fcb12f359 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
@@ -69,6 +70,36 @@ public class FutureUtil {
                 })));
     }
 
+    /**
+     * Makes {@code dest} complete after {@code src}: with the same value as {@code src}, or exceptionally with the
+     * exception {@code src} completed with (as observed by {@code whenComplete}).
+     */
+    public static <T> void completeAfter(final CompletableFuture<T> dest, CompletableFuture<T> src) {
+        src.whenComplete((v, ex) -> {
+            if (ex != null) {
+                dest.completeExceptionally(ex);
+            } else {
+                dest.complete(v);
+            }
+        });
+    }
+
+    /**
+     * Makes {@code dest} complete after all futures in {@code src}: with {@code null} if the future returned by
+     * {@link #waitForAll} for {@code src} completes normally, otherwise exceptionally with that future's error.
+     * NOTE(review): {@code src} is a generic varargs parameter; consider {@code @SafeVarargs} to avoid warnings.
+     */
+    public static void completeAfterAll(final CompletableFuture<Void> dest,
+                                        CompletableFuture<? extends Object>... src) {
+        FutureUtil.waitForAll(Arrays.asList(src)).whenComplete((ignore, ex) -> {
+            if (ex != null) {
+                dest.completeExceptionally(ex);
+            } else {
+                dest.complete(null);
+            }
+        });
+    }
+
     /**
      * Return a future that represents the completion of any future in the 
provided Collection.
      *
@@ -131,7 +162,7 @@ public class FutureUtil {
      * @return a new CompletableFuture that is completed when all of the given 
CompletableFutures complete
      */
     public static CompletableFuture<Void> waitForAllAndSupportCancel(
-                                                    Collection<? extends 
CompletableFuture<?>> futures) {
+            Collection<? extends CompletableFuture<?>> futures) {
         CompletableFuture[] futuresArray = futures.toArray(new 
CompletableFuture[0]);
         CompletableFuture<Void> combinedFuture = 
CompletableFuture.allOf(futuresArray);
         whenCancelledOrTimedOut(combinedFuture, () -> {

Reply via email to