This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a52ce7  [TABLE SERVICE] Support getStreamById in root range store.
3a52ce7 is described below

commit 3a52ce74a92ac84f61a5834ef7ee701a7e6f7988
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Jul 18 00:14:19 2018 -0700

    [TABLE SERVICE] Support getStreamById in root range store.
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Stream readers need to fetch a stream's metadata by querying with 
its stream id.
    
    *Changes*
    
    Add support in the root range store for getting stream properties by stream id.
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1550 from sijie/add_get_stream_by_id
---
 stream/proto/src/main/proto/storage.proto          |   1 +
 .../storage/impl/metadata/RootRangeStoreImpl.java  | 100 ++++++++++++++++-----
 .../impl/metadata/TestRootRangeStoreImpl.java      |  32 +++++++
 3 files changed, 109 insertions(+), 24 deletions(-)

diff --git a/stream/proto/src/main/proto/storage.proto 
b/stream/proto/src/main/proto/storage.proto
index e33ecbe..6648090 100644
--- a/stream/proto/src/main/proto/storage.proto
+++ b/stream/proto/src/main/proto/storage.proto
@@ -35,6 +35,7 @@ enum StatusCode {
 
     // 4xx: client errors
     BAD_REQUEST                 = 400;
+    ILLEGAL_OP                  = 403;
 
     // 5xx: server errors
     INTERNAL_SERVER_ERROR       = 500;
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
index 4314bce..2f2c404 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/RootRangeStoreImpl.java
@@ -52,6 +52,7 @@ import 
org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest.IdCase;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import 
org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -66,11 +67,12 @@ public class RootRangeStoreImpl
 
     private static final byte SYSTEM_TAG = (byte) 0xff;
     private static final byte NS_NAME_TAG = (byte) 0x01;
-    private static final byte NS_ID_TAG = (byte) 0x01;
+    private static final byte NS_ID_TAG = (byte) 0x02;
 
     // separator used for separating streams within a same namespace
     private static final byte NS_STREAM_NAME_SEP = (byte) 0x03;
     private static final byte NS_STREAM_ID_SEP = (byte) 0x04;
+    private static final byte STREAM_ID_TAG = (byte) 0x05;
     private static final byte NS_END_SEP = (byte) 0xff;
 
     static final byte[] NS_ID_KEY = new byte[]{SYSTEM_TAG, 'n', 's', 'i', 'd'};
@@ -133,6 +135,16 @@ public class RootRangeStoreImpl
         return streamIdBytes;
     }
 
+    /**
+     * stream id: [STREAM_ID_TAG][stream_id].
+     */
+    static final byte[] getStreamIdKey(long streamId) {
+        byte[] streamIdBytes = new byte[Long.BYTES + 1];
+        streamIdBytes[0] = STREAM_ID_TAG;
+        Bytes.toBytes(streamId, streamIdBytes, 1);
+        return streamIdBytes;
+    }
+
     private final URI defaultServiceUri;
     private final MVCCAsyncStore<byte[], byte[]> store;
     private final StorageContainerPlacementPolicy placementPolicy;
@@ -468,6 +480,8 @@ public class RootRangeStoreImpl
         byte[] streamNameVal = Bytes.toBytes(streamId);
         byte[] streamIdKey = getStreamIdKey(nsId, streamId);
         byte[] streamIdVal = streamProps.toByteArray();
+        byte[] streamReverseIndexKey = getStreamIdKey(streamId);
+        byte[] streamReverseIndexValue = Bytes.toBytes(nsId);
 
         TxnOp<byte[], byte[]> txn = store.newTxn()
             .If(
@@ -480,6 +494,7 @@ public class RootRangeStoreImpl
             .Then(
                 store.newPut(streamNameKey, streamNameVal),
                 store.newPut(streamIdKey, streamIdVal),
+                store.newPut(streamReverseIndexKey, streamReverseIndexValue),
                 store.newPut(STREAM_ID_KEY, Bytes.toBytes(streamId))
             )
             .build();
@@ -558,16 +573,19 @@ public class RootRangeStoreImpl
         byte[] nsIdKey = getNamespaceIdKey(nsId);
         byte[] streamNameKey = getStreamNameKey(nsId, streamName);
         byte[] streamIdKey = getStreamIdKey(nsId, streamId);
+        byte[] streamReverseIndexKey = getStreamIdKey(streamId);
 
         TxnOp<byte[], byte[]> txnOp = store.newTxn()
             .If(
                 store.newCompareValue(CompareResult.NOT_EQUAL, nsIdKey, null),
                 store.newCompareValue(CompareResult.NOT_EQUAL, streamNameKey, 
null),
-                store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey, 
null)
+                store.newCompareValue(CompareResult.NOT_EQUAL, streamIdKey, 
null),
+                store.newCompareValue(CompareResult.NOT_EQUAL, 
streamReverseIndexKey, null)
             )
             .Then(
                 store.newDelete(streamIdKey),
-                store.newDelete(streamNameKey)
+                store.newDelete(streamNameKey),
+                store.newDelete(streamReverseIndexKey)
             )
             .build();
 
@@ -586,40 +604,74 @@ public class RootRangeStoreImpl
         }).whenComplete((resp, cause) -> txnOp.close());
     }
 
+    private CompletableFuture<GetStreamResponse> streamPropertiesToResponse(
+        CompletableFuture<StreamProperties> propsFuture
+    ) {
+        GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
+        return propsFuture.thenCompose(streamProps -> {
+            if (null == streamProps) {
+                return 
FutureUtils.value(respBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build());
+            } else {
+                return FutureUtils.value(respBuilder
+                    .setCode(StatusCode.SUCCESS)
+                    .setStreamProps(streamProps)
+                    .build());
+            }
+        }).exceptionally(cause ->
+            respBuilder
+                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+                .build()
+        );
+    }
+
     @Override
     public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest 
request) {
-        StreamName streamName = request.getStreamName();
+        if (IdCase.STREAM_ID == request.getIdCase()) {
+            return streamPropertiesToResponse(
+                getStreamProps(request.getStreamId()));
+        } else if (IdCase.STREAM_NAME == request.getIdCase()) {
+            return getStreamProps(request.getStreamName());
+        } else {
+            return FutureUtils.value(GetStreamResponse.newBuilder()
+                .setCode(StatusCode.ILLEGAL_OP)
+                .build());
+        }
+    }
 
+    CompletableFuture<StreamProperties> getStreamProps(long streamId) {
+        byte[] streamReverseIndexKey = getStreamIdKey(streamId);
+
+        return store.get(streamReverseIndexKey).thenCompose(nsIdBytes -> {
+            if (null == nsIdBytes) {
+                return FutureUtils.value(null);
+            }
+
+            long nsId = Bytes.toLong(nsIdBytes, 0);
+            return getStreamProps(nsId, streamId);
+        });
+    }
+
+    CompletableFuture<GetStreamResponse> getStreamProps(StreamName streamName) 
{
         StatusCode code = verifyStreamRequest(
-            streamName.getNamespaceName(),
-            streamName.getStreamName());
+                streamName.getNamespaceName(),
+                streamName.getStreamName());
         if (StatusCode.SUCCESS != code) {
-            return 
FutureUtils.value(GetStreamResponse.newBuilder().setCode(code).build());
+            return FutureUtils.value(GetStreamResponse.newBuilder()
+                .setCode(code).build());
         }
 
         byte[] nsNameKey = getNamespaceNameKey(streamName.getNamespaceName());
-        GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
+
+
         return store.get(nsNameKey)
             .thenCompose(nsIdBytes -> {
                 if (null == nsIdBytes) {
-                    return 
FutureUtils.value(respBuilder.setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
+                    return FutureUtils.value(GetStreamResponse.newBuilder()
+                        .setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
                 }
-
                 long nsId = Bytes.toLong(nsIdBytes, 0);
-                return getStreamProps(nsId, streamName.getStreamName())
-                    .thenCompose(streamProps -> {
-                        if (null == streamProps) {
-                            return 
FutureUtils.value(respBuilder.setCode(StatusCode.STREAM_NOT_FOUND).build());
-                        } else {
-                            return FutureUtils.value(respBuilder
-                                .setCode(StatusCode.SUCCESS)
-                                .setStreamProps(streamProps)
-                                .build());
-                        }
-                    })
-                    .exceptionally(cause -> respBuilder
-                        .setCode(StatusCode.INTERNAL_SERVER_ERROR)
-                        .build());
+                return streamPropertiesToResponse(
+                    getStreamProps(nsId, streamName.getStreamName()));
             });
     }
 
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
index 93a9e47..da3b6d1 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/TestRootRangeStoreImpl.java
@@ -462,6 +462,24 @@ public class TestRootRangeStoreImpl extends 
MVCCAsyncStoreTestBase {
     }
 
     @Test
+    public void testGetStreamByIdSuccess() throws Exception {
+        String nsName = name.getMethodName();
+        String streamName = name.getMethodName();
+
+        createNamespaceAndVerify(nsName, 0L);
+        createStreamAndVerify(nsName, streamName, MIN_DATA_STREAM_ID);
+        verifyStreamId(MIN_DATA_STREAM_ID);
+
+        CompletableFuture<GetStreamResponse> getFuture = 
rootRangeStore.getStream(
+            createGetStreamRequest(MIN_DATA_STREAM_ID));
+        GetStreamResponse getResp = FutureUtils.result(getFuture);
+        assertEquals(StatusCode.SUCCESS, getResp.getCode());
+        assertEquals(MIN_DATA_STREAM_ID, 
getResp.getStreamProps().getStreamId());
+        assertEquals(streamName, getResp.getStreamProps().getStreamName());
+        assertEquals(streamConf, getResp.getStreamProps().getStreamConf());
+    }
+
+    @Test
     public void testGetStreamNotFound() throws Exception {
         String nsName = name.getMethodName();
         String streamName = name.getMethodName();
@@ -476,4 +494,18 @@ public class TestRootRangeStoreImpl extends 
MVCCAsyncStoreTestBase {
         assertEquals(StatusCode.STREAM_NOT_FOUND, response.getCode());
     }
 
+    @Test
+    public void testGetStreamByIdNotFound() throws Exception {
+        String nsName = name.getMethodName();
+
+        createNamespaceAndVerify(nsName, 0L);
+
+        verifyStreamId(-1);
+
+        CompletableFuture<GetStreamResponse> getFuture = 
rootRangeStore.getStream(
+            createGetStreamRequest(MIN_DATA_STREAM_ID));
+        GetStreamResponse response = FutureUtils.result(getFuture);
+        assertEquals(StatusCode.STREAM_NOT_FOUND, response.getCode());
+    }
+
 }

Reply via email to