This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c4e1b8b Fix flaky ManagedLedgerFactory shutdown (#10365)
c4e1b8b is described below
commit c4e1b8b03deca8b99dccc0022bde8f83cd572db4
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Apr 25 17:47:39 2021 +0300
Fix flaky ManagedLedgerFactory shutdown (#10365)
Fixes #10356
---
.../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 10 ++++------
.../main/java/org/apache/pulsar/broker/PulsarService.java | 14 ++++++++------
2 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index bed86a1..aa2684d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -20,12 +20,9 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
-
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
-
import io.netty.util.concurrent.DefaultThreadFactory;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -41,7 +38,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -78,8 +74,8 @@ import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -498,11 +494,13 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
statsTask.cancel(true);
flushCursorsTask.cancel(true);
+ // take a snapshot of ledgers currently in the map to prevent race conditions
+ List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
int numLedgers = ledgers.size();
final CountDownLatch latch = new CountDownLatch(numLedgers);
log.info("Closing {} ledgers", numLedgers);
- for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers.values()) {
+ for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
if (ledger == null) {
latch.countDown();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 78046fe..6e149d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -371,6 +371,13 @@ public class PulsarService implements AutoCloseable {
* getConfiguration()
.getBrokerShutdownTimeoutMs())));
+ // shutdown loadmanager before shutting down the broker
+ executorServicesShutdown.shutdown(loadManagerExecutor);
+ LoadManager loadManager = this.loadManager.get();
+ if (loadManager != null) {
+ loadManager.stop();
+ }
+
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
asyncCloseFutures.add(this.brokerService.closeAsync());
@@ -392,8 +399,6 @@ public class PulsarService implements AutoCloseable {
this.leaderElectionService = null;
}
- executorServicesShutdown.shutdown(loadManagerExecutor);
-
if (globalZkCache != null) {
globalZkCache.close();
globalZkCache = null;
@@ -434,10 +439,7 @@ public class PulsarService implements AutoCloseable {
executorServicesShutdown.shutdown(orderedExecutor);
executorServicesShutdown.shutdown(cacheExecutor);
- LoadManager loadManager = this.loadManager.get();
- if (loadManager != null) {
- loadManager.stop();
- }
+
if (schemaRegistryService != null) {
schemaRegistryService.close();