This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new ae3f58d  Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
ae3f58d is described below

commit ae3f58d3336d75bfd1cbe088d4704be073883987
Author: Enrico Olivelli <eolive...@apache.org>
AuthorDate: Mon Mar 26 11:40:26 2018 +0200

    Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
    
    This change is mostly a cleanup/refactor that drops the intermediate MessageToMessageEncoder and MessageToMessageDecoder from BookieProtoEncoding.
    
    Author: Enrico Olivelli <eolive...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>
    
    This closes #1293, and it is a manual cherry pick of #1286
---
 .../bookkeeper/proto/BookieProtoEncoding.java      | 164 +++++++++++----------
 .../bookkeeper/proto/BookieProtoEncodingTest.java  |  20 ++-
 2 files changed, 97 insertions(+), 87 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 15920dd..7a18a05 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,21 +20,10 @@
  */
 package org.apache.bookkeeper.proto;
 
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-
-import org.apache.bookkeeper.client.MacDigestManager;
-import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
-import org.apache.bookkeeper.util.DoubleByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufInputStream;
@@ -42,9 +31,21 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import org.apache.bookkeeper.client.MacDigestManager;
+import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * A class for encoding and decoding the Bookkeeper protocol.
+ */
 public class BookieProtoEncoding {
     private static final Logger LOG = 
LoggerFactory.getLogger(BookieProtoEncoding.class);
 
@@ -355,122 +356,127 @@ public class BookieProtoEncoding {
     }
 
     @Sharable
-    public static class RequestEncoder extends MessageToMessageEncoder<Object> 
{
+    public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
 
-        final EnDecoder REQ_PREV3;
-        final EnDecoder REQ_V3;
+        final EnDecoder reqPreV3;
+        final EnDecoder reqV3;
 
         public RequestEncoder(ExtensionRegistry extensionRegistry) {
-            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
-            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            reqV3 = new RequestEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, 
List<Object> out) throws Exception {
+        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Encode request {} to channel {}.", msg, 
ctx.channel());
+            }
             if (msg instanceof BookkeeperProtocol.Request) {
-                out.add(REQ_V3.encode(msg, ctx.alloc()));
+                ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Request) {
-                out.add(REQ_PREV3.encode(msg, ctx.alloc()));
+                ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid request to encode to {}: {}", 
ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
 
     @Sharable
-    public static class RequestDecoder extends MessageToMessageDecoder<Object> 
{
-        final EnDecoder REQ_PREV3;
-        final EnDecoder REQ_V3;
+    public static class RequestDecoder extends ChannelInboundHandlerAdapter {
+        final EnDecoder reqPreV3;
+        final EnDecoder reqV3;
         boolean usingV3Protocol;
 
         RequestDecoder(ExtensionRegistry extensionRegistry) {
-            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
-            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            reqV3 = new RequestEnDecoderV3(extensionRegistry);
             usingV3Protocol = true;
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, 
List<Object> out) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received request {} from channel {} to decode.", 
msg, ctx.channel());
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Received request {} from channel {} to decode.", 
msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-                return;
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (usingV3Protocol) {
-                try {
-                    out.add(REQ_V3.decode(buffer));
-                } catch (InvalidProtocolBufferException e) {
-                    usingV3Protocol = false;
-                    buffer.resetReaderIndex();
-                    out.add(REQ_PREV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid request {} from channel {} to 
decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
                 }
-            } else {
-                out.add(REQ_PREV3.decode(buffer));
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                Object result;
+                if (usingV3Protocol) {
+                    try {
+                        result = reqV3.decode(buffer);
+                    } catch (InvalidProtocolBufferException e) {
+                        usingV3Protocol = false;
+                        buffer.resetReaderIndex();
+                        result = reqPreV3.decode(buffer);
+                    }
+                } else {
+                    result = reqPreV3.decode(buffer);
+                }
+                ctx.fireChannelRead(result);
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
 
     @Sharable
-    public static class ResponseEncoder extends 
MessageToMessageEncoder<Object> {
-        final EnDecoder REP_PREV3;
-        final EnDecoder REP_V3;
+    public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
+        final EnDecoder repPreV3;
+        final EnDecoder repV3;
 
         ResponseEncoder(ExtensionRegistry extensionRegistry) {
-            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
-            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+            repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+            repV3 = new ResponseEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, 
List<Object> out)
-                throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Encode response {} to channel {}.", msg, 
ctx.channel());
+        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Encode response {} to channel {}.", msg, 
ctx.channel());
             }
             if (msg instanceof BookkeeperProtocol.Response) {
-                out.add(REP_V3.encode(msg, ctx.alloc()));
+                ctx.write(repV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Response) {
-                out.add(REP_PREV3.encode(msg, ctx.alloc()));
+                ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid response to encode to {}: {}", 
ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
 
     @Sharable
-    public static class ResponseDecoder extends 
MessageToMessageDecoder<Object> {
-        final EnDecoder REP_PREV3;
-        final EnDecoder REP_V3;
-        boolean usingV2Protocol;
+    public static class ResponseDecoder extends ChannelInboundHandlerAdapter {
+        final EnDecoder rep;
 
         ResponseDecoder(ExtensionRegistry extensionRegistry, boolean 
useV2Protocol) {
-            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
-            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
-            usingV2Protocol = useV2Protocol;
+            rep = useV2Protocol
+                    ? new ResponseEnDeCoderPreV3(extensionRegistry) : new 
ResponseEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, 
List<Object> out) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received response {} from channel {} to decode.", 
msg, ctx.channel());
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Received response {} from channel {} to decode.", 
msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (!usingV2Protocol) {
-                out.add(REP_V3.decode(buffer));
-            } else {
-                // If in the same connection we already got preV3 messages, 
don't try again to decode V3 messages
-                out.add(REP_PREV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid response {} from channel {} to 
decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
+                }
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                ctx.fireChannelRead(rep.decode(buffer));
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index f182371..6f9cc9a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -21,7 +21,9 @@ package org.apache.bookkeeper.proto;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
@@ -71,16 +73,19 @@ public class BookieProtoEncodingTest {
             .build();
 
         List<Object> outList = Lists.newArrayList();
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.fireChannelRead(any())).thenAnswer((iom) -> {
+                outList.add(iom.getArgument(0));
+                return null;
+        });
 
         ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
         ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
 
         ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
         try {
-            v3Decoder.decode(
-                mock(ChannelHandlerContext.class),
-                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
-                outList
+            v3Decoder.channelRead(ctx,
+                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT)
             );
             fail("V3 response decoder should fail on decoding v2 response");
         } catch (InvalidProtocolBufferException e) {
@@ -88,10 +93,9 @@ public class BookieProtoEncodingTest {
         }
         assertEquals(0, outList.size());
 
-        v3Decoder.decode(
-            mock(ChannelHandlerContext.class),
-            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
-            outList);
+        v3Decoder.channelRead(
+            ctx,
+            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
         assertEquals(1, outList.size());
     }
 

-- 
To stop receiving notification emails like this one, please contact
eolive...@apache.org.

Reply via email to