sijie closed pull request #1730: [table service][storage] improve the logic on 
creating the meta range for a table
URL: https://github.com/apache/bookkeeper/pull/1730
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index cac363f41e..9dda00202c 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -159,7 +159,7 @@ public StorageContainerStore build() {
             placementPolicyFactory.newPlacementPolicy(),
             storeResources.scheduler(),
             mvccStoreFactory,
-            defaultBackendUri);
+            clientManagerSupplier.get());
 
         RangeStoreContainerServiceFactoryImpl containerServiceFactory =
             new RangeStoreContainerServiceFactoryImpl(serviceFactory);
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 530d4cdce0..b64d3205f7 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
 import org.apache.bookkeeper.stream.proto.RangeMetadata;
@@ -48,14 +49,17 @@
     private final ScheduledExecutorService executor;
     private final StorageContainerPlacementPolicy rangePlacementPolicy;
     private final Map<Long, MetaRangeImpl> streams;
+    private final StorageServerClientManager clientManager;
 
     public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
                               StorageContainerPlacementPolicy 
rangePlacementPolicy,
-                              ScheduledExecutorService executor) {
+                              ScheduledExecutorService executor,
+                              StorageServerClientManager clientManager) {
         this.store = store;
         this.executor = executor;
         this.rangePlacementPolicy = rangePlacementPolicy;
         this.streams = Maps.newHashMap();
+        this.clientManager = clientManager;
     }
 
     //
@@ -100,9 +104,10 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> 
store,
             return metaRangeImpl.load(streamId)
                 .thenCompose(mr -> {
                     if (null == mr) {
-                        StreamProperties streamProps = request.hasStreamProps()
-                            ? request.getStreamProps() : null;
-                        return createStreamIfMissing(streamId, metaRangeImpl, 
streamProps);
+                        // meta range doesn't exist, talk to root range to get 
the stream props
+                        return clientManager.getStreamProperties(streamId)
+                            .thenCompose(streamProperties ->
+                                createStreamIfMissing(streamId, metaRangeImpl, 
streamProperties));
                     } else {
                         synchronized (streams) {
                             streams.put(streamId, (MetaRangeImpl) mr);
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 2f2c4048e5..c066bd14e6 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -24,7 +24,6 @@
 import static 
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateStreamName;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.net.URI;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
@@ -145,16 +144,13 @@
         return streamIdBytes;
     }
 
-    private final URI defaultServiceUri;
     private final MVCCAsyncStore<byte[], byte[]> store;
     private final StorageContainerPlacementPolicy placementPolicy;
     private final ScheduledExecutorService executor;
 
-    public RootRangeStoreImpl(URI defaultServiceUri,
-                              MVCCAsyncStore<byte[], byte[]> store,
+    public RootRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
                               StorageContainerPlacementPolicy placementPolicy,
                               ScheduledExecutorService executor) {
-        this.defaultServiceUri = defaultServiceUri;
         this.store = store;
         this.placementPolicy = placementPolicy;
         this.executor = executor;
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
index 02dd8a3c93..937a9fa61f 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
@@ -18,7 +18,7 @@
 
 package org.apache.bookkeeper.stream.storage.impl.service;
 
-import java.net.URI;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.SharedResourceManager;
 import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
@@ -38,30 +38,29 @@
     private final Resource<OrderedScheduler> schedulerResource;
     private final OrderedScheduler scheduler;
     private final MVCCStoreFactory storeFactory;
-    private final URI defaultBackendUri;
+    private final StorageServerClientManager clientManager;
 
     public RangeStoreServiceFactoryImpl(StorageConfiguration storageConf,
                                         StorageContainerPlacementPolicy 
rangePlacementPolicy,
                                         Resource<OrderedScheduler> 
schedulerResource,
                                         MVCCStoreFactory storeFactory,
-                                        URI defaultBackendUri) {
+                                        StorageServerClientManager 
clientManager) {
         this.storageConf = storageConf;
         this.rangePlacementPolicy = rangePlacementPolicy;
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.storeFactory = storeFactory;
-        this.defaultBackendUri = defaultBackendUri;
+        this.clientManager = clientManager;
     }
 
     @Override
     public RangeStoreService createService(long scId) {
         return new RangeStoreServiceImpl(
-            storageConf,
             scId,
             rangePlacementPolicy,
             scheduler,
             storeFactory,
-            defaultBackendUri);
+            clientManager);
     }
 
     @Override
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 7adcefdb75..e3b2d3991d 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -25,12 +25,12 @@
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
 
 import com.google.common.collect.Lists;
-import java.net.URI;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
@@ -64,7 +64,6 @@
 import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
 import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreImpl;
@@ -100,19 +99,18 @@
     @Getter(value = AccessLevel.PACKAGE)
     private final TableStoreFactory tableStoreFactory;
 
-    RangeStoreServiceImpl(StorageConfiguration storageConf,
-                          long scId,
+    RangeStoreServiceImpl(long scId,
                           StorageContainerPlacementPolicy rangePlacementPolicy,
                           OrderedScheduler scheduler,
                           MVCCStoreFactory storeFactory,
-                          URI defaultBackendUri) {
+                          StorageServerClientManager clientManager) {
         this(
             scId,
             scheduler,
             storeFactory,
             store -> new RootRangeStoreImpl(
-                defaultBackendUri, store, rangePlacementPolicy, 
scheduler.chooseThread(scId)),
-            store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, 
scheduler.chooseThread(scId)),
+                store, rangePlacementPolicy, scheduler.chooseThread(scId)),
+            store -> new MetaRangeStoreImpl(store, rangePlacementPolicy, 
scheduler.chooseThread(scId), clientManager),
             store -> new TableStoreImpl(store));
     }
 
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d0156c55d2..5e5f4b3d7e 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -20,12 +20,16 @@
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.stream.LongStream;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.RangeMetadata;
 import org.apache.bookkeeper.stream.proto.RangeState;
@@ -48,6 +52,7 @@
 
     private StreamProperties streamProps;
     private MetaRangeStoreImpl mrStoreImpl;
+    private StorageServerClientManager clientManager;
 
     @Override
     protected void doSetup() throws Exception {
@@ -57,10 +62,12 @@ protected void doSetup() throws Exception {
             .setStreamName(name.getMethodName() + "_stream")
             .setStreamId(System.currentTimeMillis())
             .build();
+        this.clientManager = mock(StorageServerClientManager.class);
         this.mrStoreImpl = new MetaRangeStoreImpl(
             this.store,
             StorageContainerPlacementPolicyImpl.of(1024),
-            this.scheduler.chooseThread());
+            this.scheduler.chooseThread(),
+            clientManager);
     }
 
     @Override
@@ -68,11 +75,10 @@ protected void doTeardown() throws Exception {
     }
 
     GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
+        
when(clientManager.getStreamProperties(eq(this.streamProps.getStreamId())))
+            .thenReturn(FutureUtils.value(streamProperties));
         GetActiveRangesRequest.Builder reqBuilder = 
GetActiveRangesRequest.newBuilder()
             .setStreamId(this.streamProps.getStreamId());
-        if (null != streamProperties) {
-            reqBuilder = reqBuilder.setStreamProps(streamProperties);
-        }
         return reqBuilder.build();
     }
 
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index da3b6d1114..9cdbf6424d 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -33,7 +33,6 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.net.URI;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -57,8 +56,6 @@
 @Slf4j
 public class TestRootRangeStoreImpl extends MVCCAsyncStoreTestBase {
 
-    private static final String DEFAULT_SERVICE_URI = 
"distributedlog://127.0.0.1/stream/storage";
-
     private final NamespaceConfiguration namespaceConf =
         NamespaceConfiguration.newBuilder()
             .setDefaultStreamConf(DEFAULT_STREAM_CONF)
@@ -74,7 +71,6 @@
     @Override
     protected void doSetup() throws Exception {
         rootRangeStore = new RootRangeStoreImpl(
-            URI.create(DEFAULT_SERVICE_URI),
             store,
             StorageContainerPlacementPolicyImpl.of(1024),
             scheduler.chooseThread());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to