This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e48a3  Disable using v2 decoder to decode v3 response if 
usingV2WireProtocol is enabled and handle unknown op in v2 request decoder
59e48a3 is described below

commit 59e48a390c1caa12adb7cc89afd2f9d48790a674
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sat Mar 10 00:13:57 2018 -0800

    Disable using v2 decoder to decode v3 response if usingV2WireProtocol is 
enabled and handle unknown op in v2 request decoder
    
    Descriptions of the changes in this PR:
    
    *Problem*
    
    - The response decoder should use whichever protocol the request used to 
decode the response; otherwise it makes a wrong assumption about the response 
protocol.
    - The v2 request decoder doesn't handle unknown ops correctly, which causes 
netty to double-release the ByteBuf.
    
    *Solution*
    
    - Make sure the response decoder respects the protocol that the client is 
using for sending requests.
    - The v2 request decoder should throw an exception when it fails to decode 
an unknown request.
    
    Related issues: #198
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli 
<eolive...@gmail.com>, Jia Zhai <None>, Matteo Merli <mme...@apache.org>
    
    This closes #1240 from sijie/bookkeeper_fix_bytebuf_issues
---
 .../bookkeeper/proto/BookieProtoEncoding.java      |  21 ++--
 .../bookkeeper/proto/PerChannelBookieClient.java   |   4 +-
 .../bookkeeper/proto/BookieProtoEncodingTest.java  | 121 +++++++++++++++++++++
 .../bookkeeper/proto/TestBackwardCompatCMS42.java  |  22 +++-
 4 files changed, 148 insertions(+), 20 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index bca878b..a45f94e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -188,9 +188,10 @@ public class BookieProtoEncoding {
                 BookkeeperProtocol.AuthMessage.Builder builder = 
BookkeeperProtocol.AuthMessage.newBuilder();
                 builder.mergeFrom(new ByteBufInputStream(packet), 
extensionRegistry);
                 return new BookieProtocol.AuthRequest(version, 
builder.build());
-            }
 
-            return packet;
+            default:
+                throw new IllegalStateException("Received unknown request op 
code = " + opCode);
+            }
         }
 
         private static byte[] readMasterKey(ByteBuf packet) {
@@ -474,12 +475,12 @@ public class BookieProtoEncoding {
     public static class ResponseDecoder extends 
MessageToMessageDecoder<Object> {
         final EnDecoder repPreV3;
         final EnDecoder repV3;
-        boolean usingV3Protocol;
+        boolean usingV2Protocol;
 
-        ResponseDecoder(ExtensionRegistry extensionRegistry) {
+        ResponseDecoder(ExtensionRegistry extensionRegistry, boolean 
useV2Protocol) {
             repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
             repV3 = new ResponseEnDecoderV3(extensionRegistry);
-            usingV3Protocol = true;
+            usingV2Protocol = useV2Protocol;
         }
 
         @Override
@@ -493,14 +494,8 @@ public class BookieProtoEncoding {
             ByteBuf buffer = (ByteBuf) msg;
             buffer.markReaderIndex();
 
-            if (usingV3Protocol) {
-                try {
-                    out.add(repV3.decode(buffer));
-                } catch (InvalidProtocolBufferException e) {
-                    usingV3Protocol = false;
-                    buffer.resetReaderIndex();
-                    out.add(repPreV3.decode(buffer));
-                }
+            if (!usingV2Protocol) {
+                out.add(repV3.decode(buffer));
             } else {
                 // If in the same connection we already got preV3 messages, 
don't try again to decode V3 messages
                 out.add(repPreV3.decode(buffer));
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 440239f..cc97fe1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -439,7 +439,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                         new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 
0, 4));
                 pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
                 pipeline.addLast("bookieProtoEncoder", new 
BookieProtoEncoding.RequestEncoder(extRegistry));
-                pipeline.addLast("bookieProtoDecoder", new 
BookieProtoEncoding.ResponseDecoder(extRegistry));
+                pipeline.addLast(
+                    "bookieProtoDecoder",
+                    new BookieProtoEncoding.ResponseDecoder(extRegistry, 
useV2WireProtocol));
                 pipeline.addLast("authHandler", new 
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
                             connectionPeer));
                 pipeline.addLast("mainhandler", PerChannelBookieClient.this);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
new file mode 100644
index 0000000..f182371
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDeCoderPreV3;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDecoderV3;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseDecoder;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDeCoderPreV3;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDecoderV3;
+import org.apache.bookkeeper.proto.BookieProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.Flag;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test {@link BookieProtoEncoding}.
+ */
+public class BookieProtoEncodingTest {
+
+    @Test
+    public void testV3ResponseDecoderNoFallback() throws Exception {
+        AddResponse v2Resp = AddResponse.create(
+            BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            BookieProtocol.EOK,
+            1L,
+            2L);
+
+        BookkeeperProtocol.Response v3Resp = 
BookkeeperProtocol.Response.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setTxnId(1L)
+                .setOperation(OperationType.ADD_ENTRY)
+                .build())
+            .setStatus(StatusCode.EOK)
+            .setAddResponse(BookkeeperProtocol.AddResponse.newBuilder()
+                .setStatus(StatusCode.EOK)
+                .setLedgerId(1L)
+                .setEntryId(2L)
+                .build())
+            .build();
+
+        List<Object> outList = Lists.newArrayList();
+
+        ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
+        ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
+
+        ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
+        try {
+            v3Decoder.decode(
+                mock(ChannelHandlerContext.class),
+                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
+                outList
+            );
+            fail("V3 response decoder should fail on decoding v2 response");
+        } catch (InvalidProtocolBufferException e) {
+            // expected
+        }
+        assertEquals(0, outList.size());
+
+        v3Decoder.decode(
+            mock(ChannelHandlerContext.class),
+            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
+            outList);
+        assertEquals(1, outList.size());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testV2RequestDecoderThrowExceptionOnUnknownRequests() throws 
Exception {
+        RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(null);
+        RequestEnDecoderV3 v3ReqEncoder = new RequestEnDecoderV3(null);
+
+        BookkeeperProtocol.Request v3Req = 
BookkeeperProtocol.Request.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setTxnId(1L)
+                .setOperation(OperationType.ADD_ENTRY)
+                .build())
+            .setAddRequest(BookkeeperProtocol.AddRequest.newBuilder()
+                .setLedgerId(1L)
+                .setEntryId(2L)
+                .setMasterKey(ByteString.copyFrom("", UTF_8))
+                .setFlag(Flag.RECOVERY_ADD)
+                .setBody(ByteString.copyFrom("test", UTF_8)))
+            .build();
+
+
+        v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req, 
UnpooledByteBufAllocator.DEFAULT));
+    }
+
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 9e230b6..59811d1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -47,17 +47,15 @@ import 
org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.proto.BookieProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test backward compatibility.
  */
 public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
-    static final Logger LOG = 
LoggerFactory.getLogger(TestBackwardCompatCMS42.class);
 
     private static final byte[] SUCCESS_RESPONSE = {1};
     private static final byte[] FAILURE_RESPONSE = {2};
@@ -181,7 +179,9 @@ public class TestBackwardCompatCMS42 extends 
BookKeeperClusterTestCase {
     }
 
     CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
-        return new CompatClient42(executor, eventLoopGroup, addr, 
authProvider, extRegistry);
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setUseV2WireProtocol(true);
+        return new CompatClient42(conf, executor, eventLoopGroup, addr, 
authProvider, extRegistry);
     }
 
     // extending PerChannelBookieClient to get the pipeline factory
@@ -190,11 +190,21 @@ public class TestBackwardCompatCMS42 extends 
BookKeeperClusterTestCase {
         Channel channel;
         final CountDownLatch connected = new CountDownLatch(1);
 
-        CompatClient42(OrderedSafeExecutor executor, EventLoopGroup 
eventLoopGroup,
+        CompatClient42(ClientConfiguration conf,
+                       OrderedSafeExecutor executor,
+                       EventLoopGroup eventLoopGroup,
                        BookieSocketAddress addr,
                        ClientAuthProvider.Factory authProviderFactory,
                        ExtensionRegistry extRegistry) throws Exception {
-            super(executor, eventLoopGroup, addr, authProviderFactory, 
extRegistry);
+            super(
+                conf,
+                executor,
+                eventLoopGroup,
+                addr,
+                NullStatsLogger.INSTANCE,
+                authProviderFactory,
+                extRegistry,
+                null);
 
             state = ConnectionState.CONNECTING;
             ChannelFuture future = connect();

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to