This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a5239770ff MINOR: Refactor GroupCoordinator's Assertions (#17755)
5a5239770ff is described below

commit 5a5239770ff3565233e5cbecf11446e76339f8fe
Author: David Jacot <[email protected]>
AuthorDate: Tue Nov 12 14:30:58 2024 +0100

    MINOR: Refactor GroupCoordinator's Assertions (#17755)
    
    This patch cleans up the `Assertions` class in the `group-coordinator` module.
    
    Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/coordinator/group/Assertions.java | 493 +++++++++------------
 .../group/GroupMetadataManagerTestContext.java     |   4 +-
 2 files changed, 210 insertions(+), 287 deletions(-)

diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 4206f7a690b..12cd0c1ab9f 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -19,10 +19,10 @@ package org.apache.kafka.coordinator.group;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
@@ -36,20 +36,34 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.opentest4j.AssertionFailedError;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class Assertions {
+    private static final BiConsumer<ApiMessage, ApiMessage> 
API_MESSAGE_DEFAULT_COMPARATOR = org.junit.jupiter.api.Assertions::assertEquals;
+    private static final Map<Class<?>, BiConsumer<ApiMessage, ApiMessage>> 
API_MESSAGE_COMPARATORS = Map.of(
+        // Register request/response comparators.
+        ConsumerGroupHeartbeatResponseData.class, 
Assertions::assertConsumerGroupHeartbeatResponse,
+        ShareGroupHeartbeatResponseData.class, 
Assertions::assertShareGroupHeartbeatResponse,
+        SyncGroupResponseData.class, Assertions::assertSyncGroupResponse,
+
+        // Register record comparators.
+        ConsumerGroupCurrentMemberAssignmentValue.class, 
Assertions::assertConsumerGroupCurrentMemberAssignmentValue,
+        ConsumerGroupPartitionMetadataValue.class, 
Assertions::assertConsumerGroupPartitionMetadataValue,
+        GroupMetadataValue.class, Assertions::assertGroupMetadataValue,
+        ConsumerGroupTargetAssignmentMemberValue.class, 
Assertions::assertConsumerGroupTargetAssignmentMemberValue,
+        ShareGroupPartitionMetadataValue.class, 
Assertions::assertShareGroupPartitionMetadataValue
+    );
+
     public static <T> void assertUnorderedListEquals(
         List<T> expected,
         List<T> actual
@@ -58,101 +72,12 @@ public class Assertions {
     }
 
     public static void assertResponseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (!responseEquals(expected, actual)) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
-        }
-    }
-
-    public static void assertResponseEquals(
-        ShareGroupHeartbeatResponseData expected,
-        ShareGroupHeartbeatResponseData actual
-    ) {
-        if (!responseEquals(expected, actual)) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
-        }
-    }
-
-    private static boolean responseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-        if (expected.errorCode() != actual.errorCode()) return false;
-        if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-        if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-        if (expected.memberEpoch() != actual.memberEpoch()) return false;
-        if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-        // Unordered comparison of the assignments.
-        return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-    }
-
-    private static boolean responseEquals(
-        ShareGroupHeartbeatResponseData expected,
-        ShareGroupHeartbeatResponseData actual
-    ) {
-        if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-        if (expected.errorCode() != actual.errorCode()) return false;
-        if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-        if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-        if (expected.memberEpoch() != actual.memberEpoch()) return false;
-        if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-        // Unordered comparison of the assignments.
-        return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-    }
-
-    private static boolean responseAssignmentEquals(
-        ConsumerGroupHeartbeatResponseData.Assignment expected,
-        ConsumerGroupHeartbeatResponseData.Assignment actual
-    ) {
-        if (expected == actual) return true;
-        if (expected == null) return false;
-        if (actual == null) return false;
-
-        return Objects.equals(fromAssignment(expected.topicPartitions()), 
fromAssignment(actual.topicPartitions()));
-    }
-
-    private static boolean responseAssignmentEquals(
-        ShareGroupHeartbeatResponseData.Assignment expected,
-        ShareGroupHeartbeatResponseData.Assignment actual
-    ) {
-        if (expected == actual) return true;
-        if (expected == null) return false;
-        if (actual == null) return false;
-
-        return 
Objects.equals(fromShareGroupAssignment(expected.topicPartitions()), 
fromShareGroupAssignment(actual.topicPartitions()));
-    }
-
-    private static Map<Uuid, Set<Integer>> fromAssignment(
-        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
-    ) {
-        if (assignment == null) return null;
-
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions ->
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()))
-        );
-        return assignmentMap;
-    }
-
-    private static Map<Uuid, Set<Integer>> fromShareGroupAssignment(
-        List<ShareGroupHeartbeatResponseData.TopicPartitions> assignment
+        ApiMessage expected,
+        ApiMessage actual
     ) {
-        if (assignment == null) return null;
-
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions -> {
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
-        });
-        return assignmentMap;
+        BiConsumer<ApiMessage, ApiMessage> asserter = API_MESSAGE_COMPARATORS
+            .getOrDefault(expected.getClass(), API_MESSAGE_DEFAULT_COMPARATOR);
+        asserter.accept(expected, actual);
     }
 
     public static void assertRecordsEquals(
@@ -190,221 +115,219 @@ public class Assertions {
         }
     }
 
-    @SuppressWarnings({ "CyclomaticComplexity", "MethodLength" })
+    private static void assertConsumerGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ConsumerGroupHeartbeatResponseData expected = 
(ConsumerGroupHeartbeatResponseData) exp.duplicate();
+        ConsumerGroupHeartbeatResponseData actual = 
(ConsumerGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ConsumerGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ConsumerGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertShareGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ShareGroupHeartbeatResponseData expected = 
(ShareGroupHeartbeatResponseData) exp.duplicate();
+        ShareGroupHeartbeatResponseData actual = 
(ShareGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ShareGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ShareGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
     private static void assertApiMessageAndVersionEquals(
         ApiMessageAndVersion expected,
         ApiMessageAndVersion actual
     ) {
         if (expected == actual) return;
-
+        assertNotNull(expected);
+        assertNotNull(actual);
         assertEquals(expected.version(), actual.version());
+        BiConsumer<ApiMessage, ApiMessage> asserter = API_MESSAGE_COMPARATORS
+            .getOrDefault(expected.message().getClass(), 
API_MESSAGE_DEFAULT_COMPARATOR);
+        asserter.accept(expected.message(), actual.message());
+    }
 
-        if (actual.message() instanceof 
ConsumerGroupCurrentMemberAssignmentValue) {
-            // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
-            // always guaranteed. Therefore, we need a special comparator.
-            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
-            ConsumerGroupCurrentMemberAssignmentValue actualValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
-
-            assertEquals(expectedValue.memberEpoch(), 
actualValue.memberEpoch());
-            assertEquals(expectedValue.previousMemberEpoch(), 
actualValue.previousMemberEpoch());
-
-            // We transform those to Maps before comparing them.
-            
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
-                fromTopicPartitions(actualValue.assignedPartitions()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
-                
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
-        } else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
-            // The order of the racks stored in the PartitionMetadata of the 
ConsumerGroupPartitionMetadataValue
-            // is not always guaranteed. Therefore, we need a special 
comparator.
-            ConsumerGroupPartitionMetadataValue expectedValue =
-                (ConsumerGroupPartitionMetadataValue) 
expected.message().duplicate();
-            ConsumerGroupPartitionMetadataValue actualValue =
-                (ConsumerGroupPartitionMetadataValue) 
actual.message().duplicate();
-
-            List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
-                expectedValue.topics();
-            List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
-                actualValue.topics();
-
-            if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
-                fail("Topic metadata lists have different sizes");
-            }
+    private static void assertConsumerGroupCurrentMemberAssignmentValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
+        // always guaranteed. Therefore, we need a special comparator.
+        ConsumerGroupCurrentMemberAssignmentValue expected = 
(ConsumerGroupCurrentMemberAssignmentValue) exp.duplicate();
+        ConsumerGroupCurrentMemberAssignmentValue actual = 
(ConsumerGroupCurrentMemberAssignmentValue) act.duplicate();
 
-            
expectedTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
-            
actualTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
-
-            for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
-                ConsumerGroupPartitionMetadataValue.TopicMetadata 
expectedTopicMetadata =
-                    expectedTopicMetadataList.get(i);
-                ConsumerGroupPartitionMetadataValue.TopicMetadata 
actualTopicMetadata =
-                    actualTopicMetadataList.get(i);
-
-                assertEquals(expectedTopicMetadata.topicId(), 
actualTopicMetadata.topicId());
-                assertEquals(expectedTopicMetadata.topicName(), 
actualTopicMetadata.topicName());
-                assertEquals(expectedTopicMetadata.numPartitions(), 
actualTopicMetadata.numPartitions());
-
-                List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
expectedPartitionMetadataList =
-                    expectedTopicMetadata.partitionMetadata();
-                List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
actualPartitionMetadataList =
-                    actualTopicMetadata.partitionMetadata();
-
-                // If the list is empty, rack information wasn't available for 
any replica of
-                // the partition and hence, the entry wasn't added to the 
record.
-                if (expectedPartitionMetadataList.size() != 
actualPartitionMetadataList.size()) {
-                    fail("Partition metadata lists have different sizes");
-                } else if (!expectedPartitionMetadataList.isEmpty() && 
!actualPartitionMetadataList.isEmpty()) {
-                    for (int j = 0; j < expectedPartitionMetadataList.size(); 
j++) {
-                        ConsumerGroupPartitionMetadataValue.PartitionMetadata 
expectedPartitionMetadata =
-                            expectedPartitionMetadataList.get(j);
-                        ConsumerGroupPartitionMetadataValue.PartitionMetadata 
actualPartitionMetadata =
-                            actualPartitionMetadataList.get(j);
-
-                        assertEquals(expectedPartitionMetadata.partition(), 
actualPartitionMetadata.partition());
-                        
assertUnorderedListEquals(expectedPartitionMetadata.racks(), 
actualPartitionMetadata.racks());
-                    }
-                }
-            }
-        } else if (actual.message() instanceof GroupMetadataValue) {
-            GroupMetadataValue expectedValue = (GroupMetadataValue) 
expected.message().duplicate();
-            GroupMetadataValue actualValue = (GroupMetadataValue) 
actual.message().duplicate();
-
-            Comparator<GroupMetadataValue.MemberMetadata> comparator =
-                
Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId);
-            expectedValue.members().sort(comparator);
-            actualValue.members().sort(comparator);
+        
Consumer<List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>> 
sortTopicsAndPartitions = topicPartitions -> {
+            
topicPartitions.sort(Comparator.comparing(ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId));
+            topicPartitions.forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+        };
+
+        Consumer<ConsumerGroupCurrentMemberAssignmentValue> normalize = 
message -> {
+            sortTopicsAndPartitions.accept(message.assignedPartitions());
+            
sortTopicsAndPartitions.accept(message.partitionsPendingRevocation());
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertConsumerGroupPartitionMetadataValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        // The order of the racks stored in the PartitionMetadata of the 
ConsumerGroupPartitionMetadataValue
+        // is not always guaranteed. Therefore, we need a special comparator.
+        ConsumerGroupPartitionMetadataValue expected = 
(ConsumerGroupPartitionMetadataValue) exp.duplicate();
+        ConsumerGroupPartitionMetadataValue actual = 
(ConsumerGroupPartitionMetadataValue) act.duplicate();
+
+        Consumer<ConsumerGroupPartitionMetadataValue> normalize = message -> {
+            
message.topics().sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
+            message.topics().forEach(topic -> {
+                
topic.partitionMetadata().sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.PartitionMetadata::partition));
+                topic.partitionMetadata().forEach(partition -> 
partition.racks().sort(String::compareTo));
+            });
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertShareGroupPartitionMetadataValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        // The order of the racks stored in the PartitionMetadata of the 
ShareGroupPartitionMetadataValue
+        // is not always guaranteed. Therefore, we need a special comparator.
+        ShareGroupPartitionMetadataValue expected = 
(ShareGroupPartitionMetadataValue) exp.duplicate();
+        ShareGroupPartitionMetadataValue actual = 
(ShareGroupPartitionMetadataValue) act.duplicate();
+
+        Consumer<ShareGroupPartitionMetadataValue> normalize = message -> {
+            
message.topics().sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId));
+            message.topics().forEach(topic -> {
+                
topic.partitionMetadata().sort(Comparator.comparing(ShareGroupPartitionMetadataValue.PartitionMetadata::partition));
+                topic.partitionMetadata().forEach(partition -> 
partition.racks().sort(String::compareTo));
+            });
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertGroupMetadataValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        GroupMetadataValue expected = (GroupMetadataValue) exp.duplicate();
+        GroupMetadataValue actual = (GroupMetadataValue) act.duplicate();
+
+        Consumer<GroupMetadataValue> normalize = message -> {
+            
message.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
             try {
-                Arrays.asList(expectedValue, actualValue).forEach(value ->
-                    value.members().forEach(memberMetadata -> {
-                        // Sort topics and ownedPartitions in Subscription.
-                        ConsumerPartitionAssignor.Subscription subscription =
-                            
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
-                        subscription.topics().sort(String::compareTo);
-                        subscription.ownedPartitions().sort(
-                            
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
-                        );
-                        
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
-                            subscription,
-                            
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
-                        )));
-
-                        // Sort partitions in Assignment.
-                        ConsumerPartitionAssignor.Assignment assignment =
-                            
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
-                        assignment.partitions().sort(
-                            
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
-                        );
-                        
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
-                            assignment,
-                            
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
-                        )));
-                    })
-                );
+                message.members().forEach(memberMetadata -> {
+                    // Sort topics and ownedPartitions in Subscription.
+                    ConsumerPartitionAssignor.Subscription subscription =
+                        
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
+                    subscription.topics().sort(String::compareTo);
+                    subscription.ownedPartitions().sort(
+                        
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                    );
+                    
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                        subscription,
+                        
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
+                    )));
+
+                    // Sort partitions in Assignment.
+                    ConsumerPartitionAssignor.Assignment assignment =
+                        
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
+                    assignment.partitions().sort(
+                        
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                    );
+                    
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+                        assignment,
+                        
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
+                    )));
+                });
             } catch (SchemaException ex) {
                 fail("Failed deserialization: " + ex.getMessage());
             }
-            assertEquals(expectedValue, actualValue);
-        } else if (actual.message() instanceof 
ConsumerGroupTargetAssignmentMemberValue) {
-            ConsumerGroupTargetAssignmentMemberValue expectedValue =
-                (ConsumerGroupTargetAssignmentMemberValue) 
expected.message().duplicate();
-            ConsumerGroupTargetAssignmentMemberValue actualValue =
-                (ConsumerGroupTargetAssignmentMemberValue) 
actual.message().duplicate();
-
-            
Comparator<ConsumerGroupTargetAssignmentMemberValue.TopicPartition> comparator =
-                
Comparator.comparing(ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId);
-            expectedValue.topicPartitions().sort(comparator);
-            actualValue.topicPartitions().sort(comparator);
-
-            assertEquals(expectedValue, actualValue);
-        } else if (actual.message() instanceof 
ShareGroupPartitionMetadataValue) {
-            // The order of the racks stored in the PartitionMetadata of the 
ShareGroupPartitionMetadataValue
-            // is not always guaranteed. Therefore, we need a special 
comparator.
-            ShareGroupPartitionMetadataValue expectedValue =
-                (ShareGroupPartitionMetadataValue) 
expected.message().duplicate();
-            ShareGroupPartitionMetadataValue actualValue =
-                (ShareGroupPartitionMetadataValue) 
actual.message().duplicate();
-
-            List<ShareGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
-                expectedValue.topics();
-            List<ShareGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
-                actualValue.topics();
-
-            if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
-                fail("Topic metadata lists have different sizes");
-            }
+        };
 
-            
expectedTopicMetadataList.sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId));
-            
actualTopicMetadataList.sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId));
-
-            for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
-                ShareGroupPartitionMetadataValue.TopicMetadata 
expectedTopicMetadata =
-                    expectedTopicMetadataList.get(i);
-                ShareGroupPartitionMetadataValue.TopicMetadata 
actualTopicMetadata =
-                    actualTopicMetadataList.get(i);
-
-                assertEquals(expectedTopicMetadata.topicId(), 
actualTopicMetadata.topicId());
-                assertEquals(expectedTopicMetadata.topicName(), 
actualTopicMetadata.topicName());
-                assertEquals(expectedTopicMetadata.numPartitions(), 
actualTopicMetadata.numPartitions());
-
-                List<ShareGroupPartitionMetadataValue.PartitionMetadata> 
expectedPartitionMetadataList =
-                    expectedTopicMetadata.partitionMetadata();
-                List<ShareGroupPartitionMetadataValue.PartitionMetadata> 
actualPartitionMetadataList =
-                    actualTopicMetadata.partitionMetadata();
-
-                // If the list is empty, rack information wasn't available for 
any replica of
-                // the partition and hence, the entry wasn't added to the 
record.
-                if (expectedPartitionMetadataList.size() != 
actualPartitionMetadataList.size()) {
-                    fail("Partition metadata lists have different sizes");
-                } else if (!expectedPartitionMetadataList.isEmpty() && 
!actualPartitionMetadataList.isEmpty()) {
-                    for (int j = 0; j < expectedPartitionMetadataList.size(); 
j++) {
-                        ShareGroupPartitionMetadataValue.PartitionMetadata 
expectedPartitionMetadata =
-                            expectedPartitionMetadataList.get(j);
-                        ShareGroupPartitionMetadataValue.PartitionMetadata 
actualPartitionMetadata =
-                            actualPartitionMetadataList.get(j);
-
-                        assertEquals(expectedPartitionMetadata.partition(), 
actualPartitionMetadata.partition());
-                        
assertUnorderedListEquals(expectedPartitionMetadata.racks(), 
actualPartitionMetadata.racks());
-                    }
-                }
-            }
-        } else {
-            assertEquals(expected.message(), actual.message());
-        }
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
     }
 
-    private static Map<Uuid, Set<Integer>> fromTopicPartitions(
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
assignment
+    private static void assertConsumerGroupTargetAssignmentMemberValue(
+        ApiMessage exp,
+        ApiMessage act
     ) {
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions ->
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()))
-        );
-        return assignmentMap;
+        ConsumerGroupTargetAssignmentMemberValue expected = 
(ConsumerGroupTargetAssignmentMemberValue) exp.duplicate();
+        ConsumerGroupTargetAssignmentMemberValue actual = 
(ConsumerGroupTargetAssignmentMemberValue) act.duplicate();
+
+        Consumer<ConsumerGroupTargetAssignmentMemberValue> normalize = message 
-> {
+            
message.topicPartitions().sort(Comparator.comparing(ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId));
+            message.topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
     }
 
-    public static void assertSyncGroupResponseEquals(
-        SyncGroupResponseData expected,
-        SyncGroupResponseData actual
+    private static void assertSyncGroupResponse(
+        ApiMessage exp,
+        ApiMessage act
     ) {
-        SyncGroupResponseData expectedDuplicate = expected.duplicate();
-        SyncGroupResponseData actualDuplicate = actual.duplicate();
+        SyncGroupResponseData expected = (SyncGroupResponseData) 
exp.duplicate();
+        SyncGroupResponseData actual = (SyncGroupResponseData) act.duplicate();
 
-        Arrays.asList(expectedDuplicate, actualDuplicate).forEach(duplicate -> 
{
+        Consumer<SyncGroupResponseData> normalize = message -> {
             try {
                 ConsumerPartitionAssignor.Assignment assignment =
-                    
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(duplicate.assignment()));
+                    
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(message.assignment()));
                 assignment.partitions().sort(
                     
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
                 );
-                
duplicate.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+                
message.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
                     assignment,
-                    
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(duplicate.assignment()))
+                    
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(message.assignment()))
                 )));
             } catch (SchemaException ex) {
                 fail("Failed deserialization: " + ex.getMessage());
             }
-        });
-        assertEquals(expectedDuplicate, actualDuplicate);
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
     }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index f752fa82544..0fbecb8cbe8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -109,7 +109,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
-import static 
org.apache.kafka.coordinator.group.Assertions.assertSyncGroupResponseEquals;
+import static 
org.apache.kafka.coordinator.group.Assertions.assertResponseEquals;
 import static 
org.apache.kafka.coordinator.group.GroupConfigManagerTest.createConfigManager;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
@@ -1416,7 +1416,7 @@ public class GroupMetadataManagerTestContext {
 
         // Simulate a successful write to log.
         syncResult.appendFuture.complete(null);
-        assertSyncGroupResponseEquals(
+        assertResponseEquals(
             new SyncGroupResponseData()
                 .setProtocolType(protocolType)
                 .setProtocolName(protocolName)

Reply via email to