This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-4.6 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push: new ae3f58d Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder ae3f58d is described below commit ae3f58d3336d75bfd1cbe088d4704be073883987 Author: Enrico Olivelli <eolive...@apache.org> AuthorDate: Mon Mar 26 11:40:26 2018 +0200 Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder This change is mostly a clean up/refactor which drops intermediate MessageToMessageEncoder and MessageToMessageDecoder from BookieProtoEncoding Author: Enrico Olivelli <eolive...@apache.org> Reviewers: Ivan Kelly <iv...@apache.org> This closes #1293, and it is a manual cherry pick of #1286 --- .../bookkeeper/proto/BookieProtoEncoding.java | 164 +++++++++++---------- .../bookkeeper/proto/BookieProtoEncodingTest.java | 20 ++- 2 files changed, 97 insertions(+), 87 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 15920dd..7a18a05 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -20,21 +20,10 @@ */ package org.apache.bookkeeper.proto; -import java.io.IOException; -import java.security.NoSuchAlgorithmException; -import java.util.List; - -import org.apache.bookkeeper.client.MacDigestManager; -import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader; -import org.apache.bookkeeper.util.DoubleByteBuf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.protobuf.CodedOutputStream; import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; @@ -42,9 +31,21 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.util.ReferenceCountUtil; +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import org.apache.bookkeeper.client.MacDigestManager; +import org.apache.bookkeeper.util.DoubleByteBuf; +import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * A class for encoding and decoding the Bookkeeper protocol. + */ public class BookieProtoEncoding { private static final Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class); @@ -355,122 +356,127 @@ public class BookieProtoEncoding { } @Sharable - public static class RequestEncoder extends MessageToMessageEncoder<Object> { + public static class RequestEncoder extends ChannelOutboundHandlerAdapter { - final EnDecoder REQ_PREV3; - final EnDecoder REQ_V3; + final EnDecoder reqPreV3; + final EnDecoder reqV3; public RequestEncoder(ExtensionRegistry extensionRegistry) { - REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry); - REQ_V3 = new RequestEnDecoderV3(extensionRegistry); + reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry); + reqV3 = new RequestEnDecoderV3(extensionRegistry); } @Override - protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Encode request {} to channel {}.", msg, ctx.channel()); + } if (msg instanceof BookkeeperProtocol.Request) { - out.add(REQ_V3.encode(msg, ctx.alloc())); + ctx.write(reqV3.encode(msg, ctx.alloc()), promise); } else if (msg instanceof BookieProtocol.Request) { - out.add(REQ_PREV3.encode(msg, ctx.alloc())); + ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise); } else { LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName()); - out.add(msg); + ctx.write(msg, promise); } } } @Sharable - public static class RequestDecoder extends MessageToMessageDecoder<Object> { - final EnDecoder REQ_PREV3; - final EnDecoder REQ_V3; + public static class RequestDecoder extends ChannelInboundHandlerAdapter { + final EnDecoder reqPreV3; + final EnDecoder reqV3; boolean usingV3Protocol; RequestDecoder(ExtensionRegistry extensionRegistry) { - REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry); - REQ_V3 = new RequestEnDecoderV3(extensionRegistry); + reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry); + reqV3 = new RequestEnDecoderV3(extensionRegistry); usingV3Protocol = true; } @Override - protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Received request {} from channel {} to decode.", msg, ctx.channel()); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Received request {} from channel {} to decode.", msg, ctx.channel()); } - if (!(msg instanceof ByteBuf)) { - out.add(msg); - return; - } - ByteBuf buffer = (ByteBuf) msg; - buffer.markReaderIndex(); - - if (usingV3Protocol) { - try { - out.add(REQ_V3.decode(buffer)); - } catch (InvalidProtocolBufferException e) { - usingV3Protocol = false; - buffer.resetReaderIndex(); - out.add(REQ_PREV3.decode(buffer)); + try { + if (!(msg instanceof ByteBuf)) { + LOG.error("Received invalid request {} from channel {} to decode.", msg, ctx.channel()); + ctx.fireChannelRead(msg); + return; } - } else { - out.add(REQ_PREV3.decode(buffer)); + ByteBuf buffer = (ByteBuf) msg; + buffer.markReaderIndex(); + Object result; + if (usingV3Protocol) { + try { + result = reqV3.decode(buffer); + } catch (InvalidProtocolBufferException e) { + usingV3Protocol = false; + buffer.resetReaderIndex(); + result = reqPreV3.decode(buffer); + } + } else { + result = reqPreV3.decode(buffer); + } + ctx.fireChannelRead(result); + } finally { + ReferenceCountUtil.release(msg); } } } @Sharable - public static class ResponseEncoder extends MessageToMessageEncoder<Object> { - final EnDecoder REP_PREV3; - final EnDecoder REP_V3; + public static class ResponseEncoder extends ChannelOutboundHandlerAdapter { + final EnDecoder repPreV3; + final EnDecoder repV3; ResponseEncoder(ExtensionRegistry extensionRegistry) { - REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry); - REP_V3 = new ResponseEnDecoderV3(extensionRegistry); + repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry); + repV3 = new ResponseEnDecoderV3(extensionRegistry); } @Override - protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) - throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Encode response {} to channel {}.", msg, ctx.channel()); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Encode response {} to channel {}.", msg, ctx.channel()); } if (msg instanceof BookkeeperProtocol.Response) { - out.add(REP_V3.encode(msg, ctx.alloc())); + ctx.write(repV3.encode(msg, ctx.alloc()), promise); } else if (msg instanceof BookieProtocol.Response) { - out.add(REP_PREV3.encode(msg, ctx.alloc())); + ctx.write(repPreV3.encode(msg, ctx.alloc()), promise); } else { LOG.error("Invalid response to encode to {}: {}", ctx.channel(), msg.getClass().getName()); - out.add(msg); + ctx.write(msg, promise); } } } @Sharable - public static class ResponseDecoder extends MessageToMessageDecoder<Object> { - final EnDecoder REP_PREV3; - final EnDecoder REP_V3; - boolean usingV2Protocol; + public static class ResponseDecoder extends ChannelInboundHandlerAdapter { + final EnDecoder rep; ResponseDecoder(ExtensionRegistry extensionRegistry, boolean useV2Protocol) { - REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry); - REP_V3 = new ResponseEnDecoderV3(extensionRegistry); - usingV2Protocol = useV2Protocol; + rep = useV2Protocol + ? new ResponseEnDeCoderPreV3(extensionRegistry) : new ResponseEnDecoderV3(extensionRegistry); } @Override - protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Received response {} from channel {} to decode.", msg, ctx.channel()); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Received response {} from channel {} to decode.", msg, ctx.channel()); } - if (!(msg instanceof ByteBuf)) { - out.add(msg); - } - ByteBuf buffer = (ByteBuf) msg; - buffer.markReaderIndex(); - - if (!usingV2Protocol) { - out.add(REP_V3.decode(buffer)); - } else { - // If in the same connection we already got preV3 messages, don't try again to decode V3 messages - out.add(REP_PREV3.decode(buffer)); + try { + if (!(msg instanceof ByteBuf)) { + LOG.error("Received invalid response {} from channel {} to decode.", msg, ctx.channel()); + ctx.fireChannelRead(msg); + return; + } + ByteBuf buffer = (ByteBuf) msg; + buffer.markReaderIndex(); + ctx.fireChannelRead(rep.decode(buffer)); + } finally { + ReferenceCountUtil.release(msg); } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java index f182371..6f9cc9a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java @@ -21,7 +21,9 @@ package org.apache.bookkeeper.proto; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -71,16 +73,19 @@ public class BookieProtoEncodingTest { .build(); List<Object> outList = Lists.newArrayList(); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.fireChannelRead(any())).thenAnswer((iom) -> { + outList.add(iom.getArgument(0)); + return null; + }); ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null); ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null); ResponseDecoder v3Decoder = new ResponseDecoder(null, false); try { - v3Decoder.decode( - mock(ChannelHandlerContext.class), - v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT), - outList + v3Decoder.channelRead(ctx, + v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT) ); fail("V3 response decoder should fail on decoding v2 response"); } catch (InvalidProtocolBufferException e) { @@ -88,10 +93,9 @@ public class BookieProtoEncodingTest { } assertEquals(0, outList.size()); - v3Decoder.decode( - mock(ChannelHandlerContext.class), - v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT), - outList); + v3Decoder.channelRead( + ctx, + v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT)); assertEquals(1, outList.size()); } -- To stop receiving notification emails like this one, please contact eolive...@apache.org.