This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0ddec86 [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
0ddec86 is described below
commit 0ddec86444bbdd94221032a3128f7dbe79934b93
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 2 14:21:41 2022 +0200
[Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
- MLTransactionMetadataStore.internalPinnedExecutor was not shut down
when MLTransactionMetadataStore.closeAsync was called
- the problem was introduced by the changes in #14238
- this issue causes tests to fail with OutOfMemoryError (OOME). Most likely
it also affects production code.
* Close TransactionMetadataStoreService after the broker service has been
closed
---
.../org/apache/pulsar/broker/PulsarService.java | 16 +++++++-----
.../broker/TransactionMetadataStoreService.java | 12 ++++++++-
.../coordinator/TransactionMetadataStoreState.java | 13 +++++++---
.../impl/MLTransactionMetadataStore.java | 29 +++++++++++++++-------
4 files changed, 50 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index fab49be..fcd5a5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -423,11 +423,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
brokerAdditionalServlets = null;
}
- if (this.transactionMetadataStoreService != null) {
- this.transactionMetadataStoreService.close();
- this.transactionMetadataStoreService = null;
- }
-
GracefulExecutorServicesShutdown executorServicesShutdown =
GracefulExecutorServicesShutdown
.initiate()
@@ -446,7 +441,16 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
List<CompletableFuture<Void>> asyncCloseFutures = new
ArrayList<>();
if (this.brokerService != null) {
- asyncCloseFutures.add(this.brokerService.closeAsync());
+ CompletableFuture<Void> brokerCloseFuture =
this.brokerService.closeAsync();
+ if (this.transactionMetadataStoreService != null) {
+ asyncCloseFutures.add(brokerCloseFuture.whenComplete((__,
___) -> {
+ // close transactionMetadataStoreService after the
broker has been closed
+ this.transactionMetadataStoreService.close();
+ this.transactionMetadataStoreService = null;
+ }));
+ } else {
+ asyncCloseFutures.add(brokerCloseFuture);
+ }
this.brokerService = null;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 760075b..e03f58e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -555,7 +555,17 @@ public class TransactionMetadataStoreService {
return Collections.unmodifiableMap(stores);
}
- public void close () {
+ public synchronized void close () {
this.internalPinnedExecutor.shutdown();
+ stores.forEach((tcId, metadataStore) -> {
+ metadataStore.closeAsync().whenComplete((v, ex) -> {
+ if (ex != null) {
+ LOG.error("Close transaction metadata store with id " +
tcId, ex);
+ } else {
+ LOG.info("Removed and closed transaction meta store {}",
tcId);
+ }
+ });
+ });
+ stores.clear();
}
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
index 5dbd9ba..8947413 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
@@ -33,6 +33,7 @@ public abstract class TransactionMetadataStoreState {
None,
Initializing,
Ready,
+ Closing,
Close
}
@@ -55,10 +56,14 @@ public abstract class TransactionMetadataStoreState {
return STATE_UPDATER.compareAndSet(this, State.None,
State.Initializing);
}
+ protected boolean changeToClosingState() {
+ return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.None, State.Closing)
+ || STATE_UPDATER.compareAndSet(this, State.Initializing,
State.Closing));
+ }
+
protected boolean changeToCloseState() {
- return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
- || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
- || STATE_UPDATER.compareAndSet(this, State.Initializing,
State.Close));
+ return STATE_UPDATER.compareAndSet(this, State.Closing, State.Close);
}
protected boolean checkIfReady() {
@@ -68,4 +73,4 @@ public abstract class TransactionMetadataStoreState {
public State getState() {
return STATE_UPDATER.get(this);
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f109ec4..b4100de 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -442,15 +444,24 @@ public class MLTransactionMetadataStore
@Override
public CompletableFuture<Void> closeAsync() {
- return transactionLog.closeAsync().thenCompose(v -> {
- txnMetaMap.clear();
- this.timeoutTracker.close();
- if (!this.changeToCloseState()) {
- return FutureUtil.failedFuture(
- new IllegalStateException("Managed ledger transaction
metadata store state to close error!"));
- }
+ if (changeToClosingState()) {
+ // Disable new tasks from being submitted
+ internalPinnedExecutor.shutdown();
+ return transactionLog.closeAsync().thenCompose(v -> {
+ txnMetaMap.clear();
+ this.timeoutTracker.close();
+ if (!this.changeToCloseState()) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException(
+ "Managed ledger transaction metadata store
state to close error!"));
+ }
+ // Shutdown the ExecutorService
+
MoreExecutors.shutdownAndAwaitTermination(internalPinnedExecutor,
Duration.ofSeconds(5L));
+ return CompletableFuture.completedFuture(null);
+ });
+ } else {
return CompletableFuture.completedFuture(null);
- });
+ }
}
@Override
@@ -504,4 +515,4 @@ public class MLTransactionMetadataStore
public ManagedLedger getManagedLedger() {
return this.transactionLog.getManagedLedger();
}
-}
\ No newline at end of file
+}