This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a5239770ff MINOR: Refactor GroupCoordinator's Assertions (#17755)
5a5239770ff is described below
commit 5a5239770ff3565233e5cbecf11446e76339f8fe
Author: David Jacot <[email protected]>
AuthorDate: Tue Nov 12 14:30:58 2024 +0100
MINOR: Refactor GroupCoordinator's Assertions (#17755)
This patch cleans up the `Assertions` class in the `group-coordinator`
module.
Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../apache/kafka/coordinator/group/Assertions.java | 493 +++++++++------------
.../group/GroupMetadataManagerTestContext.java | 4 +-
2 files changed, 210 insertions(+), 287 deletions(-)
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 4206f7a690b..12cd0c1ab9f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -19,10 +19,10 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
@@ -36,20 +36,34 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.opentest4j.AssertionFailedError;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
public class Assertions {
+ private static final BiConsumer<ApiMessage, ApiMessage>
API_MESSAGE_DEFAULT_COMPARATOR = org.junit.jupiter.api.Assertions::assertEquals;
+ private static final Map<Class<?>, BiConsumer<ApiMessage, ApiMessage>>
API_MESSAGE_COMPARATORS = Map.of(
+ // Register request/response comparators.
+ ConsumerGroupHeartbeatResponseData.class,
Assertions::assertConsumerGroupHeartbeatResponse,
+ ShareGroupHeartbeatResponseData.class,
Assertions::assertShareGroupHeartbeatResponse,
+ SyncGroupResponseData.class, Assertions::assertSyncGroupResponse,
+
+ // Register record comparators.
+ ConsumerGroupCurrentMemberAssignmentValue.class,
Assertions::assertConsumerGroupCurrentMemberAssignmentValue,
+ ConsumerGroupPartitionMetadataValue.class,
Assertions::assertConsumerGroupPartitionMetadataValue,
+ GroupMetadataValue.class, Assertions::assertGroupMetadataValue,
+ ConsumerGroupTargetAssignmentMemberValue.class,
Assertions::assertConsumerGroupTargetAssignmentMemberValue,
+ ShareGroupPartitionMetadataValue.class,
Assertions::assertShareGroupPartitionMetadataValue
+ );
+
public static <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual
@@ -58,101 +72,12 @@ public class Assertions {
}
public static void assertResponseEquals(
- ConsumerGroupHeartbeatResponseData expected,
- ConsumerGroupHeartbeatResponseData actual
- ) {
- if (!responseEquals(expected, actual)) {
- assertionFailure()
- .expected(expected)
- .actual(actual)
- .buildAndThrow();
- }
- }
-
- public static void assertResponseEquals(
- ShareGroupHeartbeatResponseData expected,
- ShareGroupHeartbeatResponseData actual
- ) {
- if (!responseEquals(expected, actual)) {
- assertionFailure()
- .expected(expected)
- .actual(actual)
- .buildAndThrow();
- }
- }
-
- private static boolean responseEquals(
- ConsumerGroupHeartbeatResponseData expected,
- ConsumerGroupHeartbeatResponseData actual
- ) {
- if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
- if (expected.errorCode() != actual.errorCode()) return false;
- if (!Objects.equals(expected.errorMessage(), actual.errorMessage()))
return false;
- if (!Objects.equals(expected.memberId(), actual.memberId())) return
false;
- if (expected.memberEpoch() != actual.memberEpoch()) return false;
- if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs())
return false;
- // Unordered comparison of the assignments.
- return responseAssignmentEquals(expected.assignment(),
actual.assignment());
- }
-
- private static boolean responseEquals(
- ShareGroupHeartbeatResponseData expected,
- ShareGroupHeartbeatResponseData actual
- ) {
- if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
- if (expected.errorCode() != actual.errorCode()) return false;
- if (!Objects.equals(expected.errorMessage(), actual.errorMessage()))
return false;
- if (!Objects.equals(expected.memberId(), actual.memberId())) return
false;
- if (expected.memberEpoch() != actual.memberEpoch()) return false;
- if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs())
return false;
- // Unordered comparison of the assignments.
- return responseAssignmentEquals(expected.assignment(),
actual.assignment());
- }
-
- private static boolean responseAssignmentEquals(
- ConsumerGroupHeartbeatResponseData.Assignment expected,
- ConsumerGroupHeartbeatResponseData.Assignment actual
- ) {
- if (expected == actual) return true;
- if (expected == null) return false;
- if (actual == null) return false;
-
- return Objects.equals(fromAssignment(expected.topicPartitions()),
fromAssignment(actual.topicPartitions()));
- }
-
- private static boolean responseAssignmentEquals(
- ShareGroupHeartbeatResponseData.Assignment expected,
- ShareGroupHeartbeatResponseData.Assignment actual
- ) {
- if (expected == actual) return true;
- if (expected == null) return false;
- if (actual == null) return false;
-
- return
Objects.equals(fromShareGroupAssignment(expected.topicPartitions()),
fromShareGroupAssignment(actual.topicPartitions()));
- }
-
- private static Map<Uuid, Set<Integer>> fromAssignment(
- List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
- ) {
- if (assignment == null) return null;
-
- Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
- assignment.forEach(topicPartitions ->
- assignmentMap.put(topicPartitions.topicId(), new
HashSet<>(topicPartitions.partitions()))
- );
- return assignmentMap;
- }
-
- private static Map<Uuid, Set<Integer>> fromShareGroupAssignment(
- List<ShareGroupHeartbeatResponseData.TopicPartitions> assignment
+ ApiMessage expected,
+ ApiMessage actual
) {
- if (assignment == null) return null;
-
- Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
- assignment.forEach(topicPartitions -> {
- assignmentMap.put(topicPartitions.topicId(), new
HashSet<>(topicPartitions.partitions()));
- });
- return assignmentMap;
+ BiConsumer<ApiMessage, ApiMessage> asserter = API_MESSAGE_COMPARATORS
+ .getOrDefault(expected.getClass(), API_MESSAGE_DEFAULT_COMPARATOR);
+ asserter.accept(expected, actual);
}
public static void assertRecordsEquals(
@@ -190,221 +115,219 @@ public class Assertions {
}
}
- @SuppressWarnings({ "CyclomaticComplexity", "MethodLength" })
+ private static void assertConsumerGroupHeartbeatResponse(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ ConsumerGroupHeartbeatResponseData expected =
(ConsumerGroupHeartbeatResponseData) exp.duplicate();
+ ConsumerGroupHeartbeatResponseData actual =
(ConsumerGroupHeartbeatResponseData) act.duplicate();
+
+ Consumer<ConsumerGroupHeartbeatResponseData> normalize = message -> {
+ if (message.assignment() != null) {
+
message.assignment().topicPartitions().sort(Comparator.comparing(ConsumerGroupHeartbeatResponseData.TopicPartitions::topicId));
+ message.assignment().topicPartitions().forEach(topic ->
topic.partitions().sort(Integer::compareTo));
+ }
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
+ private static void assertShareGroupHeartbeatResponse(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ ShareGroupHeartbeatResponseData expected =
(ShareGroupHeartbeatResponseData) exp.duplicate();
+ ShareGroupHeartbeatResponseData actual =
(ShareGroupHeartbeatResponseData) act.duplicate();
+
+ Consumer<ShareGroupHeartbeatResponseData> normalize = message -> {
+ if (message.assignment() != null) {
+
message.assignment().topicPartitions().sort(Comparator.comparing(ShareGroupHeartbeatResponseData.TopicPartitions::topicId));
+ message.assignment().topicPartitions().forEach(topic ->
topic.partitions().sort(Integer::compareTo));
+ }
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
private static void assertApiMessageAndVersionEquals(
ApiMessageAndVersion expected,
ApiMessageAndVersion actual
) {
if (expected == actual) return;
-
+ assertNotNull(expected);
+ assertNotNull(actual);
assertEquals(expected.version(), actual.version());
+ BiConsumer<ApiMessage, ApiMessage> asserter = API_MESSAGE_COMPARATORS
+ .getOrDefault(expected.message().getClass(),
API_MESSAGE_DEFAULT_COMPARATOR);
+ asserter.accept(expected.message(), actual.message());
+ }
- if (actual.message() instanceof
ConsumerGroupCurrentMemberAssignmentValue) {
- // The order of the topics stored in
ConsumerGroupCurrentMemberAssignmentValue is not
- // always guaranteed. Therefore, we need a special comparator.
- ConsumerGroupCurrentMemberAssignmentValue expectedValue =
- (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
- ConsumerGroupCurrentMemberAssignmentValue actualValue =
- (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
-
- assertEquals(expectedValue.memberEpoch(),
actualValue.memberEpoch());
- assertEquals(expectedValue.previousMemberEpoch(),
actualValue.previousMemberEpoch());
-
- // We transform those to Maps before comparing them.
-
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
- fromTopicPartitions(actualValue.assignedPartitions()));
-
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
-
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
- } else if (actual.message() instanceof
ConsumerGroupPartitionMetadataValue) {
- // The order of the racks stored in the PartitionMetadata of the
ConsumerGroupPartitionMetadataValue
- // is not always guaranteed. Therefore, we need a special
comparator.
- ConsumerGroupPartitionMetadataValue expectedValue =
- (ConsumerGroupPartitionMetadataValue)
expected.message().duplicate();
- ConsumerGroupPartitionMetadataValue actualValue =
- (ConsumerGroupPartitionMetadataValue)
actual.message().duplicate();
-
- List<ConsumerGroupPartitionMetadataValue.TopicMetadata>
expectedTopicMetadataList =
- expectedValue.topics();
- List<ConsumerGroupPartitionMetadataValue.TopicMetadata>
actualTopicMetadataList =
- actualValue.topics();
-
- if (expectedTopicMetadataList.size() !=
actualTopicMetadataList.size()) {
- fail("Topic metadata lists have different sizes");
- }
+ private static void assertConsumerGroupCurrentMemberAssignmentValue(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ // The order of the topics stored in
ConsumerGroupCurrentMemberAssignmentValue is not
+ // always guaranteed. Therefore, we need a special comparator.
+ ConsumerGroupCurrentMemberAssignmentValue expected =
(ConsumerGroupCurrentMemberAssignmentValue) exp.duplicate();
+ ConsumerGroupCurrentMemberAssignmentValue actual =
(ConsumerGroupCurrentMemberAssignmentValue) act.duplicate();
-
expectedTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
-
actualTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
-
- for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
- ConsumerGroupPartitionMetadataValue.TopicMetadata
expectedTopicMetadata =
- expectedTopicMetadataList.get(i);
- ConsumerGroupPartitionMetadataValue.TopicMetadata
actualTopicMetadata =
- actualTopicMetadataList.get(i);
-
- assertEquals(expectedTopicMetadata.topicId(),
actualTopicMetadata.topicId());
- assertEquals(expectedTopicMetadata.topicName(),
actualTopicMetadata.topicName());
- assertEquals(expectedTopicMetadata.numPartitions(),
actualTopicMetadata.numPartitions());
-
- List<ConsumerGroupPartitionMetadataValue.PartitionMetadata>
expectedPartitionMetadataList =
- expectedTopicMetadata.partitionMetadata();
- List<ConsumerGroupPartitionMetadataValue.PartitionMetadata>
actualPartitionMetadataList =
- actualTopicMetadata.partitionMetadata();
-
- // If the list is empty, rack information wasn't available for
any replica of
- // the partition and hence, the entry wasn't added to the
record.
- if (expectedPartitionMetadataList.size() !=
actualPartitionMetadataList.size()) {
- fail("Partition metadata lists have different sizes");
- } else if (!expectedPartitionMetadataList.isEmpty() &&
!actualPartitionMetadataList.isEmpty()) {
- for (int j = 0; j < expectedPartitionMetadataList.size();
j++) {
- ConsumerGroupPartitionMetadataValue.PartitionMetadata
expectedPartitionMetadata =
- expectedPartitionMetadataList.get(j);
- ConsumerGroupPartitionMetadataValue.PartitionMetadata
actualPartitionMetadata =
- actualPartitionMetadataList.get(j);
-
- assertEquals(expectedPartitionMetadata.partition(),
actualPartitionMetadata.partition());
-
assertUnorderedListEquals(expectedPartitionMetadata.racks(),
actualPartitionMetadata.racks());
- }
- }
- }
- } else if (actual.message() instanceof GroupMetadataValue) {
- GroupMetadataValue expectedValue = (GroupMetadataValue)
expected.message().duplicate();
- GroupMetadataValue actualValue = (GroupMetadataValue)
actual.message().duplicate();
-
- Comparator<GroupMetadataValue.MemberMetadata> comparator =
-
Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId);
- expectedValue.members().sort(comparator);
- actualValue.members().sort(comparator);
+
Consumer<List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>>
sortTopicsAndPartitions = topicPartitions -> {
+
topicPartitions.sort(Comparator.comparing(ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId));
+ topicPartitions.forEach(topic ->
topic.partitions().sort(Integer::compareTo));
+ };
+
+ Consumer<ConsumerGroupCurrentMemberAssignmentValue> normalize =
message -> {
+ sortTopicsAndPartitions.accept(message.assignedPartitions());
+
sortTopicsAndPartitions.accept(message.partitionsPendingRevocation());
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
+ private static void assertConsumerGroupPartitionMetadataValue(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ // The order of the racks stored in the PartitionMetadata of the
ConsumerGroupPartitionMetadataValue
+ // is not always guaranteed. Therefore, we need a special comparator.
+ ConsumerGroupPartitionMetadataValue expected =
(ConsumerGroupPartitionMetadataValue) exp.duplicate();
+ ConsumerGroupPartitionMetadataValue actual =
(ConsumerGroupPartitionMetadataValue) act.duplicate();
+
+ Consumer<ConsumerGroupPartitionMetadataValue> normalize = message -> {
+
message.topics().sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
+ message.topics().forEach(topic -> {
+
topic.partitionMetadata().sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.PartitionMetadata::partition));
+ topic.partitionMetadata().forEach(partition ->
partition.racks().sort(String::compareTo));
+ });
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
+ private static void assertShareGroupPartitionMetadataValue(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ // The order of the racks stored in the PartitionMetadata of the
ShareGroupPartitionMetadataValue
+ // is not always guaranteed. Therefore, we need a special comparator.
+ ShareGroupPartitionMetadataValue expected =
(ShareGroupPartitionMetadataValue) exp.duplicate();
+ ShareGroupPartitionMetadataValue actual =
(ShareGroupPartitionMetadataValue) act.duplicate();
+
+ Consumer<ShareGroupPartitionMetadataValue> normalize = message -> {
+
message.topics().sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId));
+ message.topics().forEach(topic -> {
+
topic.partitionMetadata().sort(Comparator.comparing(ShareGroupPartitionMetadataValue.PartitionMetadata::partition));
+ topic.partitionMetadata().forEach(partition ->
partition.racks().sort(String::compareTo));
+ });
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
+ private static void assertGroupMetadataValue(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ GroupMetadataValue expected = (GroupMetadataValue) exp.duplicate();
+ GroupMetadataValue actual = (GroupMetadataValue) act.duplicate();
+
+ Consumer<GroupMetadataValue> normalize = message -> {
+
message.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
try {
- Arrays.asList(expectedValue, actualValue).forEach(value ->
- value.members().forEach(memberMetadata -> {
- // Sort topics and ownedPartitions in Subscription.
- ConsumerPartitionAssignor.Subscription subscription =
-
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
- subscription.topics().sort(String::compareTo);
- subscription.ownedPartitions().sort(
-
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
- );
-
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
- subscription,
-
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
- )));
-
- // Sort partitions in Assignment.
- ConsumerPartitionAssignor.Assignment assignment =
-
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
- assignment.partitions().sort(
-
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
- );
-
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
- assignment,
-
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
- )));
- })
- );
+ message.members().forEach(memberMetadata -> {
+ // Sort topics and ownedPartitions in Subscription.
+ ConsumerPartitionAssignor.Subscription subscription =
+
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
+ subscription.topics().sort(String::compareTo);
+ subscription.ownedPartitions().sort(
+
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+ );
+
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
+ subscription,
+
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
+ )));
+
+ // Sort partitions in Assignment.
+ ConsumerPartitionAssignor.Assignment assignment =
+
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
+ assignment.partitions().sort(
+
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+ );
+
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+ assignment,
+
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
+ )));
+ });
} catch (SchemaException ex) {
fail("Failed deserialization: " + ex.getMessage());
}
- assertEquals(expectedValue, actualValue);
- } else if (actual.message() instanceof
ConsumerGroupTargetAssignmentMemberValue) {
- ConsumerGroupTargetAssignmentMemberValue expectedValue =
- (ConsumerGroupTargetAssignmentMemberValue)
expected.message().duplicate();
- ConsumerGroupTargetAssignmentMemberValue actualValue =
- (ConsumerGroupTargetAssignmentMemberValue)
actual.message().duplicate();
-
-
Comparator<ConsumerGroupTargetAssignmentMemberValue.TopicPartition> comparator =
-
Comparator.comparing(ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId);
- expectedValue.topicPartitions().sort(comparator);
- actualValue.topicPartitions().sort(comparator);
-
- assertEquals(expectedValue, actualValue);
- } else if (actual.message() instanceof
ShareGroupPartitionMetadataValue) {
- // The order of the racks stored in the PartitionMetadata of the
ShareGroupPartitionMetadataValue
- // is not always guaranteed. Therefore, we need a special
comparator.
- ShareGroupPartitionMetadataValue expectedValue =
- (ShareGroupPartitionMetadataValue)
expected.message().duplicate();
- ShareGroupPartitionMetadataValue actualValue =
- (ShareGroupPartitionMetadataValue)
actual.message().duplicate();
-
- List<ShareGroupPartitionMetadataValue.TopicMetadata>
expectedTopicMetadataList =
- expectedValue.topics();
- List<ShareGroupPartitionMetadataValue.TopicMetadata>
actualTopicMetadataList =
- actualValue.topics();
-
- if (expectedTopicMetadataList.size() !=
actualTopicMetadataList.size()) {
- fail("Topic metadata lists have different sizes");
- }
+ };
-
expectedTopicMetadataList.sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId));
-
actualTopicMetadataList.sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId));
-
- for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
- ShareGroupPartitionMetadataValue.TopicMetadata
expectedTopicMetadata =
- expectedTopicMetadataList.get(i);
- ShareGroupPartitionMetadataValue.TopicMetadata
actualTopicMetadata =
- actualTopicMetadataList.get(i);
-
- assertEquals(expectedTopicMetadata.topicId(),
actualTopicMetadata.topicId());
- assertEquals(expectedTopicMetadata.topicName(),
actualTopicMetadata.topicName());
- assertEquals(expectedTopicMetadata.numPartitions(),
actualTopicMetadata.numPartitions());
-
- List<ShareGroupPartitionMetadataValue.PartitionMetadata>
expectedPartitionMetadataList =
- expectedTopicMetadata.partitionMetadata();
- List<ShareGroupPartitionMetadataValue.PartitionMetadata>
actualPartitionMetadataList =
- actualTopicMetadata.partitionMetadata();
-
- // If the list is empty, rack information wasn't available for
any replica of
- // the partition and hence, the entry wasn't added to the
record.
- if (expectedPartitionMetadataList.size() !=
actualPartitionMetadataList.size()) {
- fail("Partition metadata lists have different sizes");
- } else if (!expectedPartitionMetadataList.isEmpty() &&
!actualPartitionMetadataList.isEmpty()) {
- for (int j = 0; j < expectedPartitionMetadataList.size();
j++) {
- ShareGroupPartitionMetadataValue.PartitionMetadata
expectedPartitionMetadata =
- expectedPartitionMetadataList.get(j);
- ShareGroupPartitionMetadataValue.PartitionMetadata
actualPartitionMetadata =
- actualPartitionMetadataList.get(j);
-
- assertEquals(expectedPartitionMetadata.partition(),
actualPartitionMetadata.partition());
-
assertUnorderedListEquals(expectedPartitionMetadata.racks(),
actualPartitionMetadata.racks());
- }
- }
- }
- } else {
- assertEquals(expected.message(), actual.message());
- }
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
}
- private static Map<Uuid, Set<Integer>> fromTopicPartitions(
- List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
assignment
+ private static void assertConsumerGroupTargetAssignmentMemberValue(
+ ApiMessage exp,
+ ApiMessage act
) {
- Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
- assignment.forEach(topicPartitions ->
- assignmentMap.put(topicPartitions.topicId(), new
HashSet<>(topicPartitions.partitions()))
- );
- return assignmentMap;
+ ConsumerGroupTargetAssignmentMemberValue expected =
(ConsumerGroupTargetAssignmentMemberValue) exp.duplicate();
+ ConsumerGroupTargetAssignmentMemberValue actual =
(ConsumerGroupTargetAssignmentMemberValue) act.duplicate();
+
+ Consumer<ConsumerGroupTargetAssignmentMemberValue> normalize = message
-> {
+
message.topicPartitions().sort(Comparator.comparing(ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId));
+ message.topicPartitions().forEach(topic ->
topic.partitions().sort(Integer::compareTo));
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
}
- public static void assertSyncGroupResponseEquals(
- SyncGroupResponseData expected,
- SyncGroupResponseData actual
+ private static void assertSyncGroupResponse(
+ ApiMessage exp,
+ ApiMessage act
) {
- SyncGroupResponseData expectedDuplicate = expected.duplicate();
- SyncGroupResponseData actualDuplicate = actual.duplicate();
+ SyncGroupResponseData expected = (SyncGroupResponseData)
exp.duplicate();
+ SyncGroupResponseData actual = (SyncGroupResponseData) act.duplicate();
- Arrays.asList(expectedDuplicate, actualDuplicate).forEach(duplicate ->
{
+ Consumer<SyncGroupResponseData> normalize = message -> {
try {
ConsumerPartitionAssignor.Assignment assignment =
-
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(duplicate.assignment()));
+
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(message.assignment()));
assignment.partitions().sort(
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
);
-
duplicate.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+
message.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
assignment,
-
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(duplicate.assignment()))
+
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(message.assignment()))
)));
} catch (SchemaException ex) {
fail("Failed deserialization: " + ex.getMessage());
}
- });
- assertEquals(expectedDuplicate, actualDuplicate);
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index f752fa82544..0fbecb8cbe8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -109,7 +109,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
-import static
org.apache.kafka.coordinator.group.Assertions.assertSyncGroupResponseEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertResponseEquals;
import static
org.apache.kafka.coordinator.group.GroupConfigManagerTest.createConfigManager;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
@@ -1416,7 +1416,7 @@ public class GroupMetadataManagerTestContext {
// Simulate a successful write to log.
syncResult.appendFuture.complete(null);
- assertSyncGroupResponseEquals(
+ assertResponseEquals(
new SyncGroupResponseData()
.setProtocolType(protocolType)
.setProtocolName(protocolName)