Repository: bookkeeper Updated Branches: refs/heads/master 0f81461d2 -> e44c73883
BOOKKEEPER-1069: If client uses V2 proto, set the connection to always decode V2 messages. Avoid handling a parsing exception for each request and instead adapt to what the client is sending. Author: Matteo Merli <[email protected]> Reviewers: Enrico Olivelli <None>, Sijie Guo <None> Closes #157 from merlimat/always-v2 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e44c7388 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e44c7388 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e44c7388 Branch: refs/heads/master Commit: e44c7388399e5589cf44e38c58bb84c74da544af Parents: 0f81461 Author: Matteo Merli <[email protected]> Authored: Tue May 16 13:20:56 2017 +0200 Committer: eolivelli <[email protected]> Committed: Tue May 16 13:20:56 2017 +0200 ---------------------------------------------------------------------- .../bookkeeper/proto/BookieProtoEncoding.java | 27 ++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e44c7388/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index b1f86ae..0fed3e5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -342,10 +342,12 @@ public class BookieProtoEncoding { public static class RequestDecoder extends MessageToMessageDecoder<Object> { final EnDecoder REQ_PREV3; final EnDecoder REQ_V3; + boolean usingV3Protocol; RequestDecoder(ExtensionRegistry extensionRegistry) { 
REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry); REQ_V3 = new RequestEnDecoderV3(extensionRegistry); + usingV3Protocol = true; } @Override @@ -358,17 +360,18 @@ public class BookieProtoEncoding { return; } ByteBuf buffer = (ByteBuf) msg; - try { - buffer.markReaderIndex(); + buffer.markReaderIndex(); + + if (usingV3Protocol) { try { out.add(REQ_V3.decode(buffer)); } catch (InvalidProtocolBufferException e) { + usingV3Protocol = false; buffer.resetReaderIndex(); out.add(REQ_PREV3.decode(buffer)); } - } catch (Exception e) { - LOG.error("Failed to decode a request from {} : ", ctx.channel(), e); - ctx.close(); + } else { + out.add(REQ_PREV3.decode(buffer)); } } } @@ -404,10 +407,12 @@ public class BookieProtoEncoding { public static class ResponseDecoder extends MessageToMessageDecoder<Object> { final EnDecoder REP_PREV3; final EnDecoder REP_V3; + boolean usingV3Protocol; ResponseDecoder(ExtensionRegistry extensionRegistry) { REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry); REP_V3 = new ResponseEnDecoderV3(extensionRegistry); + usingV3Protocol = true; } @Override @@ -419,17 +424,19 @@ public class BookieProtoEncoding { out.add(msg); } ByteBuf buffer = (ByteBuf) msg; - try { - buffer.markReaderIndex(); + buffer.markReaderIndex(); + + if (usingV3Protocol) { try { out.add(REP_V3.decode(buffer)); } catch (InvalidProtocolBufferException e) { + usingV3Protocol = false; buffer.resetReaderIndex(); out.add(REP_PREV3.decode(buffer)); } - } catch (Exception e) { - LOG.error("Failed to decode a response from channel {} : ", ctx.channel(), e); - ctx.close(); + } else { + // If in the same connection we already got preV3 messages, don't try again to decode V3 messages + out.add(REP_PREV3.decode(buffer)); } } }
