Repository: bookkeeper
Updated Branches:
  refs/heads/master 0f81461d2 -> e44c73883


BOOKKEEPER-1069: If client uses V2 proto, set the connection to always decode 
V2 messages

Avoid handling a parsing exception on every request; instead, adapt to the 
protocol version the client is sending.

Author: Matteo Merli <[email protected]>

Reviewers: Enrico Olivelli <None>, Sijie Guo <None>

Closes #157 from merlimat/always-v2


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e44c7388
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e44c7388
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e44c7388

Branch: refs/heads/master
Commit: e44c7388399e5589cf44e38c58bb84c74da544af
Parents: 0f81461
Author: Matteo Merli <[email protected]>
Authored: Tue May 16 13:20:56 2017 +0200
Committer: eolivelli <[email protected]>
Committed: Tue May 16 13:20:56 2017 +0200

----------------------------------------------------------------------
 .../bookkeeper/proto/BookieProtoEncoding.java   | 27 ++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e44c7388/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index b1f86ae..0fed3e5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -342,10 +342,12 @@ public class BookieProtoEncoding {
     public static class RequestDecoder extends MessageToMessageDecoder<Object> {
         final EnDecoder REQ_PREV3;
         final EnDecoder REQ_V3;
+        boolean usingV3Protocol;
 
         RequestDecoder(ExtensionRegistry extensionRegistry) {
             REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
             REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            usingV3Protocol = true;
         }
 
         @Override
@@ -358,17 +360,18 @@ public class BookieProtoEncoding {
                 return;
             }
             ByteBuf buffer = (ByteBuf) msg;
-            try {
-                buffer.markReaderIndex();
+            buffer.markReaderIndex();
+
+            if (usingV3Protocol) {
                 try {
                     out.add(REQ_V3.decode(buffer));
                 } catch (InvalidProtocolBufferException e) {
+                    usingV3Protocol = false;
                     buffer.resetReaderIndex();
                     out.add(REQ_PREV3.decode(buffer));
                 }
-            } catch (Exception e) {
-                LOG.error("Failed to decode a request from {} : ", ctx.channel(), e);
-                ctx.close();
+            } else {
+                out.add(REQ_PREV3.decode(buffer));
             }
         }
     }
@@ -404,10 +407,12 @@ public class BookieProtoEncoding {
     public static class ResponseDecoder extends MessageToMessageDecoder<Object> {
         final EnDecoder REP_PREV3;
         final EnDecoder REP_V3;
+        boolean usingV3Protocol;
 
         ResponseDecoder(ExtensionRegistry extensionRegistry) {
             REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
             REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+            usingV3Protocol = true;
         }
 
         @Override
@@ -419,17 +424,19 @@ public class BookieProtoEncoding {
                 out.add(msg);
             }
             ByteBuf buffer = (ByteBuf) msg;
-            try {
-                buffer.markReaderIndex();
+            buffer.markReaderIndex();
+
+            if (usingV3Protocol) {
                 try {
                     out.add(REP_V3.decode(buffer));
                 } catch (InvalidProtocolBufferException e) {
+                    usingV3Protocol = false;
                     buffer.resetReaderIndex();
                     out.add(REP_PREV3.decode(buffer));
                 }
-            } catch (Exception e) {
-                LOG.error("Failed to decode a response from channel {} : ", ctx.channel(), e);
-                ctx.close();
+            } else {
+                // If in the same connection we already got preV3 messages, don't try again to decode V3 messages
+                out.add(REP_PREV3.decode(buffer));
             }
         }
     }

Reply via email to