This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74192871ed0 [fix][meta] Check if metadata store is closed in
RocksdbMetadataStore (#22852)
74192871ed0 is described below
commit 74192871ed00870e5181a5bd4018ba196fd8f698
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jun 5 22:02:43 2024 +0300
[fix][meta] Check if metadata store is closed in RocksdbMetadataStore
(#22852)
---
.../metadata/impl/AbstractMetadataStore.java | 25 +++++++++++-----------
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 15 +++++++++++++
2 files changed, 27 insertions(+), 13 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index fa827bb40e7..7315e6a04a2 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -257,8 +257,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
if (isClosed()) {
- return FutureUtil.failedFuture(
- new MetadataStoreException.AlreadyClosedException());
+ return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
@@ -286,8 +285,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public final CompletableFuture<List<String>> getChildren(String path) {
if (isClosed()) {
- return FutureUtil.failedFuture(
- new MetadataStoreException.AlreadyClosedException());
+ return alreadyClosedFailedFuture();
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
@@ -298,8 +296,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public final CompletableFuture<Boolean> exists(String path) {
if (isClosed()) {
- return FutureUtil.failedFuture(
- new MetadataStoreException.AlreadyClosedException());
+ return alreadyClosedFailedFuture();
}
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
@@ -362,8 +359,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
public final CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion) {
log.info("Deleting path: {} (v. {})", path, expectedVersion);
if (isClosed()) {
- return FutureUtil.failedFuture(
- new MetadataStoreException.AlreadyClosedException());
+ return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
@@ -414,8 +410,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
public CompletableFuture<Void> deleteRecursive(String path) {
log.info("Deleting recursively path: {}", path);
if (isClosed()) {
- return FutureUtil.failedFuture(
- new MetadataStoreException.AlreadyClosedException());
+ return alreadyClosedFailedFuture();
}
return getChildren(path)
.thenCompose(children -> FutureUtil.waitForAll(
@@ -435,8 +430,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
public final CompletableFuture<Stat> put(String path, byte[] data,
Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (isClosed()) {
- return FutureUtil.failedFuture(
- new MetadataStoreException.AlreadyClosedException());
+ return alreadyClosedFailedFuture();
}
long start = System.currentTimeMillis();
if (!isValidPath(path)) {
@@ -516,10 +510,15 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
- private boolean isClosed() {
+ protected boolean isClosed() {
return isClosed.get();
}
+ protected static <T> CompletableFuture<T> alreadyClosedFailedFuture() {
+ return FutureUtil.failedFuture(
+ new MetadataStoreException.AlreadyClosedException());
+ }
+
@Override
public void close() throws Exception {
executor.shutdownNow();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 39f7edd5cee..06f7b260536 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -375,6 +375,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
try {
dbStateLock.readLock().lock();
+ if (isClosed()) {
+ return alreadyClosedFailedFuture();
+ }
byte[] value = db.get(optionCache, toBytes(path));
if (value == null) {
return CompletableFuture.completedFuture(Optional.empty());
@@ -407,6 +410,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
try {
dbStateLock.readLock().lock();
+ if (isClosed()) {
+ return alreadyClosedFailedFuture();
+ }
try (RocksIterator iterator = db.newIterator(optionDontCache)) {
Set<String> result = new HashSet<>();
String firstKey = path.equals("/") ? path : path + "/";
@@ -449,6 +455,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
try {
dbStateLock.readLock().lock();
+ if (isClosed()) {
+ return alreadyClosedFailedFuture();
+ }
byte[] value = db.get(optionDontCache, toBytes(path));
if (log.isDebugEnabled()) {
if (value != null) {
@@ -471,6 +480,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
try {
dbStateLock.readLock().lock();
+ if (isClosed()) {
+ return alreadyClosedFailedFuture();
+ }
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData =
transaction.getForUpdate(optionDontCache, pathBytes, true);
@@ -507,6 +519,9 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
try {
dbStateLock.readLock().lock();
+ if (isClosed()) {
+ return alreadyClosedFailedFuture();
+ }
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData =
transaction.getForUpdate(optionDontCache, pathBytes, true);