This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 751e55f ISSUE #2020: close db properly to avoid open RocksDB failure
at the second time
751e55f is described below
commit 751e55fa433172422fb82556ff748c5f6f2bfc41
Author: Arvin <[email protected]>
AuthorDate: Tue Apr 2 00:27:42 2019 +0800
ISSUE #2020: close db properly to avoid open RocksDB failure at the second
time
Descriptions of the changes in this PR:
### Motivation
If not releasing resources of failed/closed asyncStore, new creating of the
same store identifier will fail, mainly caused by RocksDBException, like #2020
shows.
### Changes
add scStores to factory's instance variable at the `addstore` method of
`MVCCStoreFactoryImpl` class;
release store when open fail;
Descriptions of the changes in this PR:
Master Issue: #2020
Reviewers: Jia Zhai <[email protected]>, Sijie Guo <[email protected]>
This closes #2022 from ArvinDevel/issue2020, closes #2020
---
.../storage/impl/sc/StorageContainerRegistryImpl.java | 5 +++++
.../stream/storage/impl/store/MVCCStoreFactoryImpl.java | 16 +++++++++++++++-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 160d9a8..4208b1a 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -101,6 +101,11 @@ public class StorageContainerRegistryImpl implements
StorageContainerRegistry {
} else {
log.warn("Fail to de-register StorageContainer ('{}')
when failed to start", scId, cause);
}
+ log.info("Release resources hold by StorageContainer
('{}') during de-register", scId);
+ newStorageContainer.stop().exceptionally(throwable -> {
+ log.error("Stop StorageContainer ('{}') fail during
de-register", scId);
+ return null;
+ });
} else {
log.info("Successfully started registered StorageContainer
('{}').", scId);
}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 6f01d4b..e3d0a55 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -130,6 +130,7 @@ public class MVCCStoreFactoryImpl implements
MVCCStoreFactory {
Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores =
stores.get(scId);
if (null == scStores) {
scStores = Maps.newHashMap();
+ stores.putIfAbsent(scId, scStores);
}
RangeId rid = RangeId.of(streamId, rangeId);
MVCCAsyncStore<byte[], byte[]> oldStore = scStores.get(rid);
@@ -207,7 +208,18 @@ public class MVCCStoreFactoryImpl implements
MVCCStoreFactory {
.isReadonly(serveReadOnlyTable)
.build();
- return store.init(spec).thenApply(ignored -> {
+ return store.init(spec).whenComplete((ignored, throwable) -> {
+ // since the store has not been added, so can't release its
resources during close sc
+ if (null != throwable) {
+ log.info("Clearing resources hold by stream({})/range({}) at
storage container ({}) ",
+ streamId, rangeId, scId);
+ store.closeAsync().whenComplete((i, t) -> {
+ if (null != t) {
+ log.error("Clear resources hold by {} fail",
store.name());
+ }
+ });
+ }
+ }).thenApply(ignored -> {
log.info("Successfully initialize stream({})/range({}) at storage
container ({})",
streamId, rangeId, scId);
addStore(scId, streamId, rangeId, store);
@@ -222,11 +234,13 @@ public class MVCCStoreFactoryImpl implements
MVCCStoreFactory {
scStores = stores.remove(scId);
}
if (null == scStores) {
+ log.info("scStores for {} on store factory is null, return
directly", scId);
return FutureUtils.Void();
}
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
+ log.info("Closing {} of sc {}", store.name(), scId);
closeFutures.add(store.closeAsync());
}