This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 751e55f  ISSUE #2020: close db properly to avoid open RocksDB failure 
at the second time
751e55f is described below

commit 751e55fa433172422fb82556ff748c5f6f2bfc41
Author: Arvin <[email protected]>
AuthorDate: Tue Apr 2 00:27:42 2019 +0800

    ISSUE #2020: close db properly to avoid open RocksDB failure at the second 
time
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    If not releasing resources of failed/closed asyncStore, new creating of the 
same store identifier will fail, mainly caused by RocksDBException, like #2020 
shows.
    
    ### Changes
    
    add scStores to factory's instance variable at the `addstore` method of 
`MVCCStoreFactoryImpl` class;
    release store when open fail;
    
    Descriptions of the changes in this PR:
    
    Master Issue: #2020
    
    
    
    Reviewers: Jia Zhai <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #2022 from ArvinDevel/issue2020, closes #2020
---
 .../storage/impl/sc/StorageContainerRegistryImpl.java    |  5 +++++
 .../stream/storage/impl/store/MVCCStoreFactoryImpl.java  | 16 +++++++++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 160d9a8..4208b1a 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -101,6 +101,11 @@ public class StorageContainerRegistryImpl implements 
StorageContainerRegistry {
                     } else {
                         log.warn("Fail to de-register StorageContainer ('{}') 
when failed to start", scId, cause);
                     }
+                    log.info("Release resources hold by StorageContainer 
('{}') during de-register", scId);
+                    newStorageContainer.stop().exceptionally(throwable -> {
+                        log.error("Stop StorageContainer ('{}') fail during 
de-register", scId);
+                        return null;
+                    });
                 } else {
                     log.info("Successfully started registered StorageContainer 
('{}').", scId);
                 }
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 6f01d4b..e3d0a55 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -130,6 +130,7 @@ public class MVCCStoreFactoryImpl implements 
MVCCStoreFactory {
         Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = 
stores.get(scId);
         if (null == scStores) {
             scStores = Maps.newHashMap();
+            stores.putIfAbsent(scId, scStores);
         }
         RangeId rid = RangeId.of(streamId, rangeId);
         MVCCAsyncStore<byte[], byte[]> oldStore = scStores.get(rid);
@@ -207,7 +208,18 @@ public class MVCCStoreFactoryImpl implements 
MVCCStoreFactory {
             .isReadonly(serveReadOnlyTable)
             .build();
 
-        return store.init(spec).thenApply(ignored -> {
+        return store.init(spec).whenComplete((ignored, throwable) -> {
+            // since the store has not been added, so can't release its 
resources during close sc
+            if (null != throwable) {
+                log.info("Clearing resources hold by stream({})/range({}) at 
storage container ({}) ",
+                    streamId, rangeId, scId);
+                store.closeAsync().whenComplete((i, t) -> {
+                    if (null != t) {
+                        log.error("Clear resources hold by {} fail", 
store.name());
+                    }
+                });
+            }
+        }).thenApply(ignored -> {
             log.info("Successfully initialize stream({})/range({}) at storage 
container ({})",
                 streamId, rangeId, scId);
             addStore(scId, streamId, rangeId, store);
@@ -222,11 +234,13 @@ public class MVCCStoreFactoryImpl implements 
MVCCStoreFactory {
             scStores = stores.remove(scId);
         }
         if (null == scStores) {
+            log.info("scStores for {} on store factory is null, return 
directly", scId);
             return FutureUtils.Void();
         }
 
         List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
         for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
+            log.info("Closing {} of sc {}", store.name(), scId);
             closeFutures.add(store.closeAsync());
         }
 

Reply via email to