sijie closed pull request #1708: [table service] introduce a client interceptor 
to attach routing information for table service rpc calls
URL: https://github.com/apache/bookkeeper/pull/1708
 
 
   

This PR was merged from a forked repository. Because GitHub does not show the
original diff of a fork-based pull request once it has been merged, the diff is
reproduced below for the sake of provenance:

diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 8660aab9b5..76c763138d 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -20,6 +20,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.Channel;
+import io.grpc.ClientInterceptor;
 import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -143,10 +144,13 @@ public synchronized TableServiceFutureStub 
getTableService() {
      * @return an intercepted server channel.
      */
     public StorageServerChannel intercept(long scId) {
+        return intercept(new StorageContainerClientInterceptor(scId));
+    }
+
+    public StorageServerChannel intercept(ClientInterceptor... interceptors) {
         Channel interceptedChannel = ClientInterceptors.intercept(
             this.channel,
-            new StorageContainerClientInterceptor(scId));
-
+            interceptors);
         return new StorageServerChannel(
             interceptedChannel,
             this.token);
diff --git 
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
 
b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
new file mode 100644
index 0000000000..2ef970f92f
--- /dev/null
+++ 
b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
+
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.google.protobuf.UnsafeByteOperations;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+
+/**
+ * A client interceptor that intercepts kv rpcs to attach routing information.
+ */
+@Slf4j
+public class RoutingHeaderClientInterceptor implements ClientInterceptor {
+
+    static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+        RANGE_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+        STREAM_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+        ROUTING_KEY,
+        IdentityBinaryMarshaller.of()
+    );
+
+    /**
+     * Table request mutator that mutates a table service rpc request to attach
+     * the routing information.
+     */
+    private interface TableRequestMutator<ReqT> {
+
+        /**
+         * Mutate the provided <tt>request</tt> to attach the given routing 
information.
+         *
+         * @param request table request to be mutated
+         * @param sid stream id
+         * @param rid range id
+         * @param rk routing key
+         * @return the mutated request
+         */
+        ReqT intercept(ReqT request,
+                       Long sid,
+                       Long rid,
+                       byte[] rk);
+
+    }
+
+    /**
+     * Create a routing header builder that starts from <tt>header</tt> and overrides
+     * its stream id, range id and routing key with the given values.
+     *
+     * <p>The routing key is handed to the builder through
+     * {@link UnsafeByteOperations#unsafeWrap(byte[])}.
+     * NOTE(review): presumably this wraps <tt>rk</tt> without copying, so the array
+     * must not be modified afterwards - confirm against the protobuf documentation.
+     *
+     * @param header the routing header currently set on the request
+     * @param sid stream id; expected to be non-null (callers check before mutating)
+     * @param rid range id; expected to be non-null (callers check before mutating)
+     * @param rk routing key
+     * @return a builder carrying the updated routing information
+     */
+    private static RoutingHeader.Builder newRoutingHeaderBuilder(RoutingHeader 
header,
+                                                                 Long sid,
+                                                                 Long rid,
+                                                                 byte[] rk) {
+        return RoutingHeader.newBuilder(header)
+                .setStreamId(sid)
+                .setRangeId(rid)
+                .setRKey(UnsafeByteOperations.unsafeWrap(rk));
+    }
+
+    private static final TableRequestMutator<PutRequest> PUT_INTERCEPTOR =
+        (request, sid, rid, rk) -> PutRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, 
rk))
+            .build();
+
+    private static final TableRequestMutator<RangeRequest> RANGE_INTERCEPTOR =
+        (request, sid, rid, rk) -> RangeRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, 
rk))
+            .build();
+
+    private static final TableRequestMutator<DeleteRangeRequest> 
DELETE_INTERCEPTOR =
+        (request, sid, rid, rk) -> DeleteRangeRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, 
rk))
+            .build();
+
+    private static final TableRequestMutator<IncrementRequest> 
INCR_INTERCEPTOR =
+        (request, sid, rid, rk) -> IncrementRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, 
rk))
+            .build();
+
+    private static final TableRequestMutator<TxnRequest> TXN_INTERCEPTOR =
+        (request, sid, rid, rk) -> TxnRequest.newBuilder(request)
+            .setHeader(newRoutingHeaderBuilder(request.getHeader(), sid, rid, 
rk))
+            .build();
+
+    /**
+     * Describes how to intercept the request of one table service rpc: the request
+     * class, a parser to decode it, and the mutator that attaches the routing header.
+     *
+     * <p>Instances are created through the Lombok-generated static factory <tt>of</tt>.
+     */
+    @Data(staticConstructor = "of")
+    private static class InterceptorDescriptor<T extends MessageLite> {
+
+        // request class; compared against the runtime class of the outgoing message
+        private final Class<T> clz;
+        // parser used when the outgoing message is not already an instance of clz
+        private final Parser<T> parser;
+        // mutator that attaches the routing information to a request of type T
+        private final TableRequestMutator<T> interceptor;
+
+    }
+
+    private static Map<String, InterceptorDescriptor<?>> kvRpcMethods = new 
HashMap<>();
+    static {
+        kvRpcMethods.put(
+            TableServiceGrpc.getPutMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                PutRequest.class, PutRequest.parser(), PUT_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getRangeMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                RangeRequest.class, RangeRequest.parser(), RANGE_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getDeleteMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                DeleteRangeRequest.class, DeleteRangeRequest.parser(), 
DELETE_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getIncrementMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                IncrementRequest.class, IncrementRequest.parser(), 
INCR_INTERCEPTOR
+            )
+        );
+        kvRpcMethods.put(
+            TableServiceGrpc.getTxnMethod().getFullMethodName(),
+            InterceptorDescriptor.of(
+                TxnRequest.class, TxnRequest.parser(), TXN_INTERCEPTOR
+            )
+        );
+    }
+
+    /**
+     * Intercept a client call and, for the table service rpcs registered in
+     * <tt>kvRpcMethods</tt>, rewrite each outgoing request so that its routing header
+     * carries the stream id, range id and routing key found in the call metadata.
+     *
+     * <p>Calls to any other method are handed to <tt>next</tt> untouched. The routing
+     * information is read when the call is started, so the metadata must already
+     * contain the routing keys by the time <tt>start</tt> reaches this call.
+     *
+     * @param method descriptor of the rpc being invoked
+     * @param callOptions call options, forwarded unchanged to <tt>next</tt>
+     * @param next the next channel in the interceptor chain
+     * @return a forwarding call that rewrites requests for registered methods,
+     *         or the plain call created by <tt>next</tt> for all other methods
+     */
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> 
interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                               CallOptions 
callOptions,
+                                                               Channel next) {
+        if (log.isTraceEnabled()) {
+            log.trace("Intercepting method {}", method.getFullMethodName());
+        }
+        InterceptorDescriptor<?> descriptor = 
kvRpcMethods.get(method.getFullMethodName());
+        if (null != descriptor) {
+            return new SimpleForwardingClientCall<ReqT, 
RespT>(next.newCall(method, callOptions)) {
+
+                // routing information captured in start() and consumed by sendMessage()
+                private Long rid = null;
+                private Long sid = null;
+                private byte[] rk = null;
+
+                @Override
+                public void start(Listener<RespT> responseListener, Metadata 
headers) {
+                    // capture routing information from headers
+                    sid = headers.get(SID_METADATA_KEY);
+                    rid = headers.get(RID_METADATA_KEY);
+                    rk  = headers.get(RK_METADATA_KEY);
+                    if (log.isTraceEnabled()) {
+                        log.trace("Intercepting request with header : sid = 
{}, rid = {}, rk = {}",
+                            sid, rid, rk);
+                    }
+
+                    // the headers themselves are forwarded unchanged
+                    delegate().start(responseListener, headers);
+                }
+
+                @Override
+                public void sendMessage(ReqT message) {
+                    ReqT interceptedMessage;
+                    if (null == rid || null == sid || null == rk) {
+                        // we don't have enough information to form the new routing header
+                        // so do nothing
+                        interceptedMessage = message;
+                    } else {
+                        interceptedMessage = interceptMessage(
+                            method,
+                            descriptor,
+                            message,
+                            sid,
+                            rid,
+                            rk
+                        );
+                    }
+                    delegate().sendMessage(interceptedMessage);
+                }
+            };
+        } else {
+            // not a table service rpc we know how to rewrite
+            return next.newCall(method, callOptions);
+        }
+    }
+
+    /**
+     * Attach the routing information to <tt>message</tt> on a best-effort basis.
+     *
+     * <p>If no descriptor is available, or the request can not be decoded or
+     * re-encoded, the original message is returned so the rpc is still sent. The
+     * failure is logged rather than silently dropped, because a request without a
+     * routing header may not be routed as the caller intended.
+     *
+     * @param method descriptor of the rpc being invoked
+     * @param descriptor describes how to rewrite requests of this rpc, may be null
+     * @param message the outgoing request
+     * @param sid stream id
+     * @param rid range id
+     * @param rk routing key
+     * @return the request carrying the routing header, or <tt>message</tt> itself
+     *         when it could not be rewritten
+     */
+    private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
+        MethodDescriptor<ReqT, ?> method,
+        InterceptorDescriptor<TableReqT> descriptor,
+        ReqT message,
+        Long sid,
+        Long rid,
+        byte[] rk
+    ) {
+        if (null == descriptor) {
+            return message;
+        }
+        try {
+            return interceptTableRequest(method, descriptor, message, sid, rid, rk);
+        } catch (IOException ioe) {
+            // keep the best-effort behavior (send the request as-is), but leave a trace
+            log.warn("Failed to attach routing header (sid = {}, rid = {}) to request of method {},"
+                + " sending the request unmodified",
+                sid, rid, method.getFullMethodName(), ioe);
+            return message;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <ReqT, TableReqT extends MessageLite> ReqT interceptTableRequest(
+        MethodDescriptor<ReqT, ?> method,
+        InterceptorDescriptor<TableReqT> interceptor,
+        ReqT message,
+        Long sid, Long rid, byte[] rk
+    ) throws IOException {
+        // put request
+        TableReqT request;
+        if (message.getClass() == interceptor.getClz()) {
+            request = (TableReqT) message;
+        } else {
+            InputStream is = method.getRequestMarshaller().stream(message);
+            request = interceptor.getParser().parseFrom(is);
+        }
+        TableReqT interceptedMessage = interceptor.getInterceptor().intercept(
+            request, sid, rid, rk
+        );
+        if (message.getClass() == interceptor.getClz()) {
+            return (ReqT) interceptedMessage;
+        } else {
+            byte[] reqBytes = new byte[interceptedMessage.getSerializedSize()];
+            
interceptedMessage.writeTo(CodedOutputStream.newInstance(reqBytes));
+            return method.getRequestMarshaller().parse(new 
ByteArrayInputStream(reqBytes));
+
+        }
+    }
+}
diff --git 
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
 
b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
new file mode 100644
index 0000000000..2e56f75b4b
--- /dev/null
+++ 
b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Grpc Interceptors for table service.
+ */
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
\ No newline at end of file
diff --git 
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
 
b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
new file mode 100644
index 0000000000..745118b049
--- /dev/null
+++ 
b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv.interceptors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.ByteString;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import 
org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RoutingHeaderClientInterceptor}.
+ */
+@Slf4j
+public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+
+    private final long streamId = 1234L;
+    private final long rangeId = 2345L;
+    private final byte[] routingKey = ("routing-key-" + 
System.currentTimeMillis()).getBytes(UTF_8);
+    private final AtomicReference<Object> receivedRequest = new 
AtomicReference<>();
+    private StorageServerChannel channel;
+
+    @Override
+    protected void doSetup() {
+        TableServiceImplBase tableService = new TableServiceImplBase() {
+
+            @Override
+            public void range(RangeRequest request, 
StreamObserver<RangeResponse> responseObserver) {
+                log.info("Received range request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(RangeResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void delete(DeleteRangeRequest request, 
StreamObserver<DeleteRangeResponse> responseObserver) {
+                log.info("Received delete range request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(DeleteRangeResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void txn(TxnRequest request, StreamObserver<TxnResponse> 
responseObserver) {
+                log.info("Received txn request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(TxnResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void increment(IncrementRequest request, 
StreamObserver<IncrementResponse> responseObserver) {
+                log.info("Received incr request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(IncrementResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void put(PutRequest request, StreamObserver<PutResponse> 
responseObserver) {
+                log.info("Received put request : {}", request);
+                receivedRequest.set(request);
+                responseObserver.onNext(PutResponse.newBuilder()
+                    .setHeader(ResponseHeader.newBuilder()
+                        .setCode(StatusCode.SUCCESS)
+                        .setRoutingHeader(request.getHeader())
+                        .build())
+                    .build());
+                responseObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(tableService.bindService());
+
+        this.channel = new StorageServerChannel(
+            
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+            Optional.empty()
+        ).intercept(
+            new RoutingHeaderClientInterceptor(),
+            new ClientInterceptor() {
+                @Override
+                public <ReqT, RespT> ClientCall<ReqT, RespT> 
interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                                           
CallOptions callOptions,
+                                                                           
Channel next) {
+                    return new CheckedForwardingClientCall<ReqT, 
RespT>(next.newCall(method, callOptions)) {
+                        @Override
+                        protected void checkedStart(Listener<RespT> 
responseListener, Metadata headers) {
+                            log.info("Intercept the request with routing 
information : sid = {}, rid = {}, rk = {}",
+                                streamId, rangeId, new String(routingKey, 
UTF_8));
+                            headers.put(
+                                
RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+                                rangeId
+                            );
+                            headers.put(
+                                
RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+                                streamId
+                            );
+                            headers.put(
+                                RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+                                routingKey
+                            );
+                            delegate().start(responseListener, headers);
+                        }
+                    };
+                }
+            }
+        );
+    }
+
+    @Override
+    protected void doTeardown() {
+    }
+
+    @Test
+    public void testPutRequest() throws Exception {
+        PutRequest request = PutRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        PutRequest expectedRequest = PutRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        PutResponse response = 
this.channel.getTableService().put(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), 
response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testRangeRequest() throws Exception {
+        RangeRequest request = RangeRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        RangeRequest expectedRequest = RangeRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        RangeResponse response = this.channel.getTableService()
+            .range(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), 
response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testDeleteRangeRequest() throws Exception {
+        DeleteRangeRequest request = DeleteRangeRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        DeleteRangeRequest expectedRequest = 
DeleteRangeRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        DeleteRangeResponse response = this.channel.getTableService()
+            .delete(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), 
response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testIncrementRequest() throws Exception {
+        IncrementRequest request = IncrementRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("test-key"))
+            .build();
+        IncrementRequest expectedRequest = IncrementRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        IncrementResponse response = this.channel.getTableService()
+            .increment(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), 
response.getHeader().getRoutingHeader());
+    }
+
+    @Test
+    public void testTxnRequest() throws Exception {
+        TxnRequest request = TxnRequest.newBuilder()
+            .build();
+        TxnRequest expectedRequest = TxnRequest.newBuilder(request)
+            .setHeader(RoutingHeader.newBuilder(request.getHeader())
+                .setStreamId(streamId)
+                .setRangeId(rangeId)
+                .setRKey(ByteString.copyFrom(routingKey))
+                .build())
+            .build();
+        TxnResponse response = 
this.channel.getTableService().txn(request).get();
+
+        assertEquals(expectedRequest, receivedRequest.get());
+        assertEquals(expectedRequest.getHeader(), 
response.getHeader().getRoutingHeader());
+    }
+
+}
diff --git 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0e6900cc85..0088c4b308 100644
--- 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -110,4 +110,9 @@ private ProtocolConstants() {
 
     // storage container request metadata key
     public static final String SC_ID_KEY = "sc-id" + 
Metadata.BINARY_HEADER_SUFFIX;
+
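+    // request metadata keys for routing requests
+    // NOTE(review): assuming Metadata.BINARY_HEADER_SUFFIX is "-bin", the "sid-" and "rid-"
+    // prefixes below yield "sid--bin" / "rid--bin" (double dash), unlike "rk-bin" and
+    // "sc-id-bin" - confirm this is intended; client and server must agree on the exact key.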
+    public static final String ROUTING_KEY = "rk" + 
Metadata.BINARY_HEADER_SUFFIX;
+    public static final String STREAM_ID_KEY = "sid-" + 
Metadata.BINARY_HEADER_SUFFIX;
+    public static final String RANGE_ID_KEY = "rid-" + 
Metadata.BINARY_HEADER_SUFFIX;
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to