sijie closed pull request #1730: [table service][storage] improve the logic on
creating the meta range for a table
URL: https://github.com/apache/bookkeeper/pull/1730
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index cac363f41e..9dda00202c 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -159,7 +159,7 @@ public StorageContainerStore build() {
placementPolicyFactory.newPlacementPolicy(),
storeResources.scheduler(),
mvccStoreFactory,
- defaultBackendUri);
+ clientManagerSupplier.get());
RangeStoreContainerServiceFactoryImpl containerServiceFactory =
new RangeStoreContainerServiceFactoryImpl(serviceFactory);
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 530d4cdce0..b64d3205f7 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -23,6 +23,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.RangeMetadata;
@@ -48,14 +49,17 @@
private final ScheduledExecutorService executor;
private final StorageContainerPlacementPolicy rangePlacementPolicy;
private final Map<Long, MetaRangeImpl> streams;
+ private final StorageServerClientManager clientManager;
public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
StorageContainerPlacementPolicy
rangePlacementPolicy,
- ScheduledExecutorService executor) {
+ ScheduledExecutorService executor,
+ StorageServerClientManager clientManager) {
this.store = store;
this.executor = executor;
this.rangePlacementPolicy = rangePlacementPolicy;
this.streams = Maps.newHashMap();
+ this.clientManager = clientManager;
}
//
@@ -100,9 +104,10 @@ public MetaRangeStoreImpl(MVCCAsyncStore<byte[], byte[]>
store,
return metaRangeImpl.load(streamId)
.thenCompose(mr -> {
if (null == mr) {
- StreamProperties streamProps = request.hasStreamProps()
- ? request.getStreamProps() : null;
- return createStreamIfMissing(streamId, metaRangeImpl,
streamProps);
+ // meta range doesn't exist, talk to root range to get
the stream props
+ return clientManager.getStreamProperties(streamId)
+ .thenCompose(streamProperties ->
+ createStreamIfMissing(streamId, metaRangeImpl,
streamProperties));
} else {
synchronized (streams) {
streams.put(streamId, (MetaRangeImpl) mr);
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 2f2c4048e5..c066bd14e6 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -24,7 +24,6 @@
import static
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateStreamName;
import com.google.protobuf.InvalidProtocolBufferException;
-import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
@@ -145,16 +144,13 @@
return streamIdBytes;
}
- private final URI defaultServiceUri;
private final MVCCAsyncStore<byte[], byte[]> store;
private final StorageContainerPlacementPolicy placementPolicy;
private final ScheduledExecutorService executor;
- public RootRangeStoreImpl(URI defaultServiceUri,
- MVCCAsyncStore<byte[], byte[]> store,
+ public RootRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
StorageContainerPlacementPolicy placementPolicy,
ScheduledExecutorService executor) {
- this.defaultServiceUri = defaultServiceUri;
this.store = store;
this.placementPolicy = placementPolicy;
this.executor = executor;
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
index 02dd8a3c93..937a9fa61f 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceFactoryImpl.java
@@ -18,7 +18,7 @@
package org.apache.bookkeeper.stream.storage.impl.service;
-import java.net.URI;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.SharedResourceManager;
import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
@@ -38,30 +38,29 @@
private final Resource<OrderedScheduler> schedulerResource;
private final OrderedScheduler scheduler;
private final MVCCStoreFactory storeFactory;
- private final URI defaultBackendUri;
+ private final StorageServerClientManager clientManager;
public RangeStoreServiceFactoryImpl(StorageConfiguration storageConf,
StorageContainerPlacementPolicy
rangePlacementPolicy,
Resource<OrderedScheduler>
schedulerResource,
MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
+ StorageServerClientManager
clientManager) {
this.storageConf = storageConf;
this.rangePlacementPolicy = rangePlacementPolicy;
this.schedulerResource = schedulerResource;
this.scheduler = SharedResourceManager.shared().get(schedulerResource);
this.storeFactory = storeFactory;
- this.defaultBackendUri = defaultBackendUri;
+ this.clientManager = clientManager;
}
@Override
public RangeStoreService createService(long scId) {
return new RangeStoreServiceImpl(
- storageConf,
scId,
rangePlacementPolicy,
scheduler,
storeFactory,
- defaultBackendUri);
+ clientManager);
}
@Override
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 7adcefdb75..e3b2d3991d 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -25,12 +25,12 @@
import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
import com.google.common.collect.Lists;
-import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
@@ -64,7 +64,6 @@
import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreCache;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreImpl;
@@ -100,19 +99,18 @@
@Getter(value = AccessLevel.PACKAGE)
private final TableStoreFactory tableStoreFactory;
- RangeStoreServiceImpl(StorageConfiguration storageConf,
- long scId,
+ RangeStoreServiceImpl(long scId,
StorageContainerPlacementPolicy rangePlacementPolicy,
OrderedScheduler scheduler,
MVCCStoreFactory storeFactory,
- URI defaultBackendUri) {
+ StorageServerClientManager clientManager) {
this(
scId,
scheduler,
storeFactory,
store -> new RootRangeStoreImpl(
- defaultBackendUri, store, rangePlacementPolicy,
scheduler.chooseThread(scId)),
- store -> new MetaRangeStoreImpl(store, rangePlacementPolicy,
scheduler.chooseThread(scId)),
+ store, rangePlacementPolicy, scheduler.chooseThread(scId)),
+ store -> new MetaRangeStoreImpl(store, rangePlacementPolicy,
scheduler.chooseThread(scId), clientManager),
store -> new TableStoreImpl(store));
}
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d0156c55d2..5e5f4b3d7e 100644
---
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -20,12 +20,16 @@
import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.stream.LongStream;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.bookkeeper.stream.proto.RangeState;
@@ -48,6 +52,7 @@
private StreamProperties streamProps;
private MetaRangeStoreImpl mrStoreImpl;
+ private StorageServerClientManager clientManager;
@Override
protected void doSetup() throws Exception {
@@ -57,10 +62,12 @@ protected void doSetup() throws Exception {
.setStreamName(name.getMethodName() + "_stream")
.setStreamId(System.currentTimeMillis())
.build();
+ this.clientManager = mock(StorageServerClientManager.class);
this.mrStoreImpl = new MetaRangeStoreImpl(
this.store,
StorageContainerPlacementPolicyImpl.of(1024),
- this.scheduler.chooseThread());
+ this.scheduler.chooseThread(),
+ clientManager);
}
@Override
@@ -68,11 +75,10 @@ protected void doTeardown() throws Exception {
}
GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
+
when(clientManager.getStreamProperties(eq(this.streamProps.getStreamId())))
+ .thenReturn(FutureUtils.value(streamProperties));
GetActiveRangesRequest.Builder reqBuilder =
GetActiveRangesRequest.newBuilder()
.setStreamId(this.streamProps.getStreamId());
- if (null != streamProperties) {
- reqBuilder = reqBuilder.setStreamProps(streamProperties);
- }
return reqBuilder.build();
}
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index da3b6d1114..9cdbf6424d 100644
---
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -33,7 +33,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.net.URI;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -57,8 +56,6 @@
@Slf4j
public class TestRootRangeStoreImpl extends MVCCAsyncStoreTestBase {
- private static final String DEFAULT_SERVICE_URI =
"distributedlog://127.0.0.1/stream/storage";
-
private final NamespaceConfiguration namespaceConf =
NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
@@ -74,7 +71,6 @@
@Override
protected void doSetup() throws Exception {
rootRangeStore = new RootRangeStoreImpl(
- URI.create(DEFAULT_SERVICE_URI),
store,
StorageContainerPlacementPolicyImpl.of(1024),
scheduler.chooseThread());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services