This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new ae3f58d Implement directly ChannelOutboundHandlerAdapter in
BookieProtoEncoding#ResponseEncoder
ae3f58d is described below
commit ae3f58d3336d75bfd1cbe088d4704be073883987
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Mar 26 11:40:26 2018 +0200
Implement directly ChannelOutboundHandlerAdapter in
BookieProtoEncoding#ResponseEncoder
This change is mostly a cleanup/refactor that drops the intermediate
MessageToMessageEncoder and MessageToMessageDecoder from BookieProtoEncoding.
Author: Enrico Olivelli <[email protected]>
Reviewers: Ivan Kelly <[email protected]>
This closes #1293; it is a manual cherry-pick of #1286.
---
.../bookkeeper/proto/BookieProtoEncoding.java | 164 +++++++++++----------
.../bookkeeper/proto/BookieProtoEncodingTest.java | 20 ++-
2 files changed, 97 insertions(+), 87 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 15920dd..7a18a05 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,21 +20,10 @@
*/
package org.apache.bookkeeper.proto;
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-
-import org.apache.bookkeeper.client.MacDigestManager;
-import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
-import org.apache.bookkeeper.util.DoubleByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
@@ -42,9 +31,21 @@ import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import org.apache.bookkeeper.client.MacDigestManager;
+import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * A class for encoding and decoding the Bookkeeper protocol.
+ */
public class BookieProtoEncoding {
private static final Logger LOG =
LoggerFactory.getLogger(BookieProtoEncoding.class);
@@ -355,122 +356,127 @@ public class BookieProtoEncoding {
}
@Sharable
- public static class RequestEncoder extends MessageToMessageEncoder<Object>
{
+ public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
- final EnDecoder REQ_PREV3;
- final EnDecoder REQ_V3;
+ final EnDecoder reqPreV3;
+ final EnDecoder reqV3;
public RequestEncoder(ExtensionRegistry extensionRegistry) {
- REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
- REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ reqV3 = new RequestEnDecoderV3(extensionRegistry);
}
@Override
- protected void encode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
+ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Encode request {} to channel {}.", msg,
ctx.channel());
+ }
if (msg instanceof BookkeeperProtocol.Request) {
- out.add(REQ_V3.encode(msg, ctx.alloc()));
+ ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
- out.add(REQ_PREV3.encode(msg, ctx.alloc()));
+ ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
} else {
LOG.error("Invalid request to encode to {}: {}",
ctx.channel(), msg.getClass().getName());
- out.add(msg);
+ ctx.write(msg, promise);
}
}
}
@Sharable
- public static class RequestDecoder extends MessageToMessageDecoder<Object>
{
- final EnDecoder REQ_PREV3;
- final EnDecoder REQ_V3;
+ public static class RequestDecoder extends ChannelInboundHandlerAdapter {
+ final EnDecoder reqPreV3;
+ final EnDecoder reqV3;
boolean usingV3Protocol;
RequestDecoder(ExtensionRegistry extensionRegistry) {
- REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
- REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ reqV3 = new RequestEnDecoderV3(extensionRegistry);
usingV3Protocol = true;
}
@Override
- protected void decode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received request {} from channel {} to decode.",
msg, ctx.channel());
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received request {} from channel {} to decode.",
msg, ctx.channel());
}
- if (!(msg instanceof ByteBuf)) {
- out.add(msg);
- return;
- }
- ByteBuf buffer = (ByteBuf) msg;
- buffer.markReaderIndex();
-
- if (usingV3Protocol) {
- try {
- out.add(REQ_V3.decode(buffer));
- } catch (InvalidProtocolBufferException e) {
- usingV3Protocol = false;
- buffer.resetReaderIndex();
- out.add(REQ_PREV3.decode(buffer));
+ try {
+ if (!(msg instanceof ByteBuf)) {
+ LOG.error("Received invalid request {} from channel {} to
decode.", msg, ctx.channel());
+ ctx.fireChannelRead(msg);
+ return;
}
- } else {
- out.add(REQ_PREV3.decode(buffer));
+ ByteBuf buffer = (ByteBuf) msg;
+ buffer.markReaderIndex();
+ Object result;
+ if (usingV3Protocol) {
+ try {
+ result = reqV3.decode(buffer);
+ } catch (InvalidProtocolBufferException e) {
+ usingV3Protocol = false;
+ buffer.resetReaderIndex();
+ result = reqPreV3.decode(buffer);
+ }
+ } else {
+ result = reqPreV3.decode(buffer);
+ }
+ ctx.fireChannelRead(result);
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
}
@Sharable
- public static class ResponseEncoder extends
MessageToMessageEncoder<Object> {
- final EnDecoder REP_PREV3;
- final EnDecoder REP_V3;
+ public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
+ final EnDecoder repPreV3;
+ final EnDecoder repV3;
ResponseEncoder(ExtensionRegistry extensionRegistry) {
- REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ repV3 = new ResponseEnDecoderV3(extensionRegistry);
}
@Override
- protected void encode(ChannelHandlerContext ctx, Object msg,
List<Object> out)
- throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Encode response {} to channel {}.", msg,
ctx.channel());
+ public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Encode response {} to channel {}.", msg,
ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Response) {
- out.add(REP_V3.encode(msg, ctx.alloc()));
+ ctx.write(repV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Response) {
- out.add(REP_PREV3.encode(msg, ctx.alloc()));
+ ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
} else {
LOG.error("Invalid response to encode to {}: {}",
ctx.channel(), msg.getClass().getName());
- out.add(msg);
+ ctx.write(msg, promise);
}
}
}
@Sharable
- public static class ResponseDecoder extends
MessageToMessageDecoder<Object> {
- final EnDecoder REP_PREV3;
- final EnDecoder REP_V3;
- boolean usingV2Protocol;
+ public static class ResponseDecoder extends ChannelInboundHandlerAdapter {
+ final EnDecoder rep;
ResponseDecoder(ExtensionRegistry extensionRegistry, boolean
useV2Protocol) {
- REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
- usingV2Protocol = useV2Protocol;
+ rep = useV2Protocol
+ ? new ResponseEnDeCoderPreV3(extensionRegistry) : new
ResponseEnDecoderV3(extensionRegistry);
}
@Override
- protected void decode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received response {} from channel {} to decode.",
msg, ctx.channel());
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received response {} from channel {} to decode.",
msg, ctx.channel());
}
- if (!(msg instanceof ByteBuf)) {
- out.add(msg);
- }
- ByteBuf buffer = (ByteBuf) msg;
- buffer.markReaderIndex();
-
- if (!usingV2Protocol) {
- out.add(REP_V3.decode(buffer));
- } else {
- // If in the same connection we already got preV3 messages,
don't try again to decode V3 messages
- out.add(REP_PREV3.decode(buffer));
+ try {
+ if (!(msg instanceof ByteBuf)) {
+ LOG.error("Received invalid response {} from channel {} to
decode.", msg, ctx.channel());
+ ctx.fireChannelRead(msg);
+ return;
+ }
+ ByteBuf buffer = (ByteBuf) msg;
+ buffer.markReaderIndex();
+ ctx.fireChannelRead(rep.decode(buffer));
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index f182371..6f9cc9a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -21,7 +21,9 @@ package org.apache.bookkeeper.proto;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
@@ -71,16 +73,19 @@ public class BookieProtoEncodingTest {
.build();
List<Object> outList = Lists.newArrayList();
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.fireChannelRead(any())).thenAnswer((iom) -> {
+ outList.add(iom.getArgument(0));
+ return null;
+ });
ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
try {
- v3Decoder.decode(
- mock(ChannelHandlerContext.class),
- v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
- outList
+ v3Decoder.channelRead(ctx,
+ v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT)
);
fail("V3 response decoder should fail on decoding v2 response");
} catch (InvalidProtocolBufferException e) {
@@ -88,10 +93,9 @@ public class BookieProtoEncodingTest {
}
assertEquals(0, outList.size());
- v3Decoder.decode(
- mock(ChannelHandlerContext.class),
- v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
- outList);
+ v3Decoder.channelRead(
+ ctx,
+ v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
assertEquals(1, outList.size());
}
--
To stop receiving notification emails like this one, please contact
[email protected].