This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3a52ce7 [TABLE SERVICE] Support getStreamById in root range store.
3a52ce7 is described below
commit 3a52ce74a92ac84f61a5834ef7ee701a7e6f7988
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Jul 18 00:14:19 2018 -0700
[TABLE SERVICE] Support getStreamById in root range store.
Descriptions of the changes in this PR:
*Motivation*
Stream readers need to fetch a stream's metadata by querying for it using the
stream id.
*Changes*
Add support in the root range store to get stream properties by stream id.
Author: Sijie Guo <[email protected]>
Reviewers: Jia Zhai <None>
This closes #1550 from sijie/add_get_stream_by_id
---
stream/proto/src/main/proto/storage.proto | 1 +
.../storage/impl/metadata/RootRangeStoreImpl.java | 100 ++++++++++++++++-----
.../impl/metadata/TestRootRangeStoreImpl.java | 32 +++++++
3 files changed, 109 insertions(+), 24 deletions(-)
diff --git a/stream/proto/src/main/proto/storage.proto
b/stream/proto/src/main/proto/storage.proto
index e33ecbe..6648090 100644
--- a/stream/proto/src/main/proto/storage.proto
+++ b/stream/proto/src/main/proto/storage.proto
@@ -35,6 +35,7 @@ enum StatusCode {
// 4xx: client errors
BAD_REQUEST = 400;
+ ILLEGAL_OP = 403;
// 5xx: server errors
INTERNAL_SERVER_ERROR = 500;
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 4314bce..2f2c404 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -52,6 +52,7 @@ import
org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest.IdCase;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import
org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -66,11 +67,12 @@ public class RootRangeStoreImpl
private static final byte SYSTEM_TAG = (byte) 0xff;
private static final byte NS_NAME_TAG = (byte) 0x01;
- private static final byte NS_ID_TAG = (byte) 0x01;
+ private static final byte NS_ID_TAG = (byte) 0x02;
// separator used for separating streams within a same namespace
private static final byte NS_STREAM_NAME_SEP = (byte) 0x03;
private static final byte NS_STREAM_ID_SEP = (byte) 0x04;
+ private static final byte STREAM_ID_TAG = (byte) 0x05;
private static final byte NS_END_SEP = (byte) 0xff;
static final byte[] NS_ID_KEY = new byte[]{SYSTEM_TAG, 'n', 's', 'i', 'd'};
@@ -133,6 +135,16 @@ public class RootRangeStoreImpl
return streamIdBytes;
}
+ /**
+ * stream id: [STREAM_ID_TAG][stream_id].
+ */
+ static final byte[] getStreamIdKey(long streamId) {
+ byte[] streamIdBytes = new byte[Long.BYTES + 1];
+ streamIdBytes[0] = STREAM_ID_TAG;
+ Bytes.toBytes(streamId, streamIdBytes, 1);
+ return streamIdBytes;
+ }
+
private final URI defaultServiceUri;
private final MVCCAsyncStore<byte[], byte[]> store;
private final StorageContainerPlacementPolicy placementPolicy;
@@ -468,6 +480,8 @@ public class RootRangeStoreImpl
byte[] streamNameVal = Bytes.toBytes(streamId);
byte[] streamIdKey = getStreamIdKey(nsId, streamId);
byte[] streamIdVal = streamProps.toByteArray();
+ byte[] streamReverseIndexKey = getStreamIdKey(streamId);
+ byte[] streamReverseIndexValue = Bytes.toBytes(nsId);
TxnOp<byte[], byte[]> txn = store.newTxn()
.If(
@@ -480,6 +494,7 @@ public class RootRangeStoreImpl
.Then(
store.newPut(streamNameKey, streamNameVal),
store.newPut(streamIdKey, streamIdVal),
+ store.newPut(streamReverseIndexKey, streamReverseIndexValue),
store.newPut(STREAM_ID_KEY, Bytes.toBytes(streamId))
)
.build();
@@ -558,16 +573,19 @@ public class RootRangeStoreImpl
byte[] nsIdKey = getNamespaceIdKey(nsId);
byte[] streamNameKey = getStreamNameKey(nsId, streamName);
byte[] streamIdKey = getStreamIdKey(nsId, streamId);
+ byte[] streamReverseIndexKey = getStreamIdKey(streamId);
TxnOp<byte[], byte[]> txnOp = store.newTxn()
.If(
store.newCompareValue(CompareResult.NOT_EQUAL, nsIdKey, null),
store.newCompareValue(CompareResult.NOT_EQUAL, streamNameKey,
null),
- store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey,
null)
+ store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey,
null),
+ store.newCompareValue(CompareResult.NOT_EQUAL,
streamReverseIndexKey, null)
)
.Then(
store.newDelete(streamIdKey),
- store.newDelete(streamNameKey)
+ store.newDelete(streamNameKey),
+ store.newDelete(streamReverseIndexKey)
)
.build();
@@ -586,40 +604,74 @@ public class RootRangeStoreImpl
}).whenComplete((resp, cause) -> txnOp.close());
}
+ private CompletableFuture<GetStreamResponse> streamPropertiesToResponse(
+ CompletableFuture<StreamProperties> propsFuture
+ ) {
+ GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
+ return propsFuture.thenCompose(streamProps -> {
+ if (null == streamProps) {
+ return
FutureUtils.value(respBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build());
+ } else {
+ return FutureUtils.value(respBuilder
+ .setCode(StatusCode.SUCCESS)
+ .setStreamProps(streamProps)
+ .build());
+ }
+ }).exceptionally(cause ->
+ respBuilder
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .build()
+ );
+ }
+
@Override
public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest
request) {
- StreamName streamName = request.getStreamName();
+ if (IdCase.STREAM_ID == request.getIdCase()) {
+ return streamPropertiesToResponse(
+ getStreamProps(request.getStreamId()));
+ } else if (IdCase.STREAM_NAME == request.getIdCase()) {
+ return getStreamProps(request.getStreamName());
+ } else {
+ return FutureUtils.value(GetStreamResponse.newBuilder()
+ .setCode(StatusCode.ILLEGAL_OP)
+ .build());
+ }
+ }
+ CompletableFuture<StreamProperties> getStreamProps(long streamId) {
+ byte[] streamReverseIndexKey = getStreamIdKey(streamId);
+
+ return store.get(streamReverseIndexKey).thenCompose(nsIdBytes -> {
+ if (null == nsIdBytes) {
+ return FutureUtils.value(null);
+ }
+
+ long nsId = Bytes.toLong(nsIdBytes, 0);
+ return getStreamProps(nsId, streamId);
+ });
+ }
+
+ CompletableFuture<GetStreamResponse> getStreamProps(StreamName streamName)
{
StatusCode code = verifyStreamRequest(
- streamName.getNamespaceName(),
- streamName.getStreamName());
+ streamName.getNamespaceName(),
+ streamName.getStreamName());
if (StatusCode.SUCCESS != code) {
- return
FutureUtils.value(GetStreamResponse.newBuilder().setCode(code).build());
+ return FutureUtils.value(GetStreamResponse.newBuilder()
+ .setCode(code).build());
}
byte[] nsNameKey = getNamespaceNameKey(streamName.getNamespaceName());
- GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
+
+
return store.get(nsNameKey)
.thenCompose(nsIdBytes -> {
if (null == nsIdBytes) {
- return
FutureUtils.value(respBuilder.setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
+ return FutureUtils.value(GetStreamResponse.newBuilder()
+ .setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
}
-
long nsId = Bytes.toLong(nsIdBytes, 0);
- return getStreamProps(nsId, streamName.getStreamName())
- .thenCompose(streamProps -> {
- if (null == streamProps) {
- return
FutureUtils.value(respBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build());
- } else {
- return FutureUtils.value(respBuilder
- .setCode(StatusCode.SUCCESS)
- .setStreamProps(streamProps)
- .build());
- }
- })
- .exceptionally(cause -> respBuilder
- .setCode(StatusCode.INTERNAL_SERVER_ERROR)
- .build());
+ return streamPropertiesToResponse(
+ getStreamProps(nsId, streamName.getStreamName()));
});
}
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index 93a9e47..da3b6d1 100644
---
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -462,6 +462,24 @@ public class TestRootRangeStoreImpl extends
MVCCAsyncStoreTestBase {
}
@Test
+ public void testGetStreamByIdSuccess() throws Exception {
+ String nsName = name.getMethodName();
+ String streamName = name.getMethodName();
+
+ createNamespaceAndVerify(nsName, 0L);
+ createStreamAndVerify(nsName, streamName, MIN_DATA_STREAM_ID);
+ verifyStreamId(MIN_DATA_STREAM_ID);
+
+ CompletableFuture<GetStreamResponse> getFuture =
rootRangeStore.getStream(
+ createGetStreamRequest(MIN_DATA_STREAM_ID));
+ GetStreamResponse getResp = FutureUtils.result(getFuture);
+ assertEquals(StatusCode.SUCCESS, getResp.getCode());
+ assertEquals(MIN_DATA_STREAM_ID,
getResp.getStreamProps().getStreamId());
+ assertEquals(streamName, getResp.getStreamProps().getStreamName());
+ assertEquals(streamConf, getResp.getStreamProps().getStreamConf());
+ }
+
+ @Test
public void testGetStreamNotFound() throws Exception {
String nsName = name.getMethodName();
String streamName = name.getMethodName();
@@ -476,4 +494,18 @@ public class TestRootRangeStoreImpl extends
MVCCAsyncStoreTestBase {
assertEquals(StatusCode.STREAM_NOT_FOUND, response.getCode());
}
+ @Test
+ public void testGetStreamByIdNotFound() throws Exception {
+ String nsName = name.getMethodName();
+
+ createNamespaceAndVerify(nsName, 0L);
+
+ verifyStreamId(-1);
+
+ CompletableFuture<GetStreamResponse> getFuture =
rootRangeStore.getStream(
+ createGetStreamRequest(MIN_DATA_STREAM_ID));
+ GetStreamResponse response = FutureUtils.result(getFuture);
+ assertEquals(StatusCode.STREAM_NOT_FOUND, response.getCode());
+ }
+
}