mumrah commented on code in PR #14122:
URL: https://github.com/apache/kafka/pull/14122#discussion_r1298845661
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9119,4 +8742,257 @@ public void
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
HeartbeatResponseData heartbeatResponse =
context.sendGenericGroupHeartbeat(heartbeatRequest);
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(),
heartbeatResponse.errorCode());
}
+
+ private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, Record>> timeouts) {
+ assertTrue(timeouts.size() <= 1);
+ timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result));
+ }
+
+ private List<String> verifyGenericGroupJoinResponses(
+ List<JoinResult> joinResults,
+ int expectedSuccessCount,
+ Errors expectedFailure
+ ) {
+ int successCount = 0;
+ List<String> memberIds = new ArrayList<>();
+ for (JoinResult joinResult : joinResults) {
+ if (!joinResult.joinFuture.isDone()) {
+ fail("All responseFutures should be completed.");
+ }
+ try {
+ JoinGroupResponseData joinResponse = joinResult.joinFuture.get();
+ if (joinResponse.errorCode() == Errors.NONE.code()) {
+ successCount++;
+ } else {
+ assertEquals(expectedFailure.code(), joinResponse.errorCode());
+ }
+ memberIds.add(joinResponse.memberId());
+ } catch (Exception e) {
+ fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ assertEquals(expectedSuccessCount, successCount);
+ return memberIds;
+ }
+
+ private static JoinGroupRequestProtocolCollection toProtocols(String... protocolNames) {
+ JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0);
+ List<String> topicNames = Arrays.asList("foo", "bar", "baz");
+ for (int i = 0; i < protocolNames.length; i++) {
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName(protocolNames[i])
+ .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList(topicNames.get(i % topicNames.size())))).array())
+ );
+ }
+ return protocols;
+ }
+
+ private static Record newGroupMetadataRecord(
+ String groupId,
+ GroupMetadataValue value,
+ MetadataVersion metadataVersion
+ ) {
+ return new Record(
+ new ApiMessageAndVersion(
+ new GroupMetadataKey()
+ .setGroup(groupId),
+ (short) 2
+ ),
+ new ApiMessageAndVersion(
+ value,
+ metadataVersion.groupMetadataValueVersion()
+ )
+ );
+ }
+
+ private static Record newGroupMetadataRecordWithCurrentState(
+ GenericGroup group,
+ MetadataVersion metadataVersion
+ ) {
+ return RecordHelpers.newGroupMetadataRecord(group, group.groupAssignment(), metadataVersion);
+ }
+
+ private static class JoinGroupRequestBuilder {
Review Comment:
I realize it's not really in scope here (since these Builder classes already
existed), but it's worth noting that the ApiMessage classes already support a
fluent-style API. So, we could probably do without these builders.
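
For illustration, here is a minimal sketch of the fluent style I mean. It is
self-contained, so `JoinRequest` below is a hypothetical stand-in for a
generated message class (it is not the real `JoinGroupRequestData`), and the
field set and default values are assumptions chosen only for the example. The
point is that each setter returns `this`, so a test helper can hand back a
pre-populated message and the test can override individual fields by chaining:

```java
// Sketch only: JoinRequest is a hypothetical stand-in for a generated
// message class; it mimics the "setter returns this" convention.
public class FluentMessageSketch {

    static final class JoinRequest {
        private String groupId = "";
        private String memberId = "";
        private int sessionTimeoutMs = 0;

        // Each setter returns this, which is what makes chaining possible.
        JoinRequest setGroupId(String v) { this.groupId = v; return this; }
        JoinRequest setMemberId(String v) { this.memberId = v; return this; }
        JoinRequest setSessionTimeoutMs(int v) { this.sessionTimeoutMs = v; return this; }

        String groupId() { return groupId; }
        String memberId() { return memberId; }
        int sessionTimeoutMs() { return sessionTimeoutMs; }
    }

    // Hypothetical test helper: returns a message with assumed defaults,
    // replacing what a dedicated Builder class would otherwise provide.
    static JoinRequest defaultJoinRequest() {
        return new JoinRequest()
            .setGroupId("group-id")
            .setMemberId("")
            .setSessionTimeoutMs(5000);
    }

    public static void main(String[] args) {
        // A test overrides only the fields it cares about.
        JoinRequest request = defaultJoinRequest().setMemberId("member-1");
        System.out.println(request.groupId() + " "
            + request.memberId() + " "
            + request.sessionTimeoutMs());
    }
}
```

With a helper like `defaultJoinRequest()`, a separate builder mostly duplicates
what the message class already offers.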
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9119,4 +8742,257 @@ public void
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
HeartbeatResponseData heartbeatResponse =
context.sendGenericGroupHeartbeat(heartbeatRequest);
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(),
heartbeatResponse.errorCode());
}
+
Review Comment:
Is it possible to keep these helper functions and classes in their original
locations? As it stands, it's hard to see whether anything actually changed. If
nothing did change, we should leave them where they are to minimize the diff
(unless there's a good reason to move them).