This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 74192871ed0 [fix][meta] Check if metadata store is closed in 
RocksdbMetadataStore (#22852)
74192871ed0 is described below

commit 74192871ed00870e5181a5bd4018ba196fd8f698
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jun 5 22:02:43 2024 +0300

    [fix][meta] Check if metadata store is closed in RocksdbMetadataStore 
(#22852)
---
 .../metadata/impl/AbstractMetadataStore.java       | 25 +++++++++++-----------
 .../pulsar/metadata/impl/RocksdbMetadataStore.java | 15 +++++++++++++
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index fa827bb40e7..7315e6a04a2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -257,8 +257,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     @Override
     public CompletableFuture<Optional<GetResult>> get(String path) {
         if (isClosed()) {
-            return FutureUtil.failedFuture(
-                    new MetadataStoreException.AlreadyClosedException());
+            return alreadyClosedFailedFuture();
         }
         long start = System.currentTimeMillis();
         if (!isValidPath(path)) {
@@ -286,8 +285,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     @Override
     public final CompletableFuture<List<String>> getChildren(String path) {
         if (isClosed()) {
-            return FutureUtil.failedFuture(
-                    new MetadataStoreException.AlreadyClosedException());
+            return alreadyClosedFailedFuture();
         }
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
@@ -298,8 +296,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     @Override
     public final CompletableFuture<Boolean> exists(String path) {
         if (isClosed()) {
-            return FutureUtil.failedFuture(
-                    new MetadataStoreException.AlreadyClosedException());
+            return alreadyClosedFailedFuture();
         }
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
@@ -362,8 +359,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     public final CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion) {
         log.info("Deleting path: {} (v. {})", path, expectedVersion);
         if (isClosed()) {
-            return FutureUtil.failedFuture(
-                    new MetadataStoreException.AlreadyClosedException());
+            return alreadyClosedFailedFuture();
         }
         long start = System.currentTimeMillis();
         if (!isValidPath(path)) {
@@ -414,8 +410,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     public CompletableFuture<Void> deleteRecursive(String path) {
         log.info("Deleting recursively path: {}", path);
         if (isClosed()) {
-            return FutureUtil.failedFuture(
-                    new MetadataStoreException.AlreadyClosedException());
+            return alreadyClosedFailedFuture();
         }
         return getChildren(path)
                 .thenCompose(children -> FutureUtil.waitForAll(
@@ -435,8 +430,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     public final CompletableFuture<Stat> put(String path, byte[] data, 
Optional<Long> optExpectedVersion,
             EnumSet<CreateOption> options) {
         if (isClosed()) {
-            return FutureUtil.failedFuture(
-                    new MetadataStoreException.AlreadyClosedException());
+            return alreadyClosedFailedFuture();
         }
         long start = System.currentTimeMillis();
         if (!isValidPath(path)) {
@@ -516,10 +510,15 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         }
     }
 
-    private boolean isClosed() {
+    protected boolean isClosed() {
         return isClosed.get();
     }
 
+    protected static <T> CompletableFuture<T> alreadyClosedFailedFuture() {
+        return FutureUtil.failedFuture(
+                new MetadataStoreException.AlreadyClosedException());
+    }
+
     @Override
     public void close() throws Exception {
         executor.shutdownNow();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 39f7edd5cee..06f7b260536 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -375,6 +375,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
         }
         try {
             dbStateLock.readLock().lock();
+            if (isClosed()) {
+                return alreadyClosedFailedFuture();
+            }
             byte[] value = db.get(optionCache, toBytes(path));
             if (value == null) {
                 return CompletableFuture.completedFuture(Optional.empty());
@@ -407,6 +410,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
         }
         try {
             dbStateLock.readLock().lock();
+            if (isClosed()) {
+                return alreadyClosedFailedFuture();
+            }
             try (RocksIterator iterator = db.newIterator(optionDontCache)) {
                 Set<String> result = new HashSet<>();
                 String firstKey = path.equals("/") ? path : path + "/";
@@ -449,6 +455,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
         }
         try {
             dbStateLock.readLock().lock();
+            if (isClosed()) {
+                return alreadyClosedFailedFuture();
+            }
             byte[] value = db.get(optionDontCache, toBytes(path));
             if (log.isDebugEnabled()) {
                 if (value != null) {
@@ -471,6 +480,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
         }
         try {
             dbStateLock.readLock().lock();
+            if (isClosed()) {
+                return alreadyClosedFailedFuture();
+            }
             try (Transaction transaction = db.beginTransaction(writeOptions)) {
                 byte[] pathBytes = toBytes(path);
                 byte[] oldValueData = 
transaction.getForUpdate(optionDontCache, pathBytes, true);
@@ -507,6 +519,9 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
         }
         try {
             dbStateLock.readLock().lock();
+            if (isClosed()) {
+                return alreadyClosedFailedFuture();
+            }
             try (Transaction transaction = db.beginTransaction(writeOptions)) {
                 byte[] pathBytes = toBytes(path);
                 byte[] oldValueData = 
transaction.getForUpdate(optionDontCache, pathBytes, true);

Reply via email to