This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 59e48a3 Disable using v2 decoder to decode v3 response if
usingV2WireProtocol is enabled and handle unknown op in v2 request decoder
59e48a3 is described below
commit 59e48a390c1caa12adb7cc89afd2f9d48790a674
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Mar 10 00:13:57 2018 -0800
Disable using v2 decoder to decode v3 response if usingV2WireProtocol is
enabled and handle unknown op in v2 request decoder
Descriptions of the changes in this PR:
*Problem*
- The response decoder should decode a response using the same protocol
that the request was sent with; otherwise it makes a wrong assumption about
the response protocol.
- The v2 request decoder doesn't handle unknown op codes correctly, which
causes Netty to release the ByteBuf twice (double release).
*Solution*
- Make sure the response decoder respects the protocol that the client is
using for sending requests.
- The v2 request decoder should throw an exception when it fails to decode
an unknown request.
Related issues: #198
Author: Sijie Guo <[email protected]>
Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli
<[email protected]>, Jia Zhai <None>, Matteo Merli <[email protected]>
This closes #1240 from sijie/bookkeeper_fix_bytebuf_issues
---
.../bookkeeper/proto/BookieProtoEncoding.java | 21 ++--
.../bookkeeper/proto/PerChannelBookieClient.java | 4 +-
.../bookkeeper/proto/BookieProtoEncodingTest.java | 121 +++++++++++++++++++++
.../bookkeeper/proto/TestBackwardCompatCMS42.java | 22 +++-
4 files changed, 148 insertions(+), 20 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index bca878b..a45f94e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -188,9 +188,10 @@ public class BookieProtoEncoding {
BookkeeperProtocol.AuthMessage.Builder builder =
BookkeeperProtocol.AuthMessage.newBuilder();
builder.mergeFrom(new ByteBufInputStream(packet),
extensionRegistry);
return new BookieProtocol.AuthRequest(version,
builder.build());
- }
- return packet;
+ default:
+ throw new IllegalStateException("Received unknown request op
code = " + opCode);
+ }
}
private static byte[] readMasterKey(ByteBuf packet) {
@@ -474,12 +475,12 @@ public class BookieProtoEncoding {
public static class ResponseDecoder extends
MessageToMessageDecoder<Object> {
final EnDecoder repPreV3;
final EnDecoder repV3;
- boolean usingV3Protocol;
+ boolean usingV2Protocol;
- ResponseDecoder(ExtensionRegistry extensionRegistry) {
+ ResponseDecoder(ExtensionRegistry extensionRegistry, boolean
useV2Protocol) {
repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
repV3 = new ResponseEnDecoderV3(extensionRegistry);
- usingV3Protocol = true;
+ usingV2Protocol = useV2Protocol;
}
@Override
@@ -493,14 +494,8 @@ public class BookieProtoEncoding {
ByteBuf buffer = (ByteBuf) msg;
buffer.markReaderIndex();
- if (usingV3Protocol) {
- try {
- out.add(repV3.decode(buffer));
- } catch (InvalidProtocolBufferException e) {
- usingV3Protocol = false;
- buffer.resetReaderIndex();
- out.add(repPreV3.decode(buffer));
- }
+ if (!usingV2Protocol) {
+ out.add(repV3.decode(buffer));
} else {
// If in the same connection we already got preV3 messages,
don't try again to decode V3 messages
out.add(repPreV3.decode(buffer));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 440239f..cc97fe1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -439,7 +439,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4,
0, 4));
pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.RequestEncoder(extRegistry));
- pipeline.addLast("bookieProtoDecoder", new
BookieProtoEncoding.ResponseDecoder(extRegistry));
+ pipeline.addLast(
+ "bookieProtoDecoder",
+ new BookieProtoEncoding.ResponseDecoder(extRegistry,
useV2WireProtocol));
pipeline.addLast("authHandler", new
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
new file mode 100644
index 0000000..f182371
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDeCoderPreV3;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDecoderV3;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseDecoder;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDeCoderPreV3;
+import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDecoderV3;
+import org.apache.bookkeeper.proto.BookieProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.Flag;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test {@link BookieProtoEncoding}.
+ */
+public class BookieProtoEncodingTest {
+
+ @Test
+ public void testV3ResponseDecoderNoFallback() throws Exception {
+ AddResponse v2Resp = AddResponse.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ BookieProtocol.EOK,
+ 1L,
+ 2L);
+
+ BookkeeperProtocol.Response v3Resp =
BookkeeperProtocol.Response.newBuilder()
+ .setHeader(BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setTxnId(1L)
+ .setOperation(OperationType.ADD_ENTRY)
+ .build())
+ .setStatus(StatusCode.EOK)
+ .setAddResponse(BookkeeperProtocol.AddResponse.newBuilder()
+ .setStatus(StatusCode.EOK)
+ .setLedgerId(1L)
+ .setEntryId(2L)
+ .build())
+ .build();
+
+ List<Object> outList = Lists.newArrayList();
+
+ ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
+ ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
+
+ ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
+ try {
+ v3Decoder.decode(
+ mock(ChannelHandlerContext.class),
+ v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
+ outList
+ );
+ fail("V3 response decoder should fail on decoding v2 response");
+ } catch (InvalidProtocolBufferException e) {
+ // expected
+ }
+ assertEquals(0, outList.size());
+
+ v3Decoder.decode(
+ mock(ChannelHandlerContext.class),
+ v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
+ outList);
+ assertEquals(1, outList.size());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testV2RequestDecoderThrowExceptionOnUnknownRequests() throws
Exception {
+ RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(null);
+ RequestEnDecoderV3 v3ReqEncoder = new RequestEnDecoderV3(null);
+
+ BookkeeperProtocol.Request v3Req =
BookkeeperProtocol.Request.newBuilder()
+ .setHeader(BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setTxnId(1L)
+ .setOperation(OperationType.ADD_ENTRY)
+ .build())
+ .setAddRequest(BookkeeperProtocol.AddRequest.newBuilder()
+ .setLedgerId(1L)
+ .setEntryId(2L)
+ .setMasterKey(ByteString.copyFrom("", UTF_8))
+ .setFlag(Flag.RECOVERY_ADD)
+ .setBody(ByteString.copyFrom("test", UTF_8)))
+ .build();
+
+
+ v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req,
UnpooledByteBufAllocator.DEFAULT));
+ }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 9e230b6..59811d1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -47,17 +47,15 @@ import
org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Test backward compatibility.
*/
public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
- static final Logger LOG =
LoggerFactory.getLogger(TestBackwardCompatCMS42.class);
private static final byte[] SUCCESS_RESPONSE = {1};
private static final byte[] FAILURE_RESPONSE = {2};
@@ -181,7 +179,9 @@ public class TestBackwardCompatCMS42 extends
BookKeeperClusterTestCase {
}
CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
- return new CompatClient42(executor, eventLoopGroup, addr,
authProvider, extRegistry);
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setUseV2WireProtocol(true);
+ return new CompatClient42(conf, executor, eventLoopGroup, addr,
authProvider, extRegistry);
}
// extending PerChannelBookieClient to get the pipeline factory
@@ -190,11 +190,21 @@ public class TestBackwardCompatCMS42 extends
BookKeeperClusterTestCase {
Channel channel;
final CountDownLatch connected = new CountDownLatch(1);
- CompatClient42(OrderedSafeExecutor executor, EventLoopGroup
eventLoopGroup,
+ CompatClient42(ClientConfiguration conf,
+ OrderedSafeExecutor executor,
+ EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry) throws Exception {
- super(executor, eventLoopGroup, addr, authProviderFactory,
extRegistry);
+ super(
+ conf,
+ executor,
+ eventLoopGroup,
+ addr,
+ NullStatsLogger.INSTANCE,
+ authProviderFactory,
+ extRegistry,
+ null);
state = ConnectionState.CONNECTING;
ChannelFuture future = connect();
--
To stop receiving notification emails like this one, please contact
[email protected].