Repository: kafka Updated Branches: refs/heads/trunk 24a4e6146 -> 27107ee34
MINOR: JoinGroupRequest V0 invalid rebalance timeout A JoinGroupRequest V0 built with the Builder had a rebalance timeout = -1 rather than equal to session timeout as it would have been if coming from the wire and deserialized from a V0 Struct fix developed with mimaison Author: Edoardo Comar <eco...@uk.ibm.com> Reviewers: Rajini Sivaram Closes #2936 from edoardocomar/MINOR-JoinGroupRequestV0 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27107ee3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27107ee3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27107ee3 Branch: refs/heads/trunk Commit: 27107ee34d1df89035eb9b9b4e11036fca6cf723 Parents: 24a4e61 Author: Edoardo Comar <eco...@uk.ibm.com> Authored: Thu May 11 16:17:34 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu May 11 16:17:34 2017 -0400 ---------------------------------------------------------------------- .../apache/kafka/common/requests/JoinGroupRequest.java | 3 ++- .../kafka/common/requests/RequestResponseTest.java | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 1080fe7..ff07d13 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -90,7 +90,8 @@ public class JoinGroupRequest extends AbstractRequest { @Override public JoinGroupRequest build(short version) { if (version < 1) { - rebalanceTimeout = -1; + // v0 had no rebalance timeout but used session timeout implicitly + rebalanceTimeout = sessionTimeout; } return new JoinGroupRequest(version, groupId, sessionTimeout, rebalanceTimeout, memberId, protocolType, groupProtocols); http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 6443e4d..b1e83bf 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -511,7 +511,15 @@ public class RequestResponseTest { deserialized = (FetchRequest) deserialize(request, struct, request.version()); assertEquals(request.isolationLevel(), deserialized.isolationLevel()); } - + + @Test + public void testJoinGroupRequestVersion0RebalanceTimeout() throws Exception { + final short version = 0; + JoinGroupRequest jgr = createJoinGroupRequest(version); + JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version); + assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout()); + } + private RequestHeader createRequestHeader() { return new RequestHeader((short) 10, (short) 1, "", 10); } @@ -565,7 +573,6 @@ public class RequestResponseTest { return new HeartbeatResponse(Errors.NONE); } - @SuppressWarnings("deprecation") private JoinGroupRequest createJoinGroupRequest(int version) { ByteBuffer metadata = ByteBuffer.wrap(new byte[] {}); List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();