This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 74c90f4 KAFKA-8221; Add batch leave group request (#6714)
74c90f4 is described below
commit 74c90f46c34727be9484e9826ff543b451ada775
Author: Boyang Chen <[email protected]>
AuthorDate: Fri Jul 26 23:13:37 2019 -0700
KAFKA-8221; Add batch leave group request (#6714)
This patch is part of KIP-345. The goal is to support batch leave group
requests issued from the admin client. This diff is the first step: it bumps
the leave group request version.
Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../consumer/internals/AbstractCoordinator.java | 26 +-
.../kafka/common/requests/LeaveGroupRequest.java | 48 +++-
.../kafka/common/requests/LeaveGroupResponse.java | 86 +++++-
.../common/message/LeaveGroupRequest.json | 15 +-
.../common/message/LeaveGroupResponse.json | 15 +-
.../internals/AbstractCoordinatorTest.java | 280 ++++++++++++--------
.../internals/ConsumerCoordinatorTest.java | 294 +++++++++------------
.../apache/kafka/common/message/MessageTest.java | 44 ++-
.../common/requests/LeaveGroupRequestTest.java | 88 +++++-
.../common/requests/LeaveGroupResponseTest.java | 86 +++++-
.../kafka/common/requests/RequestResponseTest.java | 37 +--
.../kafka/coordinator/group/GroupCoordinator.scala | 105 +++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 53 ++--
.../kafka/api/AuthorizerIntegrationTest.scala | 9 +-
.../group/GroupCoordinatorConcurrencyTest.scala | 24 +-
.../coordinator/group/GroupCoordinatorTest.scala | 238 ++++++++++++++---
.../scala/unit/kafka/server/KafkaApisTest.scala | 58 ++++
.../scala/unit/kafka/server/RequestQuotaTest.scala | 9 +-
.../streams/processor/internals/AssignedTasks.java | 1 -
20 files changed, 1065 insertions(+), 453 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 03321f1..908e114 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -62,7 +62,7 @@
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
-
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
+
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b92a4a6..fc385f6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -35,7 +35,8 @@ import
org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
-import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -843,7 +844,8 @@ public abstract class AbstractCoordinator implements
Closeable {
* Leave the current group and reset local generation/memberId.
* @param leaveReason reason to attempt leaving the group
*/
- public synchronized void maybeLeaveGroup(String leaveReason) {
+ public synchronized RequestFuture<Void> maybeLeaveGroup(String
leaveReason) {
+ RequestFuture<Void> future = null;
// Starting from 2.3, only dynamic members will send LeaveGroupRequest
to the broker,
// consumer with valid group.instance.id is viewed as static member
that never sends LeaveGroup,
// and the membership expiration is only controlled by session timeout.
@@ -853,14 +855,18 @@ public abstract class AbstractCoordinator implements
Closeable {
// attempt any resending if the request fails or times out.
log.info("Member {} sending LeaveGroup request to coordinator {}
due to {}",
generation.memberId, coordinator, leaveReason);
- LeaveGroupRequest.Builder request = new
LeaveGroupRequest.Builder(new LeaveGroupRequestData()
-
.setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId));
- client.send(coordinator, request)
+ LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
+ rebalanceConfig.groupId,
+ Collections.singletonList(new MemberIdentity()
+
.setMemberId(generation.memberId))
+ );
+ future = client.send(coordinator, request)
.compose(new LeaveGroupResponseHandler());
client.pollNoWakeup();
}
resetGeneration();
+ return future;
}
protected boolean isDynamicMember() {
@@ -870,12 +876,18 @@ public abstract class AbstractCoordinator implements
Closeable {
private class LeaveGroupResponseHandler extends
CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@Override
public void handle(LeaveGroupResponse leaveResponse,
RequestFuture<Void> future) {
- Errors error = leaveResponse.error();
+ final List<MemberResponse> members =
leaveResponse.memberResponses();
+ if (members.size() > 1) {
+ future.raise(new IllegalStateException("The expected leave
group response " +
+ "should only
contain no more than one member info, however get " + members));
+ }
+
+ final Errors error = leaveResponse.error();
if (error == Errors.NONE) {
log.debug("LeaveGroup request returned successfully");
future.complete(null);
} else {
- log.debug("LeaveGroup request failed with error: {}",
error.message());
+ log.error("LeaveGroup request failed with error: {}",
error.message());
future.raise(error);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index e6e239c..ac77379 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -16,33 +16,64 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
public class LeaveGroupRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<LeaveGroupRequest> {
+ private final String groupId;
+ private final List<MemberIdentity> members;
- private final LeaveGroupRequestData data;
-
- public Builder(LeaveGroupRequestData data) {
+ public Builder(String groupId, List<MemberIdentity> members) {
super(ApiKeys.LEAVE_GROUP);
- this.data = data;
+ this.groupId = groupId;
+ this.members = members;
+ if (members.isEmpty()) {
+ throw new IllegalArgumentException("leaving members should not
be empty");
+ }
}
+ /**
+ * Based on the request version to choose fields.
+ */
@Override
public LeaveGroupRequest build(short version) {
+ final LeaveGroupRequestData data;
+ // Starting from version 3, all the leave group request will be in
batch.
+ if (version >= 3) {
+ data = new LeaveGroupRequestData()
+ .setGroupId(groupId)
+ .setMembers(members);
+ } else {
+ if (members.size() != 1) {
+ throw new UnsupportedVersionException("Version " + version
+ " leave group request only " +
+ "supports single
member instance than " + members.size() + " members");
+ }
+
+ data = new LeaveGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(members.get(0).memberId());
+ }
return new LeaveGroupRequest(data, version);
}
@Override
public String toString() {
- return data.toString();
+ return "(type=LeaveGroupRequest" +
+ ", groupId=" + groupId +
+ ", members=" +
MessageUtil.deepToString(members.iterator()) +
+ ")";
}
}
private final LeaveGroupRequestData data;
@@ -64,6 +95,13 @@ public class LeaveGroupRequest extends AbstractRequest {
return data;
}
+ public List<MemberIdentity> members() {
+ // Before version 3, leave group request is still in single mode
+ return version <= 2 ? Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(data.memberId())) : data.members();
+ }
+
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
LeaveGroupResponseData responseData = new LeaveGroupResponseData()
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 6390c0f..97a583f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -17,47 +17,117 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+/**
+ * Possible error codes.
+ *
+ * Top level errors:
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ *
+ * Member level errors:
+ * - {@link Errors#FENCED_INSTANCE_ID}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ *
+ * If the top level error code is set, normally this indicates that broker
early stops the request
+ * handling due to some severe global error, so it is expected to see the
member level errors to be empty.
+ * For older version response, we may populate member level error towards top
level because older client
+ * couldn't parse member level.
+ */
public class LeaveGroupResponse extends AbstractResponse {
- private final LeaveGroupResponseData data;
+ public final LeaveGroupResponseData data;
public LeaveGroupResponse(LeaveGroupResponseData data) {
this.data = data;
}
+ public LeaveGroupResponse(List<MemberResponse> memberResponses,
+ Errors topLevelError,
+ final int throttleTimeMs,
+ final short version) {
+ if (version <= 2) {
+ // Populate member level error.
+ final short errorCode = getError(topLevelError,
memberResponses).code();
+
+ this.data = new LeaveGroupResponseData()
+ .setErrorCode(errorCode);
+ } else {
+ this.data = new LeaveGroupResponseData()
+ .setErrorCode(topLevelError.code())
+ .setMembers(memberResponses);
+ }
+
+ if (version >= 1) {
+ this.data.setThrottleTimeMs(throttleTimeMs);
+ }
+ }
+
public LeaveGroupResponse(Struct struct) {
short latestVersion = (short) (LeaveGroupResponseData.SCHEMAS.length -
1);
this.data = new LeaveGroupResponseData(struct, latestVersion);
}
+
public LeaveGroupResponse(Struct struct, short version) {
this.data = new LeaveGroupResponseData(struct, version);
}
- public LeaveGroupResponseData data() {
- return data;
- }
-
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
+ public List<MemberResponse> memberResponses() {
+ return data.members();
+ }
+
public Errors error() {
- return Errors.forCode(data.errorCode());
+ return getError(Errors.forCode(data.errorCode()), data.members());
+ }
+
+ private static Errors getError(Errors topLevelError, List<MemberResponse>
memberResponses) {
+ if (topLevelError != Errors.NONE) {
+ return topLevelError;
+ } else {
+ for (MemberResponse memberResponse : memberResponses) {
+ Errors memberError =
Errors.forCode(memberResponse.errorCode());
+ if (memberError != Errors.NONE) {
+ return memberError;
+ }
+ }
+ return Errors.NONE;
+ }
}
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ Map<Errors, Integer> combinedErrorCounts = new HashMap<>();
+ // Top level error.
+ Errors topLevelError = Errors.forCode(data.errorCode());
+ if (topLevelError != Errors.NONE) {
+ updateErrorCounts(combinedErrorCounts, topLevelError);
+ }
+
+ // Member level error.
+ for (MemberResponse memberResponse : data.members()) {
+ Errors memberError = Errors.forCode(memberResponse.errorCode());
+ if (memberError != Errors.NONE) {
+ updateErrorCounts(combinedErrorCounts, memberError);
+ }
+ }
+ return combinedErrorCounts;
}
@Override
diff --git a/clients/src/main/resources/common/message/LeaveGroupRequest.json
b/clients/src/main/resources/common/message/LeaveGroupRequest.json
index 7c536da..9f98f06 100644
--- a/clients/src/main/resources/common/message/LeaveGroupRequest.json
+++ b/clients/src/main/resources/common/message/LeaveGroupRequest.json
@@ -18,11 +18,20 @@
"type": "request",
"name": "LeaveGroupRequest",
// Version 1 and 2 are the same as version 0.
- "validVersions": "0-2",
+ // Version 3 defines batch processing scheme with group.instance.id +
member.id for identity
+ "validVersions": "0-3",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The ID of the group to leave." },
- { "name": "MemberId", "type": "string", "versions": "0+",
- "about": "The member ID to remove from the group." }
+ { "name": "MemberId", "type": "string", "versions": "0-2",
+ "about": "The member ID to remove from the group." },
+ { "name": "Members", "type": "[]MemberIdentity", "versions": "3+",
+ "about": "List of leaving member identities.", "fields": [
+ { "name": "MemberId", "type": "string", "versions": "3+",
+ "about": "The member ID to remove from the group." },
+ { "name": "GroupInstanceId", "type": "string",
+ "versions": "3+", "nullableVersions": "3+", "default": "null",
+ "about": "The group instance ID to remove from the group." }
+ ]}
]
}
diff --git a/clients/src/main/resources/common/message/LeaveGroupResponse.json
b/clients/src/main/resources/common/message/LeaveGroupResponse.json
index 0d887cd..2bbf63d 100644
--- a/clients/src/main/resources/common/message/LeaveGroupResponse.json
+++ b/clients/src/main/resources/common/message/LeaveGroupResponse.json
@@ -19,11 +19,22 @@
"name": "LeaveGroupResponse",
// Version 1 adds the throttle time.
// Starting in version 2, on quota violation, brokers send out responses
before throttling.
- "validVersions": "0-2",
+ // Starting in version 3, we will make leave group request into batch mode.
+ "validVersions": "0-3",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
"ignorable": true,
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
- "about": "The error code, or 0 if there was no error." }
+ "about": "The error code, or 0 if there was no error." },
+
+ { "name": "Members", "type": "[]MemberResponse", "versions": "3+",
+ "about": "List of leaving member responses.", "fields": [
+ { "name": "MemberId", "type": "string", "versions": "3+",
+ "about": "The member ID to remove from the group." },
+ { "name": "GroupInstanceId", "type": "string", "versions": "3+",
"nullableVersions": "3+",
+ "about": "The group instance ID to remove from the group." },
+ { "name": "ErrorCode", "type": "int16", "versions": "3+",
+ "about": "The error code, or 0 if there was no error." }
+ ]}
]
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 659ef5f..e0264b3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -18,25 +18,32 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
@@ -48,6 +55,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -63,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
@@ -75,7 +84,6 @@ public class AbstractCoordinatorTest {
private static final int SESSION_TIMEOUT_MS = 10000;
private static final int HEARTBEAT_INTERVAL_MS = 3000;
private static final int RETRY_BACKOFF_MS = 100;
- private static final int LONG_RETRY_BACKOFF_MS = 10000;
private static final int REQUEST_TIMEOUT_MS = 40000;
private static final String GROUP_ID = "dummy-group";
private static final String METRIC_GROUP_PREFIX = "consumer";
@@ -87,6 +95,10 @@ public class AbstractCoordinatorTest {
private ConsumerNetworkClient consumerClient;
private DummyCoordinator coordinator;
+ private final String memberId = "memberId";
+ private final String leaderId = "leaderId";
+ private final int defaultGeneration = -1;
+
private void setupCoordinator() {
setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS,
Optional.empty());
@@ -164,7 +176,7 @@ public class AbstractCoordinatorTest {
assertFalse(firstAttempt.get());
assertTrue(consumerClient.hasPendingRequests(coordinatorNode));
- mockClient.respond(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.respond(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS);
@@ -183,10 +195,7 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
- final String memberId = "memberId";
- final int generation = -1;
-
- mockClient.prepareResponse(joinGroupFollowerResponse(generation,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
+
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future,
mockTime.timer(REQUEST_TIMEOUT_MS)));
@@ -232,23 +241,14 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
- final String memberId = "memberId";
- final int generation = -1;
-
- mockClient.prepareResponse(joinGroupFollowerResponse(generation,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.MEMBER_ID_REQUIRED));
+
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.MEMBER_ID_REQUIRED));
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- if (!(body instanceof JoinGroupRequest)) {
- return false;
- }
- JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
- if (!joinGroupRequest.data().memberId().equals(memberId)) {
- return false;
- }
- return true;
+ mockClient.prepareResponse(body -> {
+ if (!(body instanceof JoinGroupRequest)) {
+ return false;
}
+ JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+ return joinGroupRequest.data().memberId().equals(memberId);
}, joinGroupResponse(Errors.UNKNOWN_MEMBER_ID));
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
@@ -256,7 +256,7 @@ public class AbstractCoordinatorTest {
assertEquals(Errors.MEMBER_ID_REQUIRED.message(),
future.exception().getMessage());
assertTrue(coordinator.rejoinNeededOrPending());
assertTrue(coordinator.hasValidMemberId());
- assertTrue(coordinator.hasMatchingGenerationId(generation));
+ assertTrue(coordinator.hasMatchingGenerationId(defaultGeneration));
future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future,
mockTime.timer(REBALANCE_TIMEOUT_MS)));
}
@@ -267,10 +267,7 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
- final String memberId = "memberId";
- final int generation = -1;
-
- mockClient.prepareResponse(joinGroupFollowerResponse(generation,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.FENCED_INSTANCE_ID));
+
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.FENCED_INSTANCE_ID));
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future,
mockTime.timer(REQUEST_TIMEOUT_MS)));
@@ -284,7 +281,6 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- final String memberId = "memberId";
final int generation = -1;
mockClient.prepareResponse(joinGroupFollowerResponse(generation,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.NONE));
@@ -298,7 +294,6 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- final String memberId = "memberId";
final int generation = -1;
mockClient.prepareResponse(joinGroupFollowerResponse(generation,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.NONE));
@@ -325,17 +320,14 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
- final String memberId = "memberId";
- final int generation = -1;
-
- mockClient.prepareResponse(joinGroupFollowerResponse(generation,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID));
+
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration,
memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID));
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future,
mockTime.timer(REQUEST_TIMEOUT_MS)));
assertEquals(Errors.UNKNOWN_MEMBER_ID.message(),
future.exception().getMessage());
assertTrue(coordinator.rejoinNeededOrPending());
- assertTrue(coordinator.hasMatchingGenerationId(generation));
+ assertTrue(coordinator.hasMatchingGenerationId(defaultGeneration));
}
@Test
@@ -348,19 +340,16 @@ public class AbstractCoordinatorTest {
setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, groupInstanceId);
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
final RuntimeException e = new RuntimeException();
// raise the error when the coordinator tries to send leave group
request.
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- if (body instanceof LeaveGroupRequest)
- throw e;
- return false;
- }
+ mockClient.prepareResponse(body -> {
+ if (body instanceof LeaveGroupRequest)
+ throw e;
+ return false;
}, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
try {
@@ -379,23 +368,105 @@ public class AbstractCoordinatorTest {
}
@Test
+ public void testHandleNormalLeaveGroupResponse() {
+ MemberResponse memberResponse = new MemberResponse()
+ .setMemberId(memberId)
+ .setErrorCode(Errors.NONE.code());
+ LeaveGroupResponse response =
+ leaveGroupResponse(Collections.singletonList(memberResponse));
+ RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+ assertNotNull(leaveGroupFuture);
+ assertTrue(leaveGroupFuture.succeeded());
+ }
+
+ @Test
+ public void testHandleMultipleMembersLeaveGroupResponse() {
+ MemberResponse memberResponse = new MemberResponse()
+ .setMemberId(memberId)
+ .setErrorCode(Errors.NONE.code());
+ LeaveGroupResponse response =
+ leaveGroupResponse(Arrays.asList(memberResponse, memberResponse));
+ RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+ assertNotNull(leaveGroupFuture);
+ assertTrue(leaveGroupFuture.exception() instanceof
IllegalStateException);
+ }
+
+ @Test
+ public void testHandleLeaveGroupResponseWithEmptyMemberResponse() {
+ LeaveGroupResponse response =
+ leaveGroupResponse(Collections.emptyList());
+ RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+ assertNotNull(leaveGroupFuture);
+ assertTrue(leaveGroupFuture.succeeded());
+ }
+
+ @Test
+ public void testHandleLeaveGroupResponseWithException() {
+ MemberResponse memberResponse = new MemberResponse()
+ .setMemberId(memberId)
+
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
+ LeaveGroupResponse response =
+ leaveGroupResponse(Collections.singletonList(memberResponse));
+ RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response);
+ assertNotNull(leaveGroupFuture);
+ assertTrue(leaveGroupFuture.exception() instanceof
UnknownMemberIdException);
+ }
+
+ @Test
+ public void testHandleSingleLeaveGroupRequest() {
+ setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE,
Optional.empty());
+
mockClient.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(
+ new ApiVersionsResponse.ApiVersion(ApiKeys.LEAVE_GROUP, (short) 2,
(short) 2))));
+
+ LeaveGroupResponse expectedResponse =
leaveGroupResponse(Collections.singletonList(
+ new MemberResponse()
+ .setErrorCode(Errors.NONE.code())
+ .setMemberId(memberId)));
+ mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
+ mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+ mockClient.prepareResponse(body -> {
+ if (body instanceof LeaveGroupRequest) {
+ LeaveGroupRequest request = (LeaveGroupRequest) body;
+ return request.data().memberId().equals(memberId)
+ && request.data().members().isEmpty();
+ } else {
+ return false;
+ }
+ }, expectedResponse);
+
+ coordinator.ensureActiveGroup();
+ RequestFuture<Void> leaveGroupFuture =
coordinator.maybeLeaveGroup("test single leave group");
+ assertTrue(leaveGroupFuture.succeeded());
+ }
+
+ private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse
leaveGroupResponse) {
+ setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE,
Optional.empty());
+
+ mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
+ mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+ mockClient.prepareResponse(leaveGroupResponse);
+
+ coordinator.ensureActiveGroup();
+ return coordinator.maybeLeaveGroup("test maybe leave group");
+ }
+
+ @Test
public void testUncaughtExceptionInHeartbeatThread() throws Exception {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
final RuntimeException e = new RuntimeException();
// raise the error when the background thread tries to send a heartbeat
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- if (body instanceof HeartbeatRequest)
- throw e;
- return false;
- }
+ mockClient.prepareResponse(body -> {
+ if (body instanceof HeartbeatRequest)
+ throw e;
+ return false;
}, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
try {
@@ -414,21 +485,19 @@ public class AbstractCoordinatorTest {
@Test
public void testPollHeartbeatAwakesHeartbeatThread() throws Exception {
- setupCoordinator(LONG_RETRY_BACKOFF_MS);
+ final int longRetryBackoffMs = 10000;
+ setupCoordinator(longRetryBackoffMs);
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
coordinator.ensureActiveGroup();
final CountDownLatch heartbeatDone = new CountDownLatch(1);
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- heartbeatDone.countDown();
- return body instanceof HeartbeatRequest;
- }
+ mockClient.prepareResponse(body -> {
+ heartbeatDone.countDown();
+ return body instanceof HeartbeatRequest;
}, heartbeatResponse(Errors.NONE));
mockTime.sleep(HEARTBEAT_INTERVAL_MS);
@@ -473,7 +542,7 @@ public class AbstractCoordinatorTest {
throw new WakeupException();
return isJoinGroupRequest;
}
- }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+ }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -511,7 +580,7 @@ public class AbstractCoordinatorTest {
throw new WakeupException();
return isJoinGroupRequest;
}
- }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+ }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -540,16 +609,13 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
- if (isJoinGroupRequest)
- // wakeup after the request returns
- consumerClient.wakeup();
- return isJoinGroupRequest;
- }
- }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+ mockClient.prepareResponse(body -> {
+ boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
+ if (isJoinGroupRequest)
+ // wakeup after the request returns
+ consumerClient.wakeup();
+ return isJoinGroupRequest;
+ }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -576,16 +642,13 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
- if (isJoinGroupRequest)
- // wakeup after the request returns
- consumerClient.wakeup();
- return isJoinGroupRequest;
- }
- }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+ mockClient.prepareResponse(body -> {
+ boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
+ if (isJoinGroupRequest)
+ // wakeup after the request returns
+ consumerClient.wakeup();
+ return isJoinGroupRequest;
+ }, joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -614,7 +677,7 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
@@ -652,7 +715,7 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
@@ -692,16 +755,13 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
- if (isSyncGroupRequest)
- // wakeup after the request returns
- consumerClient.wakeup();
- return isSyncGroupRequest;
- }
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
+ mockClient.prepareResponse(body -> {
+ boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
+ if (isSyncGroupRequest)
+ // wakeup after the request returns
+ consumerClient.wakeup();
+ return isSyncGroupRequest;
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -728,16 +788,13 @@ public class AbstractCoordinatorTest {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
- if (isSyncGroupRequest)
- // wakeup after the request returns
- consumerClient.wakeup();
- return isSyncGroupRequest;
- }
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
+ mockClient.prepareResponse(body -> {
+ boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
+ if (isSyncGroupRequest)
+ // wakeup after the request returns
+ consumerClient.wakeup();
+ return isSyncGroupRequest;
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -765,7 +822,7 @@ public class AbstractCoordinatorTest {
coordinator.wakeupOnJoinComplete = true;
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
- mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId",
"leaderId", Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
@@ -806,14 +863,11 @@ public class AbstractCoordinatorTest {
private AtomicBoolean prepareFirstHeartbeat() {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
- mockClient.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- boolean isHeartbeatRequest = body instanceof HeartbeatRequest;
- if (isHeartbeatRequest)
- heartbeatReceived.set(true);
- return isHeartbeatRequest;
- }
+ mockClient.prepareResponse(body -> {
+ boolean isHeartbeatRequest = body instanceof HeartbeatRequest;
+ if (isHeartbeatRequest)
+ heartbeatReceived.set(true);
+ return isHeartbeatRequest;
}, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
return heartbeatReceived;
}
@@ -861,6 +915,12 @@ public class AbstractCoordinatorTest {
);
}
+ private LeaveGroupResponse leaveGroupResponse(List<MemberResponse>
members) {
+ return new LeaveGroupResponse(new LeaveGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setMembers(members));
+ }
+
public static class DummyCoordinator extends AbstractCoordinator {
private int onJoinPrepareInvokes = 0;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a81e73e..11273cb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
@@ -51,7 +52,6 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -473,14 +473,11 @@ public class ConsumerCoordinatorTest {
partitionAssignor.prepare(singletonMap(consumerId, assigned));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId);
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}, syncGroupResponse(assigned, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -514,28 +511,22 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(
joinGroupLeaderResponse(
1, consumerId, singletonMap(consumerId, oldSubscription),
Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId);
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}, syncGroupResponse(oldAssignment, Errors.NONE));
// Second correct assignment for subscription
client.prepareResponse(
joinGroupLeaderResponse(
1, consumerId, singletonMap(consumerId, newSubscription),
Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId);
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}, syncGroupResponse(newAssignment, Errors.NONE));
// Poll once so that the join group future gets created and complete
@@ -587,14 +578,11 @@ public class ConsumerCoordinatorTest {
partitionAssignor.prepare(singletonMap(consumerId, assigned));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId);
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}, syncGroupResponse(assigned, Errors.NONE));
// expect client to force updating the metadata, if yes gives it both
topics
client.prepareMetadataUpdate(metadataResponse);
@@ -634,15 +622,12 @@ public class ConsumerCoordinatorTest {
final List<String> updatedSubscription = Arrays.asList(topic1, topic2);
client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
initialSubscription, Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- final Map<String, Integer> updatedPartitions = new HashMap<>();
- for (String topic : updatedSubscription)
- updatedPartitions.put(topic, 1);
- client.updateMetadata(TestUtils.metadataUpdateWith(1,
updatedPartitions));
- return true;
- }
+ client.prepareResponse(body -> {
+ final Map<String, Integer> updatedPartitions = new HashMap<>();
+ for (String topic : updatedSubscription)
+ updatedPartitions.put(topic, 1);
+ client.updateMetadata(TestUtils.metadataUpdateWith(1,
updatedPartitions));
+ return true;
}, syncGroupResponse(oldAssigned, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -661,20 +646,17 @@ public class ConsumerCoordinatorTest {
partitionAssignor.prepare(singletonMap(consumerId, newAssigned));
// we expect to see a second rebalance with the new-found topics
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- JoinGroupRequest join = (JoinGroupRequest) body;
- Iterator<JoinGroupRequestData.JoinGroupRequestProtocol>
protocolIterator =
- join.data().protocols().iterator();
- assertTrue(protocolIterator.hasNext());
- JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata
= protocolIterator.next();
-
- ByteBuffer metadata =
ByteBuffer.wrap(protocolMetadata.metadata());
- ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(metadata);
- metadata.rewind();
- return subscription.topics().containsAll(updatedSubscription);
- }
+ client.prepareResponse(body -> {
+ JoinGroupRequest join = (JoinGroupRequest) body;
+ Iterator<JoinGroupRequestData.JoinGroupRequestProtocol>
protocolIterator =
+ join.data().protocols().iterator();
+ assertTrue(protocolIterator.hasNext());
+ JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata =
protocolIterator.next();
+
+ ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(metadata);
+ metadata.rewind();
+ return subscription.topics().containsAll(updatedSubscription);
}, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions,
Errors.NONE));
client.prepareResponse(syncGroupResponse(newAssigned, Errors.NONE));
@@ -707,14 +689,11 @@ public class ConsumerCoordinatorTest {
client.prepareMetadataUpdate(metadataResponse);
client.prepareResponse(joinGroupFollowerResponse(1, consumerId,
"leader", Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().isEmpty();
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().isEmpty();
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
partitionAssignor.prepare(singletonMap(consumerId,
singletonList(t1p)));
@@ -786,14 +765,11 @@ public class ConsumerCoordinatorTest {
// normal join group
client.prepareResponse(joinGroupFollowerResponse(1, consumerId,
"leader", Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().isEmpty();
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().isEmpty();
}, syncGroupResponse(assigned, Errors.NONE));
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
@@ -854,14 +830,11 @@ public class ConsumerCoordinatorTest {
// normal join group
client.prepareResponse(joinGroupFollowerResponse(1, consumerId,
"leader", Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().isEmpty();
- }
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().isEmpty();
}, syncGroupResponse(assigned, Errors.NONE));
// expect client to force updating the metadata, if yes gives it both
topics
client.prepareMetadataUpdate(metadataResponse);
@@ -885,15 +858,12 @@ public class ConsumerCoordinatorTest {
joinAsFollowerAndReceiveAssignment(consumerId, coordinator,
singletonList(t1p));
final AtomicBoolean received = new AtomicBoolean(false);
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- received.set(true);
- LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
- return leaveRequest.data().memberId().equals(consumerId) &&
- leaveRequest.data().groupId().equals(groupId);
- }
- }, new LeaveGroupResponse(new
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
+ client.prepareResponse(body -> {
+ received.set(true);
+ LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+ return validateLeaveGroup(groupId, consumerId, leaveRequest);
+ }, new LeaveGroupResponse(
+ new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
coordinator.close(time.timer(0));
assertTrue(received.get());
}
@@ -906,14 +876,10 @@ public class ConsumerCoordinatorTest {
joinAsFollowerAndReceiveAssignment(consumerId, coordinator,
singletonList(t1p));
final AtomicBoolean received = new AtomicBoolean(false);
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- received.set(true);
- LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
- return leaveRequest.data().memberId().equals(consumerId) &&
- leaveRequest.data().groupId().equals(groupId);
- }
+ client.prepareResponse(body -> {
+ received.set(true);
+ LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+ return validateLeaveGroup(groupId, consumerId, leaveRequest);
}, new LeaveGroupResponse(new
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
coordinator.maybeLeaveGroup("test maybe leave group");
assertTrue(received.get());
@@ -922,6 +888,15 @@ public class ConsumerCoordinatorTest {
assertNull(generation);
}
+ private boolean validateLeaveGroup(String groupId,
+ String consumerId,
+ LeaveGroupRequest leaveRequest) {
+ List<MemberIdentity> members = leaveRequest.data().members();
+ return leaveRequest.data().groupId().equals(groupId) &&
+ members.size() == 1 &&
+ members.get(0).memberId().equals(consumerId);
+ }
+
/**
* This test checks if a consumer that has a valid member ID but an
invalid generation
* ({@link
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.Generation#NO_GENERATION})
@@ -944,13 +919,10 @@ public class ConsumerCoordinatorTest {
coordinator.joinGroupIfNeeded(time.timer(0));
final AtomicBoolean received = new AtomicBoolean(false);
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- received.set(true);
- LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
- return leaveRequest.data().memberId().equals(consumerId);
- }
+ client.prepareResponse(body -> {
+ received.set(true);
+ LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+ return validateLeaveGroup(groupId, consumerId, leaveRequest);
}, new LeaveGroupResponse(new
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
coordinator.maybeLeaveGroup("pending member leaves");
@@ -986,12 +958,9 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(),
Errors.UNKNOWN_MEMBER_ID));
// now we should see a new join with the empty UNKNOWN_MEMBER_ID
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- JoinGroupRequest joinRequest = (JoinGroupRequest) body;
- return
joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
- }
+ client.prepareResponse(body -> {
+ JoinGroupRequest joinRequest = (JoinGroupRequest) body;
+ return
joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p),
Errors.NONE));
@@ -1038,12 +1007,9 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(),
Errors.ILLEGAL_GENERATION));
// then let the full join/sync finish successfully
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- JoinGroupRequest joinRequest = (JoinGroupRequest) body;
- return
joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
- }
+ client.prepareResponse(body -> {
+ JoinGroupRequest joinRequest = (JoinGroupRequest) body;
+ return
joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p),
Errors.NONE));
@@ -1106,22 +1072,19 @@ public class ConsumerCoordinatorTest {
partitionAssignor.prepare(singletonMap(consumerId,
Arrays.asList(tp1)));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, Errors.NONE));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- SyncGroupRequest sync = (SyncGroupRequest) body;
- if (sync.data.memberId().equals(consumerId) &&
- sync.data.generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId)) {
- // trigger the metadata update including both topics after
the sync group request has been sent
- Map<String, Integer> topicPartitionCounts = new
HashMap<>();
- topicPartitionCounts.put(topic1, 1);
- topicPartitionCounts.put(topic2, 1);
- client.updateMetadata(TestUtils.metadataUpdateWith(1,
topicPartitionCounts));
- return true;
- }
- return false;
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ if (sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId)) {
+ // trigger the metadata update including both topics after the
sync group request has been sent
+ Map<String, Integer> topicPartitionCounts = new HashMap<>();
+ topicPartitionCounts.put(topic1, 1);
+ topicPartitionCounts.put(topic2, 1);
+ client.updateMetadata(TestUtils.metadataUpdateWith(1,
topicPartitionCounts));
+ return true;
}
+ return false;
}, syncGroupResponse(Collections.singletonList(tp1), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -1581,13 +1544,10 @@ public class ConsumerCoordinatorTest {
subscriptions.assignFromUser(singleton(t1p));
// the client should not reuse generation/memberId from
auto-subscribed generation
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
- return
commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
- commitRequest.data().generationId() ==
OffsetCommitRequest.DEFAULT_GENERATION_ID;
- }
+ client.prepareResponse(body -> {
+ OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+ return
commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
+ commitRequest.data().generationId() ==
OffsetCommitRequest.DEFAULT_GENERATION_ID;
}, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
AtomicBoolean success = new AtomicBoolean(false);
@@ -2302,21 +2262,15 @@ public class ConsumerCoordinatorTest {
private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean
shouldLeaveGroup) throws Exception {
final AtomicBoolean commitRequested = new AtomicBoolean();
final AtomicBoolean leaveGroupRequested = new AtomicBoolean();
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- commitRequested.set(true);
- OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
- return commitRequest.data().groupId().equals(groupId);
- }
+ client.prepareResponse(body -> {
+ commitRequested.set(true);
+ OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+ return commitRequest.data().groupId().equals(groupId);
}, new OffsetCommitResponse(new OffsetCommitResponseData()));
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- leaveGroupRequested.set(true);
- LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
- return leaveRequest.data().groupId().equals(groupId);
- }
+ client.prepareResponse(body -> {
+ leaveGroupRequested.set(true);
+ LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+ return leaveRequest.data().groupId().equals(groupId);
}, new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())));
@@ -2494,37 +2448,31 @@ public class ConsumerCoordinatorTest {
}
private MockClient.RequestMatcher offsetCommitRequestMatcher(final
Map<TopicPartition, Long> expectedOffsets) {
- return new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- OffsetCommitRequest req = (OffsetCommitRequest) body;
- Map<TopicPartition, Long> offsets = req.offsets();
- if (offsets.size() != expectedOffsets.size())
- return false;
+ return body -> {
+ OffsetCommitRequest req = (OffsetCommitRequest) body;
+ Map<TopicPartition, Long> offsets = req.offsets();
+ if (offsets.size() != expectedOffsets.size())
+ return false;
- for (Map.Entry<TopicPartition, Long> expectedOffset :
expectedOffsets.entrySet()) {
- if (!offsets.containsKey(expectedOffset.getKey())) {
+ for (Map.Entry<TopicPartition, Long> expectedOffset :
expectedOffsets.entrySet()) {
+ if (!offsets.containsKey(expectedOffset.getKey())) {
+ return false;
+ } else {
+ Long actualOffset = offsets.get(expectedOffset.getKey());
+ if (!actualOffset.equals(expectedOffset.getValue())) {
return false;
- } else {
- Long actualOffset =
offsets.get(expectedOffset.getKey());
- if (!actualOffset.equals(expectedOffset.getValue())) {
- return false;
- }
}
}
- return true;
}
+ return true;
};
}
private OffsetCommitCallback callback(final Map<TopicPartition,
OffsetAndMetadata> expectedOffsets,
final AtomicBoolean success) {
- return new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata>
offsets, Exception exception) {
- if (expectedOffsets.equals(offsets) && exception == null)
- success.set(true);
- }
+ return (offsets, exception) -> {
+ if (expectedOffsets.equals(offsets) && exception == null)
+ success.set(true);
};
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 5213ec2..58c650a 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -21,8 +21,10 @@ import
org.apache.kafka.common.errors.UnsupportedVersionException;
import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.BoundField;
@@ -48,6 +50,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public final class MessageTest {
+
+ private final String memberId = "memberId";
+ private final String instanceId = "instanceId";
+
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@@ -115,7 +121,7 @@ public final class MessageTest {
public void testHeartbeatVersions() throws Exception {
Supplier<HeartbeatRequestData> newRequest = () -> new
HeartbeatRequestData()
.setGroupId("groupId")
- .setMemberId("memberId")
+ .setMemberId(memberId)
.setGenerationId(15);
testAllMessageRoundTrips(newRequest.get());
testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
@@ -126,7 +132,7 @@ public final class MessageTest {
public void testJoinGroupRequestVersions() throws Exception {
Supplier<JoinGroupRequestData> newRequest = () -> new
JoinGroupRequestData()
.setGroupId("groupId")
- .setMemberId("memberId")
+ .setMemberId(memberId)
.setProtocolType("consumer")
.setProtocols(new
JoinGroupRequestData.JoinGroupRequestProtocolCollection())
.setSessionTimeoutMs(10000);
@@ -138,7 +144,6 @@ public final class MessageTest {
@Test
public void testJoinGroupResponseVersions() throws Exception {
- String memberId = "memberId";
Supplier<JoinGroupResponseData> newResponse = () -> new
JoinGroupResponseData()
.setMemberId(memberId)
.setLeader(memberId)
@@ -154,15 +159,30 @@ public final class MessageTest {
}
@Test
+ public void testLeaveGroupResponseVersions() throws Exception {
+ Supplier<LeaveGroupResponseData> newResponse = () -> new
LeaveGroupResponseData()
+
.setErrorCode(Errors.NOT_COORDINATOR.code());
+
+ testAllMessageRoundTrips(newResponse.get());
+ testAllMessageRoundTripsFromVersion((short) 1,
newResponse.get().setThrottleTimeMs(1000));
+
+ testAllMessageRoundTripsFromVersion((short) 3,
newResponse.get().setMembers(
+ Collections.singletonList(new MemberResponse()
+ .setMemberId(memberId)
+ .setGroupInstanceId(instanceId))
+ ));
+ }
+
+ @Test
public void testSyncGroupDefaultGroupInstanceId() throws Exception {
Supplier<SyncGroupRequestData> request = () -> new
SyncGroupRequestData()
.setGroupId("groupId")
- .setMemberId("memberId")
+ .setMemberId(memberId)
.setGenerationId(15)
.setAssignments(new ArrayList<>());
testAllMessageRoundTrips(request.get());
testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
- testAllMessageRoundTripsFromVersion((short) 3,
request.get().setGroupInstanceId("instanceId"));
+ testAllMessageRoundTripsFromVersion((short) 3,
request.get().setGroupInstanceId(instanceId));
}
@Test
@@ -173,12 +193,12 @@ public final class MessageTest {
Supplier<OffsetCommitRequestData> request = () -> new
OffsetCommitRequestData()
.setGroupId("groupId")
- .setMemberId("memberId")
+ .setMemberId(memberId)
.setTopics(new ArrayList<>())
.setGenerationId(15);
testAllMessageRoundTripsFromVersion((short) 1, request.get());
testAllMessageRoundTripsFromVersion((short) 1,
request.get().setGroupInstanceId(null));
- testAllMessageRoundTripsFromVersion((short) 7,
request.get().setGroupInstanceId("instanceId"));
+ testAllMessageRoundTripsFromVersion((short) 7,
request.get().setGroupInstanceId(instanceId));
}
@Test
@@ -317,7 +337,7 @@ public final class MessageTest {
* Test that the JSON response files match the schemas accessible through
the ApiKey class.
*/
@Test
- public void testResponseSchemas() throws Exception {
+ public void testResponseSchemas() {
for (ApiKeys apiKey : ApiKeys.values()) {
Schema[] manualSchemas = apiKey.responseSchemas;
Schema[] generatedSchemas =
ApiMessageType.fromApiKey(apiKey.id).responseSchemas();
@@ -439,17 +459,17 @@ public final class MessageTest {
verifySizeRaisesUve((short) 0, "groupInstanceId", new
HeartbeatRequestData()
.setGroupId("groupId")
.setGenerationId(15)
- .setMemberId("memberId")
- .setGroupInstanceId("instanceId"));
+ .setMemberId(memberId)
+ .setGroupInstanceId(instanceId));
verifySizeSucceeds((short) 0, new HeartbeatRequestData()
.setGroupId("groupId")
.setGenerationId(15)
- .setMemberId("memberId")
+ .setMemberId(memberId)
.setGroupInstanceId(null));
verifySizeSucceeds((short) 0, new HeartbeatRequestData()
.setGroupId("groupId")
.setGenerationId(15)
- .setMemberId("memberId"));
+ .setMemberId(memberId));
}
private void verifySizeRaisesUve(short version, String problemFieldName,
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
index 9fa9d3b..2ff928b 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java
@@ -16,34 +16,97 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class LeaveGroupRequestTest {
+ private final String groupId = "group_id";
+ private final String memberIdOne = "member_1";
+ private final String instanceIdOne = "instance_1";
+ private final String memberIdTwo = "member_2";
+ private final String instanceIdTwo = "instance_2";
+
+ private final int throttleTimeMs = 10;
+
+ private LeaveGroupRequest.Builder builder;
+ private List<MemberIdentity> members;
+
+ @Before
+ public void setUp() {
+ members = Arrays.asList(new MemberIdentity()
+ .setMemberId(memberIdOne)
+ .setGroupInstanceId(instanceIdOne),
+ new MemberIdentity()
+ .setMemberId(memberIdTwo)
+ .setGroupInstanceId(instanceIdTwo));
+ builder = new LeaveGroupRequest.Builder(
+ groupId,
+ members
+ );
+ }
+
@Test
- public void testLeaveConstructor() {
- final String groupId = "group_id";
- final String memberId = "member_id";
- final int throttleTimeMs = 10;
+ public void testMultiLeaveConstructor() {
+ final LeaveGroupRequestData expectedData = new LeaveGroupRequestData()
+ .setGroupId(groupId)
+ .setMembers(members);
+
+ for (short version = 0; version <=
ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+ try {
+ LeaveGroupRequest request = builder.build(version);
+ if (version <= 2) {
+ fail("Older version " + version +
+ " request data should not be created due to
non-single members");
+ }
+ assertEquals(expectedData, request.data());
+ assertEquals(members, request.members());
+ LeaveGroupResponse expectedResponse = new LeaveGroupResponse(
+ Collections.emptyList(),
+ Errors.COORDINATOR_LOAD_IN_PROGRESS,
+ throttleTimeMs,
+ version
+ );
+
+ assertEquals(expectedResponse,
request.getErrorResponse(throttleTimeMs,
+
Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+ } catch (UnsupportedVersionException e) {
+ assertTrue(e.getMessage().contains("leave group request only
supports single member instance"));
+ }
+ }
+
+ }
+
+ @Test
+ public void testSingleLeaveConstructor() {
final LeaveGroupRequestData expectedData = new LeaveGroupRequestData()
.setGroupId(groupId)
- .setMemberId(memberId);
+
.setMemberId(memberIdOne);
+ List<MemberIdentity> singleMember = Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(memberIdOne));
- final LeaveGroupRequest.Builder builder =
- new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
- .setGroupId(groupId)
- .setMemberId(memberId));
+ builder = new LeaveGroupRequest.Builder(groupId, singleMember);
- for (short version = 0; version <=
ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+ for (short version = 0; version <= 2; version++) {
LeaveGroupRequest request = builder.build(version);
assertEquals(expectedData, request.data());
+ assertEquals(singleMember, request.members());
int expectedThrottleTime = version >= 1 ? throttleTimeMs
:
AbstractResponse.DEFAULT_THROTTLE_TIME;
@@ -57,4 +120,9 @@ public class LeaveGroupRequestTest {
Errors.NOT_CONTROLLER.exception()));
}
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildEmptyMembers() {
+ new LeaveGroupRequest.Builder(groupId, Collections.emptyList());
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
index a1368b5..187640b 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
@@ -17,11 +17,16 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
@@ -30,10 +35,31 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class LeaveGroupResponseTest {
+
+ private final String memberIdOne = "member_1";
+ private final String instanceIdOne = "instance_1";
+ private final String memberIdTwo = "member_2";
+ private final String instanceIdTwo = "instance_2";
+
private final int throttleTimeMs = 10;
+ private List<MemberResponse> memberResponses;
+
+ @Before
+ public void setUp() {
+ memberResponses = Arrays.asList(new MemberResponse()
+ .setMemberId(memberIdOne)
+ .setGroupInstanceId(instanceIdOne)
+
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new MemberResponse()
+ .setMemberId(memberIdTwo)
+ .setGroupInstanceId(instanceIdTwo)
+
.setErrorCode(Errors.FENCED_INSTANCE_ID.code())
+ );
+ }
+
@Test
- public void testConstructor() {
+ public void testConstructorWithStruct() {
Map<Errors, Integer> expectedErrorCounts =
Collections.singletonMap(Errors.NOT_COORDINATOR, 1);
LeaveGroupResponseData responseData = new LeaveGroupResponseData()
@@ -54,9 +80,40 @@ public class LeaveGroupResponseTest {
}
}
+
+ @Test
+ public void testConstructorWithMemberResponses() {
+ Map<Errors, Integer> expectedErrorCounts = new HashMap<>();
+ expectedErrorCounts.put(Errors.UNKNOWN_MEMBER_ID, 1);
+ expectedErrorCounts.put(Errors.FENCED_INSTANCE_ID, 1);
+
+ for (short version = 0; version <=
ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+ LeaveGroupResponse leaveGroupResponse = new
LeaveGroupResponse(memberResponses,
+
Errors.NONE,
+
throttleTimeMs,
+
version);
+
+ if (version >= 3) {
+ assertEquals(expectedErrorCounts,
leaveGroupResponse.errorCounts());
+ assertEquals(memberResponses,
leaveGroupResponse.memberResponses());
+ } else {
+
assertEquals(Collections.singletonMap(Errors.UNKNOWN_MEMBER_ID, 1),
+ leaveGroupResponse.errorCounts());
+ assertEquals(Collections.emptyList(),
leaveGroupResponse.memberResponses());
+ }
+
+ if (version >= 1) {
+ assertEquals(throttleTimeMs,
leaveGroupResponse.throttleTimeMs());
+ } else {
+ assertEquals(DEFAULT_THROTTLE_TIME,
leaveGroupResponse.throttleTimeMs());
+ }
+
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResponse.error());
+ }
+ }
+
@Test
public void testShouldThrottle() {
- // A dummy setup is ok.
LeaveGroupResponse response = new LeaveGroupResponse(new
LeaveGroupResponseData());
for (short version = 0; version <=
ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
if (version >= 2) {
@@ -68,7 +125,7 @@ public class LeaveGroupResponseTest {
}
@Test
- public void testEquality() {
+ public void testEqualityWithStruct() {
LeaveGroupResponseData responseData = new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(throttleTimeMs);
@@ -80,6 +137,29 @@ public class LeaveGroupResponseTest {
assertEquals(primaryResponse, primaryResponse);
assertEquals(primaryResponse, secondaryResponse);
assertEquals(primaryResponse.hashCode(),
secondaryResponse.hashCode());
+
+ }
+ }
+
+ @Test
+ public void testEqualityWithMemberResponses() {
+ for (short version = 0; version <=
ApiKeys.LEAVE_GROUP.latestVersion(); version++) {
+ List<MemberResponse> localResponses = version > 2 ?
memberResponses : memberResponses.subList(0, 1);
+ LeaveGroupResponse primaryResponse = new
LeaveGroupResponse(localResponses,
+
Errors.NONE,
+
throttleTimeMs,
+
version);
+
+ // The order of members should not alter result data.
+ Collections.reverse(localResponses);
+ LeaveGroupResponse reversedResponse = new
LeaveGroupResponse(localResponses,
+
Errors.NONE,
+
throttleTimeMs,
+
version);
+
+ assertEquals(primaryResponse, primaryResponse);
+ assertEquals(primaryResponse, reversedResponse);
+ assertEquals(primaryResponse.hashCode(),
reversedResponse.hashCode());
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 10da9f5..9855210 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -55,6 +55,7 @@ import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
@@ -69,7 +70,7 @@ import
org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
-import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
@@ -125,7 +126,6 @@ import java.util.Optional;
import java.util.Set;
import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
import static
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.apache.kafka.test.TestUtils.toBuffer;
import static org.junit.Assert.assertEquals;
@@ -184,14 +184,14 @@ public class RequestResponseTest {
checkErrorResponse(createListOffsetRequest(2), new
UnknownServerException(), true);
checkResponse(createListOffsetResponse(2), 2, true);
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2),
true);
- checkRequest(createMetadataRequest(1, singletonList("topic1")), true);
- checkErrorResponse(createMetadataRequest(1, singletonList("topic1")),
new UnknownServerException(), true);
+ checkRequest(createMetadataRequest(1,
Collections.singletonList("topic1")), true);
+ checkErrorResponse(createMetadataRequest(1,
Collections.singletonList("topic1")), new UnknownServerException(), true);
checkResponse(createMetadataResponse(), 2, true);
- checkErrorResponse(createMetadataRequest(2, singletonList("topic1")),
new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(2,
Collections.singletonList("topic1")), new UnknownServerException(), true);
checkResponse(createMetadataResponse(), 3, true);
- checkErrorResponse(createMetadataRequest(3, singletonList("topic1")),
new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(3,
Collections.singletonList("topic1")), new UnknownServerException(), true);
checkResponse(createMetadataResponse(), 4, true);
- checkErrorResponse(createMetadataRequest(4, singletonList("topic1")),
new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(4,
Collections.singletonList("topic1")), new UnknownServerException(), true);
checkRequest(OffsetFetchRequest.forAllPartitions("group1"), true);
checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new
NotCoordinatorException("Not Coordinator"), true);
checkRequest(createOffsetFetchRequest(0), true);
@@ -262,7 +262,7 @@ public class RequestResponseTest {
checkOlderFetchVersions();
checkResponse(createMetadataResponse(), 0, true);
checkResponse(createMetadataResponse(), 1, true);
- checkErrorResponse(createMetadataRequest(1, singletonList("topic1")),
new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(1,
Collections.singletonList("topic1")), new UnknownServerException(), true);
checkRequest(createOffsetCommitRequest(0), true);
checkErrorResponse(createOffsetCommitRequest(0), new
UnknownServerException(), true);
checkRequest(createOffsetCommitRequest(1), true);
@@ -723,8 +723,7 @@ public class RequestResponseTest {
public void testOffsetFetchRequestBuilderToString() {
String allTopicPartitionsString =
OffsetFetchRequest.Builder.allTopicPartitions("someGroup").toString();
assertTrue(allTopicPartitionsString.contains("<ALL>"));
- String string = new OffsetFetchRequest.Builder("group1",
- singletonList(new TopicPartition("test11", 1))).toString();
+ String string = new OffsetFetchRequest.Builder("group1",
Collections.singletonList(new TopicPartition("test11", 1))).toString();
assertTrue(string.contains("test11"));
assertTrue(string.contains("group1"));
}
@@ -896,14 +895,22 @@ public class RequestResponseTest {
DescribeGroupsResponseData describeGroupsResponseData = new
DescribeGroupsResponseData();
DescribeGroupsResponseData.DescribedGroupMember member =
DescribeGroupsResponse.groupMember("memberId", null,
clientId, clientHost, new byte[0], new byte[0]);
- DescribeGroupsResponseData.DescribedGroup metadata =
DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE,
- "STABLE", "consumer", "roundrobin", asList(member),
Collections.emptySet());
+ DescribedGroup metadata =
DescribeGroupsResponse.groupMetadata("test-group",
+
Errors.NONE,
+
"STABLE",
+
"consumer",
+
"roundrobin",
+
Collections.singletonList(member),
+
Collections.emptySet());
describeGroupsResponseData.groups().add(metadata);
return new DescribeGroupsResponse(describeGroupsResponseData);
}
private LeaveGroupRequest createLeaveGroupRequest() {
- return new LeaveGroupRequest.Builder(new
LeaveGroupRequestData().setGroupId("group1").setMemberId("consumer1")).build();
+ return new LeaveGroupRequest.Builder(
+ "group1", Collections.singletonList(new MemberIdentity()
+ .setMemberId("consumer1"))
+ ).build();
}
private LeaveGroupResponse createLeaveGroupResponse() {
@@ -1040,7 +1047,7 @@ public class RequestResponseTest {
}
private OffsetFetchRequest createOffsetFetchRequest(int version) {
- return new OffsetFetchRequest.Builder("group1", singletonList(new
TopicPartition("test11", 1)))
+ return new OffsetFetchRequest.Builder("group1",
Collections.singletonList(new TopicPartition("test11", 1)))
.build((short) version);
}
@@ -1188,7 +1195,7 @@ public class RequestResponseTest {
private SaslHandshakeResponse createSaslHandshakeResponse() {
return new SaslHandshakeResponse(
new SaslHandshakeResponseData()
-
.setErrorCode(Errors.NONE.code()).setMechanisms(singletonList("GSSAPI")));
+
.setErrorCode(Errors.NONE.code()).setMechanisms(Collections.singletonList("GSSAPI")));
}
private SaslAuthenticateRequest createSaslAuthenticateRequest() {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 7f6641c..6a57d59 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -28,6 +28,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH,
NO_PRODUCER_ID}
import org.apache.kafka.common.requests._
@@ -252,7 +253,7 @@ class GroupCoordinator(val brokerId: Int,
} else if (group.isPendingMember(memberId)) {
// A rejoining pending member will be accepted. Note that pending
member will never be a static member.
if (groupInstanceId.isDefined) {
- throw new IllegalStateException(s"the static member $groupInstanceId
was unexpectedly to be assigned " +
+ throw new IllegalStateException(s"the static member $groupInstanceId
was not expected to be assigned " +
s"into pending member bucket with member id $memberId")
} else {
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs,
memberId, groupInstanceId,
@@ -419,36 +420,58 @@ class GroupCoordinator(val brokerId: Int,
}
}
- def handleLeaveGroup(groupId: String, memberId: String, responseCallback:
Errors => Unit): Unit = {
- validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
- responseCallback(error)
- return
- }
-
- groupManager.getGroup(groupId) match {
+ def handleLeaveGroup(groupId: String,
+ leavingMembers: List[MemberIdentity],
+ responseCallback: LeaveGroupResult => Unit) {
+ validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP) match {
+ case Some(error) =>
+ responseCallback(leaveError(error, List.empty))
case None =>
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
- case Some(group) =>
- group.inLock {
- if (group.is(Dead)) {
- responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
- } else if (group.isPendingMember(memberId)) {
- // if a pending member is leaving, it needs to be removed from the
pending list, heartbeat cancelled
- // and if necessary, prompt a JoinGroup completion.
- info(s"Pending member $memberId is leaving group
${group.groupId}.")
- removePendingMemberAndUpdateGroup(group, memberId)
- heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId,
memberId))
- responseCallback(Errors.NONE)
- } else if (!group.has(memberId)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
- } else {
- val member = group.get(memberId)
- removeHeartbeatForLeavingMember(group, member)
- info(s"Member ${member.memberId} in group ${group.groupId} has
left, removing it from the group")
- removeMemberAndUpdateGroup(group, member, s"removing member
$memberId on LeaveGroup")
- responseCallback(Errors.NONE)
- }
+ groupManager.getGroup(groupId) match {
+ case None =>
+ responseCallback(leaveError(Errors.NONE, leavingMembers.map
{leavingMember =>
+ memberLeaveError(leavingMember, Errors.UNKNOWN_MEMBER_ID)
+ }))
+ case Some(group) =>
+ group.inLock {
+ if (group.is(Dead)) {
+ responseCallback(leaveError(Errors.COORDINATOR_NOT_AVAILABLE,
List.empty))
+ } else {
+ val memberErrors = leavingMembers.map { leavingMember =>
+ val memberId = leavingMember.memberId
+ val groupInstanceId = Option(leavingMember.groupInstanceId)
+ if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
+ && group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
+ } else if (group.isPendingMember(memberId)) {
+ if (groupInstanceId.isDefined) {
+ throw new IllegalStateException(s"the static member
$groupInstanceId was not expected to be leaving " +
+ s"from pending member bucket with member id $memberId")
+ } else {
+ // if a pending member is leaving, it needs to be
removed from the pending list, heartbeat cancelled
+ // and if necessary, prompt a JoinGroup completion.
+ info(s"Pending member $memberId is leaving group
${group.groupId}.")
+ removePendingMemberAndUpdateGroup(group, memberId)
+
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
+ memberLeaveError(leavingMember, Errors.NONE)
+ }
+ } else if (!group.has(memberId) &&
!group.hasStaticMember(groupInstanceId)) {
+ memberLeaveError(leavingMember, Errors.UNKNOWN_MEMBER_ID)
+ } else {
+ val member = if (group.hasStaticMember(groupInstanceId))
+ group.get(group.getStaticMemberId(groupInstanceId))
+ else
+ group.get(memberId)
+ removeHeartbeatForLeavingMember(group, member)
+ info(s"Member[group.instance.id ${member.groupInstanceId},
member.id ${member.memberId}] " +
+ s"in group ${group.groupId} has left, removing it from
the group")
+ removeMemberAndUpdateGroup(group, member, s"removing
member $memberId on LeaveGroup")
+ memberLeaveError(leavingMember, Errors.NONE)
+ }
+ }
+ responseCallback(leaveError(Errors.NONE, memberErrors))
+ }
+ }
}
}
}
@@ -1106,6 +1129,21 @@ object GroupCoordinator {
leaderId = GroupCoordinator.NoLeader,
error = error)
}
+
+ private def memberLeaveError(memberIdentity: MemberIdentity,
+ error: Errors): LeaveMemberResponse = {
+ LeaveMemberResponse(
+ memberId = memberIdentity.memberId,
+ groupInstanceId = Option(memberIdentity.groupInstanceId),
+ error = error)
+ }
+
+ private def leaveError(topLevelError: Errors,
+ memberResponses: List[LeaveMemberResponse]):
LeaveGroupResult = {
+ LeaveGroupResult(
+ topLevelError = topLevelError,
+ memberResponses = memberResponses)
+ }
}
case class GroupConfig(groupMinSessionTimeoutMs: Int,
@@ -1122,3 +1160,10 @@ case class JoinGroupResult(members:
List[JoinGroupResponseMember],
case class SyncGroupResult(memberAssignment: Array[Byte],
error: Errors)
+
+case class LeaveMemberResponse(memberId: String,
+ groupInstanceId: Option[String],
+ error: Errors)
+
+case class LeaveGroupResult(topLevelError: Errors,
+ memberResponses : List[LeaveMemberResponse])
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index cec7470..ecd6c94 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,7 +31,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.controller.KafkaController
-import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult,
SyncGroupResult}
+import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult,
LeaveGroupResult, SyncGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.message.ZStdCompressionCodec
import kafka.network.RequestChannel
@@ -62,6 +62,7 @@ import org.apache.kafka.common.message.HeartbeatResponseData
import org.apache.kafka.common.message.InitProducerIdResponseData
import org.apache.kafka.common.message.JoinGroupResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.ListGroupsResponseData
import org.apache.kafka.common.message.OffsetCommitRequestData
import org.apache.kafka.common.message.OffsetCommitResponseData
@@ -1514,9 +1515,9 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(error: Errors) {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val response = new HeartbeatResponse(
- new HeartbeatResponseData()
- .setThrottleTimeMs(requestThrottleMs)
- .setErrorCode(error.code))
+ new HeartbeatResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(error.code))
trace("Sending heartbeat response %s for correlation id %d to client
%s."
.format(response, request.header.correlationId,
request.header.clientId))
response
@@ -1549,29 +1550,37 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleLeaveGroupRequest(request: RequestChannel.Request) {
val leaveGroupRequest = request.body[LeaveGroupRequest]
- // the callback for sending a leave-group response
- def sendResponseCallback(error: Errors) {
- def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val response = new LeaveGroupResponse(new LeaveGroupResponseData()
- .setThrottleTimeMs(requestThrottleMs)
- .setErrorCode(error.code()))
- trace("Sending leave group response %s for correlation id %d to client
%s."
- .format(response, request.header.correlationId,
request.header.clientId))
- response
- }
- sendResponseMaybeThrottle(request, createResponse)
- }
+ val members = leaveGroupRequest.members().asScala.toList
- if (!authorize(request.session, Read, Resource(Group,
leaveGroupRequest.data().groupId(), LITERAL))) {
- sendResponseMaybeThrottle(request, requestThrottleMs =>
+ if (!authorize(request.session, Read, Resource(Group,
leaveGroupRequest.data.groupId, LITERAL))) {
+ sendResponseMaybeThrottle(request, requestThrottleMs => {
new LeaveGroupResponse(new LeaveGroupResponseData()
.setThrottleTimeMs(requestThrottleMs)
- .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+ )
+ })
} else {
- // let the coordinator to handle leave-group
+ def sendResponseCallback(leaveGroupResult : LeaveGroupResult) {
+ val memberResponses = leaveGroupResult.memberResponses.map(
+ leaveGroupResult =>
+ new MemberResponse()
+ .setErrorCode(leaveGroupResult.error.code)
+ .setMemberId(leaveGroupResult.memberId)
+ .setGroupInstanceId(leaveGroupResult.groupInstanceId.orNull)
+ )
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ new LeaveGroupResponse(
+ memberResponses.asJava,
+ leaveGroupResult.topLevelError,
+ requestThrottleMs,
+ leaveGroupRequest.version)
+ }
+ sendResponseMaybeThrottle(request, createResponse)
+ }
+
groupCoordinator.handleLeaveGroup(
- leaveGroupRequest.data().groupId(),
- leaveGroupRequest.data().memberId(),
+ leaveGroupRequest.data.groupId,
+ members,
sendResponseCallback)
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 41bfb0d..cb5f1aa 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -46,7 +46,7 @@ import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource,
AlterableConfig, AlterableConfigCollection}
import org.apache.kafka.common.message.JoinGroupRequestData
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
-import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.OffsetCommitRequestData
import org.apache.kafka.common.message.SyncGroupRequestData
import org.apache.kafka.common.network.ListenerName
@@ -398,9 +398,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
private def leaveGroupRequest = new LeaveGroupRequest.Builder(
- new LeaveGroupRequestData()
- .setGroupId(group)
- .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
+ group, Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ )).build()
private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(
new DeleteGroupsRequestData()
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 1cee665..b85035f 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,6 +26,7 @@ import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.common.utils.Time
@@ -153,8 +154,8 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
}
- class JoinGroupOperation extends GroupOperation[JoinGroupResult,
JoinGroupCallback] {
- override def responseCallback(responsePromise: Promise[JoinGroupResult]):
JoinGroupCallback = {
+ class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams,
JoinGroupCallback] {
+ override def responseCallback(responsePromise:
Promise[JoinGroupCallbackParams]): JoinGroupCallback = {
val callback: JoinGroupCallback = responsePromise.success(_)
callback
}
@@ -263,21 +264,28 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
class LeaveGroupOperation extends GroupOperation[LeaveGroupCallbackParams,
LeaveGroupCallback] {
override def responseCallback(responsePromise:
Promise[LeaveGroupCallbackParams]): LeaveGroupCallback = {
- val callback: LeaveGroupCallback = error =>
responsePromise.success(error)
+ val callback: LeaveGroupCallback = result =>
responsePromise.success(result)
callback
}
override def runWithCallback(member: GroupMember, responseCallback:
LeaveGroupCallback): Unit = {
- groupCoordinator.handleLeaveGroup(member.group.groupId, member.memberId,
responseCallback)
+ val memberIdentity = new MemberIdentity()
+ .setMemberId(member.memberId)
+ groupCoordinator.handleLeaveGroup(member.group.groupId,
List(memberIdentity), responseCallback)
}
override def awaitAndVerify(member: GroupMember): Unit = {
- val error = await(member, DefaultSessionTimeout)
- assertEquals(Errors.NONE, error)
+ val leaveGroupResult = await(member, DefaultSessionTimeout)
+
+ val memberResponses = leaveGroupResult.memberResponses
+ GroupCoordinatorTest.verifyLeaveGroupResult(leaveGroupResult,
Errors.NONE, List(Errors.NONE))
+ assertEquals(member.memberId, memberResponses.head.memberId)
+ assertEquals(None, memberResponses.head.groupInstanceId)
}
}
}
object GroupCoordinatorConcurrencyTest {
+ type JoinGroupCallbackParams = JoinGroupResult
type JoinGroupCallback = JoinGroupResult => Unit
type SyncGroupCallbackParams = (Array[Byte], Errors)
type SyncGroupCallback = SyncGroupResult => Unit
@@ -285,8 +293,8 @@ object GroupCoordinatorConcurrencyTest {
type HeartbeatCallback = Errors => Unit
type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
- type LeaveGroupCallbackParams = Errors
- type LeaveGroupCallback = Errors => Unit
+ type LeaveGroupCallbackParams = LeaveGroupResult
+ type LeaveGroupCallback = LeaveGroupResult => Unit
type CompleteTxnCallbackParams = Errors
type CompleteTxnCallback = Errors => Unit
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index d72bfc8..cdf1518 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
import java.util.Optional
import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, KafkaConfig, HostedPartition,
ReplicaManager}
+import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig,
ReplicaManager}
import kafka.utils._
import kafka.utils.timer.MockTimer
import org.apache.kafka.common.TopicPartition
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.Partition
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
import org.scalatest.Assertions.intercept
@@ -45,6 +46,8 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise, TimeoutException}
class GroupCoordinatorTest {
+ import GroupCoordinatorTest._
+
type JoinGroupCallback = JoinGroupResult => Unit
type SyncGroupCallbackParams = (Array[Byte], Errors)
type SyncGroupCallback = SyncGroupResult => Unit
@@ -52,8 +55,7 @@ class GroupCoordinatorTest {
type HeartbeatCallback = Errors => Unit
type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
- type LeaveGroupCallbackParams = Errors
- type LeaveGroupCallback = Errors => Unit
+ type LeaveGroupCallback = LeaveGroupResult => Unit
val ClientId = "consumer-test"
val ClientHost = "localhost"
@@ -951,6 +953,24 @@ class GroupCoordinatorTest {
}
@Test
+ def staticMemberLeaveWithIllegalStateAsPendingMember() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId,
followerInstanceId)
+ val group = groupCoordinator.groupManager.getGroup(groupId).get
+ group.addPendingMember(rebalanceResult.followerId)
+ group.remove(rebalanceResult.followerId)
+ EasyMock.reset(replicaManager)
+
+ // Illegal state exception shall trigger since follower id resides in pending member bucket.
+ val expectedException = intercept[IllegalStateException] {
+ singleLeaveGroup(groupId, rebalanceResult.followerId, followerInstanceId)
+ }
+
+ val message = expectedException.getMessage
+ assertTrue(message.contains(rebalanceResult.followerId))
+ assertTrue(message.contains(followerInstanceId.get))
+ }
+
+ @Test
def staticMemberReJoinWithIllegalStateAsUnknownMember() {
staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
val group = groupCoordinator.groupManager.getGroup(groupId).get
@@ -1036,8 +1056,8 @@ class GroupCoordinatorTest {
// Send a special leave group request from static follower, moving group towards PreparingRebalance
EasyMock.reset(replicaManager)
- val followerLeaveGroupResult = leaveGroup(groupId,
rebalanceResult.followerId)
- assertEquals(Errors.NONE, followerLeaveGroupResult)
+ val followerLeaveGroupResults = singleLeaveGroup(groupId,
rebalanceResult.followerId)
+ verifyLeaveGroupResult(followerLeaveGroupResults)
assertGroupState(groupState = PreparingRebalance)
timer.advanceClock(DefaultRebalanceTimeout + 1)
@@ -1791,8 +1811,8 @@ class GroupCoordinatorTest {
val pending = setupGroupWithPendingMember()
EasyMock.reset(replicaManager)
- val leaveGroupResult = leaveGroup(groupId, pending.memberId)
- assertEquals(Errors.NONE, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, pending.memberId)
+ verifyLeaveGroupResult(leaveGroupResults)
assertGroupState(groupState = CompletingRebalance)
assertEquals(2, group().allMembers.size)
@@ -2004,8 +2024,8 @@ class GroupCoordinatorTest {
// and leaves.
EasyMock.reset(replicaManager)
- val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
- assertEquals(Errors.NONE, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+ verifyLeaveGroupResult(leaveGroupResults)
// The simple offset commit should now fail
EasyMock.reset(replicaManager)
@@ -2436,25 +2456,14 @@ class GroupCoordinatorTest {
def testLeaveGroupWrongCoordinator() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val leaveGroupResult = leaveGroup(otherGroupId, memberId)
- assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(otherGroupId, memberId)
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NOT_COORDINATOR)
}
@Test
def testLeaveGroupUnknownGroup() {
- val leaveGroupResult = leaveGroup(groupId, memberId)
- assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
- }
-
- @Test
- def testLeaveDeadGroup() {
- val memberId = "memberId"
-
- val deadGroupId = "deadGroupId"
-
- groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId,
Dead, new MockTime()))
- val leaveGroupResult = leaveGroup(deadGroupId, memberId)
- assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, memberId)
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE,
List(Errors.UNKNOWN_MEMBER_ID))
}
@Test
@@ -2467,8 +2476,27 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
- val leaveGroupResult = leaveGroup(groupId, otherMemberId)
- assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, otherMemberId)
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE,
List(Errors.UNKNOWN_MEMBER_ID))
+ }
+
+ @Test
+ def testSingleLeaveDeadGroup() {
+ val deadGroupId = "deadGroupId"
+
+ groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId,
Dead, new MockTime()))
+ val leaveGroupResults = singleLeaveGroup(deadGroupId, memberId)
+ verifyLeaveGroupResult(leaveGroupResults, Errors.COORDINATOR_NOT_AVAILABLE)
+ }
+
+ @Test
+ def testBatchLeaveDeadGroup() {
+ val deadGroupId = "deadGroupId"
+
+ groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId,
Dead, new MockTime()))
+ val leaveGroupResults = batchLeaveGroup(deadGroupId,
+ List(new MemberIdentity().setMemberId(memberId), new
MemberIdentity().setMemberId(memberId)))
+ verifyLeaveGroupResult(leaveGroupResults, Errors.COORDINATOR_NOT_AVAILABLE)
}
@Test
@@ -2481,8 +2509,116 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
- val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
- assertEquals(Errors.NONE, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+ verifyLeaveGroupResult(leaveGroupResults)
+ }
+
+ @Test
+ def testLeaveGroupWithFencedInstanceId() {
+ val joinGroupResult = staticJoinGroup(groupId,
JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType,
protocolSuperset)
+ assertEquals(Errors.NONE, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ val leaveGroupResults = singleLeaveGroup(groupId, "some_member",
leaderInstanceId)
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE,
List(Errors.FENCED_INSTANCE_ID))
+ }
+
+ @Test
+ def testLeaveGroupStaticMemberWithUnknownMemberId() {
+ val joinGroupResult = staticJoinGroup(groupId,
JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType,
protocolSuperset)
+ assertEquals(Errors.NONE, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ // Having unknown member id will not affect the request processing.
+ val leaveGroupResults = singleLeaveGroup(groupId,
JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId)
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE))
+ }
+
+ @Test
+ def testStaticMembersValidBatchLeaveGroup() {
+ staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+ .setGroupInstanceId(leaderInstanceId.get), new
MemberIdentity().setGroupInstanceId(followerInstanceId.get)))
+
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE,
Errors.NONE))
+ }
+
+ @Test
+ def testStaticMembersWrongCoordinatorBatchLeaveGroup() {
+ staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val leaveGroupResults = batchLeaveGroup("invalid-group", List(new
MemberIdentity()
+ .setGroupInstanceId(leaderInstanceId.get), new
MemberIdentity().setGroupInstanceId(followerInstanceId.get)))
+
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NOT_COORDINATOR)
+ }
+
+ @Test
+ def testStaticMembersUnknownGroupBatchLeaveGroup() {
+ val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+ .setGroupInstanceId(leaderInstanceId.get), new
MemberIdentity().setGroupInstanceId(followerInstanceId.get)))
+
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE,
List(Errors.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
+ }
+
+ @Test
+ def testStaticMembersFencedInstanceBatchLeaveGroup() {
+ staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+ .setGroupInstanceId(leaderInstanceId.get), new MemberIdentity()
+ .setGroupInstanceId(followerInstanceId.get)
+ .setMemberId("invalid-member")))
+
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE, List(Errors.NONE,
Errors.FENCED_INSTANCE_ID))
+ }
+
+ @Test
+ def testStaticMembersUnknownInstanceBatchLeaveGroup() {
+ staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+ .setGroupInstanceId("unknown-instance"), new MemberIdentity()
+ .setGroupInstanceId(followerInstanceId.get)))
+
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE,
List(Errors.UNKNOWN_MEMBER_ID, Errors.NONE))
+ }
+
+ @Test
+ def testPendingMemberBatchLeaveGroup() {
+ val pendingMember = setupGroupWithPendingMember()
+
+ EasyMock.reset(replicaManager)
+ val leaveGroupResults = batchLeaveGroup(groupId, List(new MemberIdentity()
+ .setGroupInstanceId("unknown-instance"), new MemberIdentity()
+ .setMemberId(pendingMember.memberId)))
+
+ verifyLeaveGroupResult(leaveGroupResults, Errors.NONE,
List(Errors.UNKNOWN_MEMBER_ID, Errors.NONE))
+ }
+
+ @Test
+ def testPendingMemberWithUnexpectedInstanceIdBatchLeaveGroup() {
+ val pendingMember = setupGroupWithPendingMember()
+
+ EasyMock.reset(replicaManager)
+
+ // Bypass the FENCED_INSTANCE_ID check by defining pending member as a static member.
+ val instanceId = "instanceId"
+ val pendingMemberId = pendingMember.memberId
+ getGroup(groupId).addStaticMember(Option(instanceId), pendingMemberId)
+ val expectedException = intercept[IllegalStateException] {
+ batchLeaveGroup(groupId, List(new
MemberIdentity().setGroupInstanceId("unknown-instance"),
+ new
MemberIdentity().setGroupInstanceId(instanceId).setMemberId(pendingMemberId)))
+ }
+
+ val message = expectedException.getMessage
+ assertTrue(message.contains(instanceId))
+ assertTrue(message.contains(pendingMemberId))
}
@Test
@@ -2622,8 +2758,8 @@ class GroupCoordinatorTest {
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType,
protocols)
EasyMock.reset(replicaManager)
- val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
- assertEquals(Errors.NONE, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
+ verifyLeaveGroupResult(leaveGroupResults)
val groupTopicPartition = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2663,8 +2799,8 @@ class GroupCoordinatorTest {
assertEquals(assignedMemberId,
describeGroupResult._2.members.head.memberId)
EasyMock.reset(replicaManager)
- val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
- assertEquals(Errors.NONE, leaveGroupResult)
+ val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
+ verifyLeaveGroupResult(leaveGroupResults)
val groupTopicPartition = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = EasyMock.niceMock(classOf[Partition])
@@ -2785,6 +2921,13 @@ class GroupCoordinatorTest {
(responseFuture, responseCallback)
}
+ private def setupLeaveGroupCallback: (Future[LeaveGroupResult],
LeaveGroupCallback) = {
+ val responsePromise = Promise[LeaveGroupResult]
+ val responseFuture = responsePromise.future
+ val responseCallback: LeaveGroupCallback = result =>
responsePromise.success(result)
+ (responseFuture, responseCallback)
+ }
+
private def sendJoinGroup(groupId: String,
memberId: String,
protocolType: String,
@@ -2980,15 +3123,26 @@ class GroupCoordinatorTest {
result
}
- private def leaveGroup(groupId: String, consumerId: String):
LeaveGroupCallbackParams = {
- val (responseFuture, responseCallback) = setupHeartbeatCallback
+ private def singleLeaveGroup(groupId: String,
+ consumerId: String,
+ groupInstanceId: Option[String] = None):
LeaveGroupResult = {
+ val singleMemberIdentity = List(
+ new MemberIdentity()
+ .setMemberId(consumerId)
+ .setGroupInstanceId(groupInstanceId.orNull))
+ batchLeaveGroup(groupId, singleMemberIdentity)
+ }
+
+ private def batchLeaveGroup(groupId: String,
+ memberIdentities: List[MemberIdentity]):
LeaveGroupResult = {
+ val (responseFuture, responseCallback) = setupLeaveGroupCallback
EasyMock.expect(replicaManager.getPartition(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
.andReturn(HostedPartition.None)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
- groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+ groupCoordinator.handleLeaveGroup(groupId, memberIdentities,
responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
@@ -3003,3 +3157,17 @@ class GroupCoordinatorTest {
OffsetAndMetadata(offset, "", timer.time.milliseconds())
}
}
+
+object GroupCoordinatorTest {
+ def verifyLeaveGroupResult(leaveGroupResult: LeaveGroupResult,
+ expectedTopLevelError: Errors = Errors.NONE,
+ expectedMemberLevelErrors: List[Errors] =
List.empty) {
+ assertEquals(expectedTopLevelError, leaveGroupResult.topLevelError)
+ if (expectedMemberLevelErrors.nonEmpty) {
+ assertEquals(expectedMemberLevelErrors.size,
leaveGroupResult.memberResponses.size)
+ for (i <- expectedMemberLevelErrors.indices) {
+ assertEquals(expectedMemberLevelErrors(i),
leaveGroupResult.memberResponses(i).error)
+ }
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c26c3fd..44f0301 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -50,6 +50,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
import org.apache.kafka.common.message.{HeartbeatRequestData,
JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData,
SyncGroupRequestData}
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.replica.ClientMetadata
import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
@@ -643,6 +644,63 @@ class KafkaApisTest {
EasyMock.replay(groupCoordinator)
}
+ @Test
+ def testMultipleLeaveGroup() {
+ val groupId = "groupId"
+
+ val leaveMemberList = List(
+ new MemberIdentity()
+ .setMemberId("member-1")
+ .setGroupInstanceId("instance-1"),
+ new MemberIdentity()
+ .setMemberId("member-2")
+ .setGroupInstanceId("instance-2")
+ )
+
+ EasyMock.expect(groupCoordinator.handleLeaveGroup(
+ EasyMock.eq(groupId),
+ EasyMock.eq(leaveMemberList),
+ anyObject()
+ ))
+
+ val (_, leaveRequest) = buildRequest(
+ new LeaveGroupRequest.Builder(
+ groupId,
+ leaveMemberList.asJava)
+ )
+
+ createKafkaApis().handleLeaveGroupRequest(leaveRequest)
+
+ EasyMock.replay(groupCoordinator)
+ }
+
+ @Test
+ def testSingleLeaveGroup() {
+ val groupId = "groupId"
+ val memberId = "member"
+
+ val singleLeaveMember = List(
+ new MemberIdentity()
+ .setMemberId(memberId)
+ )
+
+ EasyMock.expect(groupCoordinator.handleLeaveGroup(
+ EasyMock.eq(groupId),
+ EasyMock.eq(singleLeaveMember),
+ anyObject()
+ ))
+
+ val (_, leaveRequest) = buildRequest(
+ new LeaveGroupRequest.Builder(
+ groupId,
+ singleLeaveMember.asJava)
+ )
+
+ createKafkaApis().handleLeaveGroupRequest(leaveRequest)
+
+ EasyMock.replay(groupCoordinator)
+ }
+
/**
* Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
*/
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7fc3de9..242ab21 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -39,7 +39,7 @@ import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.message.JoinGroupRequestData
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
-import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListGroupsRequestData
import org.apache.kafka.common.message.OffsetCommitRequestData
import org.apache.kafka.common.message.SaslAuthenticateRequestData
@@ -325,9 +325,10 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.LEAVE_GROUP =>
new LeaveGroupRequest.Builder(
- new LeaveGroupRequestData()
- .setGroupId("test-leave-group")
- .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ "test-leave-group",
+ Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
)
case ApiKeys.SYNC_GROUP =>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a9baa3f..e1bfe37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -278,7 +278,6 @@ abstract class AssignedTasks<T extends Task> {
int commit() {
int committed = 0;
RuntimeException firstException = null;
-
for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
final T task = it.next();
try {