This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 59e48a3 Disable using v2 decoder to decode v3 response if usingV2WireProtocol is enabled and handle unknown op in v2 request decoder 59e48a3 is described below commit 59e48a390c1caa12adb7cc89afd2f9d48790a674 Author: Sijie Guo <si...@apache.org> AuthorDate: Sat Mar 10 00:13:57 2018 -0800 Disable using v2 decoder to decode v3 response if usingV2WireProtocol is enabled and handle unknown op in v2 request decoder Descriptions of the changes in this PR: *Problem* - The response decoder should decode a response using the same protocol that the request was sent with; otherwise it makes a wrong assumption about the response protocol. - The v2 request decoder doesn't handle unknown ops correctly, which will cause netty to double-release the ByteBuf. *Solution* - Make sure the response decoder respects the protocol that the client is using for sending requests. - The v2 request decoder should throw an exception when it fails to decode an unknown request. Related issues: #198 Author: Sijie Guo <si...@apache.org> Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None>, Matteo Merli <mme...@apache.org> This closes #1240 from sijie/bookkeeper_fix_bytebuf_issues --- .../bookkeeper/proto/BookieProtoEncoding.java | 21 ++-- .../bookkeeper/proto/PerChannelBookieClient.java | 4 +- .../bookkeeper/proto/BookieProtoEncodingTest.java | 121 +++++++++++++++++++++ .../bookkeeper/proto/TestBackwardCompatCMS42.java | 22 +++- 4 files changed, 148 insertions(+), 20 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index bca878b..a45f94e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -188,9 +188,10 @@ public class BookieProtoEncoding { 
BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry); return new BookieProtocol.AuthRequest(version, builder.build()); - } - return packet; + default: + throw new IllegalStateException("Received unknown request op code = " + opCode); + } } private static byte[] readMasterKey(ByteBuf packet) { @@ -474,12 +475,12 @@ public class BookieProtoEncoding { public static class ResponseDecoder extends MessageToMessageDecoder<Object> { final EnDecoder repPreV3; final EnDecoder repV3; - boolean usingV3Protocol; + boolean usingV2Protocol; - ResponseDecoder(ExtensionRegistry extensionRegistry) { + ResponseDecoder(ExtensionRegistry extensionRegistry, boolean useV2Protocol) { repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry); repV3 = new ResponseEnDecoderV3(extensionRegistry); - usingV3Protocol = true; + usingV2Protocol = useV2Protocol; } @Override @@ -493,14 +494,8 @@ public class BookieProtoEncoding { ByteBuf buffer = (ByteBuf) msg; buffer.markReaderIndex(); - if (usingV3Protocol) { - try { - out.add(repV3.decode(buffer)); - } catch (InvalidProtocolBufferException e) { - usingV3Protocol = false; - buffer.resetReaderIndex(); - out.add(repPreV3.decode(buffer)); - } + if (!usingV2Protocol) { + out.add(repV3.decode(buffer)); } else { // If in the same connection we already got preV3 messages, don't try again to decode V3 messages out.add(repPreV3.decode(buffer)); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 440239f..cc97fe1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -439,7 +439,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { new 
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry)); - pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry)); + pipeline.addLast( + "bookieProtoDecoder", + new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol)); pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator, connectionPeer)); pipeline.addLast("mainhandler", PerChannelBookieClient.this); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java new file mode 100644 index 0000000..f182371 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.proto; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import java.util.List; +import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDeCoderPreV3; +import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDecoderV3; +import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseDecoder; +import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDeCoderPreV3; +import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDecoderV3; +import org.apache.bookkeeper.proto.BookieProtocol.AddResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.Flag; +import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; +import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion; +import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.junit.Test; + +/** + * Unit test {@link BookieProtoEncoding}. 
+ */ +public class BookieProtoEncodingTest { + + @Test + public void testV3ResponseDecoderNoFallback() throws Exception { + AddResponse v2Resp = AddResponse.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, + BookieProtocol.EOK, + 1L, + 2L); + + BookkeeperProtocol.Response v3Resp = BookkeeperProtocol.Response.newBuilder() + .setHeader(BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setTxnId(1L) + .setOperation(OperationType.ADD_ENTRY) + .build()) + .setStatus(StatusCode.EOK) + .setAddResponse(BookkeeperProtocol.AddResponse.newBuilder() + .setStatus(StatusCode.EOK) + .setLedgerId(1L) + .setEntryId(2L) + .build()) + .build(); + + List<Object> outList = Lists.newArrayList(); + + ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null); + ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null); + + ResponseDecoder v3Decoder = new ResponseDecoder(null, false); + try { + v3Decoder.decode( + mock(ChannelHandlerContext.class), + v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT), + outList + ); + fail("V3 response decoder should fail on decoding v2 response"); + } catch (InvalidProtocolBufferException e) { + // expected + } + assertEquals(0, outList.size()); + + v3Decoder.decode( + mock(ChannelHandlerContext.class), + v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT), + outList); + assertEquals(1, outList.size()); + } + + @Test(expected = IllegalStateException.class) + public void testV2RequestDecoderThrowExceptionOnUnknownRequests() throws Exception { + RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(null); + RequestEnDecoderV3 v3ReqEncoder = new RequestEnDecoderV3(null); + + BookkeeperProtocol.Request v3Req = BookkeeperProtocol.Request.newBuilder() + .setHeader(BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setTxnId(1L) + .setOperation(OperationType.ADD_ENTRY) + .build()) + .setAddRequest(BookkeeperProtocol.AddRequest.newBuilder() + .setLedgerId(1L) + 
.setEntryId(2L) + .setMasterKey(ByteString.copyFrom("", UTF_8)) + .setFlag(Flag.RECOVERY_ADD) + .setBody(ByteString.copyFrom("test", UTF_8))) + .build(); + + + v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req, UnpooledByteBufAllocator.DEFAULT)); + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index 9e230b6..59811d1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -47,17 +47,15 @@ import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.proto.BookieProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Test backward compatibility. 
*/ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { - static final Logger LOG = LoggerFactory.getLogger(TestBackwardCompatCMS42.class); private static final byte[] SUCCESS_RESPONSE = {1}; private static final byte[] FAILURE_RESPONSE = {2}; @@ -181,7 +179,9 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { } CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception { - return new CompatClient42(executor, eventLoopGroup, addr, authProvider, extRegistry); + ClientConfiguration conf = new ClientConfiguration(); + conf.setUseV2WireProtocol(true); + return new CompatClient42(conf, executor, eventLoopGroup, addr, authProvider, extRegistry); } // extending PerChannelBookieClient to get the pipeline factory @@ -190,11 +190,21 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { Channel channel; final CountDownLatch connected = new CountDownLatch(1); - CompatClient42(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, + CompatClient42(ClientConfiguration conf, + OrderedSafeExecutor executor, + EventLoopGroup eventLoopGroup, BookieSocketAddress addr, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry) throws Exception { - super(executor, eventLoopGroup, addr, authProviderFactory, extRegistry); + super( + conf, + executor, + eventLoopGroup, + addr, + NullStatsLogger.INSTANCE, + authProviderFactory, + extRegistry, + null); state = ConnectionState.CONNECTING; ChannelFuture future = connect(); -- To stop receiving notification emails like this one, please contact si...@apache.org.