This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ddec86  [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
0ddec86 is described below

commit 0ddec86444bbdd94221032a3128f7dbe79934b93
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 2 14:21:41 2022 +0200

    [Tests] Fix thread leak in MLTransactionMetadataStore (#14524)
    
    - MLTransactionMetadataStore.internalPinnedExecutor wasn't closed
      when MLTransactionMetadataStore.closeAsync was called
      - problem was introduced by #14238 changes
    
    - this issue causes tests to fail with OOME. Most likely this also impacts
      production code.
    
    * Close TransactionMetadataStoreService after the broker service has been closed
---
 .../org/apache/pulsar/broker/PulsarService.java    | 16 +++++++-----
 .../broker/TransactionMetadataStoreService.java    | 12 ++++++++-
 .../coordinator/TransactionMetadataStoreState.java | 13 +++++++---
 .../impl/MLTransactionMetadataStore.java           | 29 +++++++++++++++-------
 4 files changed, 50 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index fab49be..fcd5a5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -423,11 +423,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 brokerAdditionalServlets = null;
             }
 
-            if (this.transactionMetadataStoreService != null) {
-                this.transactionMetadataStoreService.close();
-                this.transactionMetadataStoreService = null;
-            }
-
             GracefulExecutorServicesShutdown executorServicesShutdown =
                     GracefulExecutorServicesShutdown
                             .initiate()
@@ -446,7 +441,16 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             List<CompletableFuture<Void>> asyncCloseFutures = new 
ArrayList<>();
             if (this.brokerService != null) {
-                asyncCloseFutures.add(this.brokerService.closeAsync());
+                CompletableFuture<Void> brokerCloseFuture = 
this.brokerService.closeAsync();
+                if (this.transactionMetadataStoreService != null) {
+                    asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, 
___) -> {
+                        // close transactionMetadataStoreService after the 
broker has been closed
+                        this.transactionMetadataStoreService.close();
+                        this.transactionMetadataStoreService = null;
+                    }));
+                } else {
+                    asyncCloseFutures.add(brokerCloseFuture);
+                }
                 this.brokerService = null;
             }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 760075b..e03f58e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -555,7 +555,17 @@ public class TransactionMetadataStoreService {
         return Collections.unmodifiableMap(stores);
     }
 
-    public void close () {
+    public synchronized void close () {
         this.internalPinnedExecutor.shutdown();
+        stores.forEach((tcId, metadataStore) -> {
+            metadataStore.closeAsync().whenComplete((v, ex) -> {
+                if (ex != null) {
+                    LOG.error("Close transaction metadata store with id " + 
tcId, ex);
+                } else {
+                    LOG.info("Removed and closed transaction meta store {}", 
tcId);
+                }
+            });
+        });
+        stores.clear();
     }
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
index 5dbd9ba..8947413 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
@@ -33,6 +33,7 @@ public abstract class TransactionMetadataStoreState {
         None,
         Initializing,
         Ready,
+        Closing,
         Close
     }
 
@@ -55,10 +56,14 @@ public abstract class TransactionMetadataStoreState {
         return STATE_UPDATER.compareAndSet(this, State.None, 
State.Initializing);
     }
 
+    protected boolean changeToClosingState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.None, State.Closing)
+                || STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Closing));
+    }
+
     protected boolean changeToCloseState() {
-        return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
-                || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
-                || STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Close));
+        return STATE_UPDATER.compareAndSet(this, State.Closing, State.Close);
     }
 
     protected boolean checkIfReady() {
@@ -68,4 +73,4 @@ public abstract class TransactionMetadataStoreState {
     public State getState() {
         return STATE_UPDATER.get(this);
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f109ec4..b4100de 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -442,15 +444,24 @@ public class MLTransactionMetadataStore
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        return transactionLog.closeAsync().thenCompose(v -> {
-            txnMetaMap.clear();
-            this.timeoutTracker.close();
-            if (!this.changeToCloseState()) {
-                return FutureUtil.failedFuture(
-                        new IllegalStateException("Managed ledger transaction 
metadata store state to close error!"));
-            }
+        if (changeToClosingState()) {
+            // Disable new tasks from being submitted
+            internalPinnedExecutor.shutdown();
+            return transactionLog.closeAsync().thenCompose(v -> {
+                txnMetaMap.clear();
+                this.timeoutTracker.close();
+                if (!this.changeToCloseState()) {
+                    return FutureUtil.failedFuture(
+                            new IllegalStateException(
+                                    "Managed ledger transaction metadata store 
state to close error!"));
+                }
+                // Shutdown the ExecutorService
+                
MoreExecutors.shutdownAndAwaitTermination(internalPinnedExecutor, 
Duration.ofSeconds(5L));
+                return CompletableFuture.completedFuture(null);
+            });
+        } else {
             return CompletableFuture.completedFuture(null);
-        });
+        }
     }
 
     @Override
@@ -504,4 +515,4 @@ public class MLTransactionMetadataStore
     public ManagedLedger getManagedLedger() {
         return this.transactionLog.getManagedLedger();
     }
-}
\ No newline at end of file
+}

Reply via email to