This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c4e1b8b  Fix flaky ManagedLedgerFactory shutdown (#10365)
c4e1b8b is described below

commit c4e1b8b03deca8b99dccc0022bde8f83cd572db4
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Apr 25 17:47:39 2021 +0300

    Fix flaky ManagedLedgerFactory shutdown (#10365)
    
    Fixes #10356
---
 .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java  | 10 ++++------
 .../main/java/org/apache/pulsar/broker/PulsarService.java  | 14 ++++++++------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index bed86a1..aa2684d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -20,12 +20,9 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
-
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
-
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -41,7 +38,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -78,8 +74,8 @@ import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -498,11 +494,13 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         statsTask.cancel(true);
         flushCursorsTask.cancel(true);
 
+        // take a snapshot of ledgers currently in the map to prevent race 
conditions
+        List<CompletableFuture<ManagedLedgerImpl>> ledgers = new 
ArrayList<>(this.ledgers.values());
         int numLedgers = ledgers.size();
         final CountDownLatch latch = new CountDownLatch(numLedgers);
         log.info("Closing {} ledgers", numLedgers);
 
-        for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : 
ledgers.values()) {
+        for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
             ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
             if (ledger == null) {
                 latch.countDown();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 78046fe..6e149d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -371,6 +371,13 @@ public class PulsarService implements AutoCloseable {
                                                     * getConfiguration()
                                                     
.getBrokerShutdownTimeoutMs())));
 
+            // shutdown loadmanager before shutting down the broker
+            executorServicesShutdown.shutdown(loadManagerExecutor);
+            LoadManager loadManager = this.loadManager.get();
+            if (loadManager != null) {
+                loadManager.stop();
+            }
+
             List<CompletableFuture<Void>> asyncCloseFutures = new 
ArrayList<>();
             if (this.brokerService != null) {
                 asyncCloseFutures.add(this.brokerService.closeAsync());
@@ -392,8 +399,6 @@ public class PulsarService implements AutoCloseable {
                 this.leaderElectionService = null;
             }
 
-            executorServicesShutdown.shutdown(loadManagerExecutor);
-
             if (globalZkCache != null) {
                 globalZkCache.close();
                 globalZkCache = null;
@@ -434,10 +439,7 @@ public class PulsarService implements AutoCloseable {
             executorServicesShutdown.shutdown(orderedExecutor);
             executorServicesShutdown.shutdown(cacheExecutor);
 
-            LoadManager loadManager = this.loadManager.get();
-            if (loadManager != null) {
-                loadManager.stop();
-            }
+
 
             if (schemaRegistryService != null) {
                 schemaRegistryService.close();

Reply via email to