This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit dc5b2ef7ecd259fca8dc729de25339fce3bd7e04
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Aug 16 19:58:35 2024 +0200

    Resolve conflict from 11/25 trunk rebase - Rebase on AK trunk 2024-08-15
---
 .../internals/AbstractMembershipManager.java       |  1 +
 .../consumer/internals/RequestManagers.java        |  4 ++-
 .../StreamsGroupHeartbeatRequestManager.java       | 18 ++++++-------
 .../StreamsGroupHeartbeatRequestManagerTest.java   |  4 +--
 .../events/ApplicationEventProcessorTest.java      |  2 --
 .../group/GroupCoordinatorRecordHelpers.java       |  4 +--
 .../coordinator/group/GroupMetadataManager.java    |  2 +-
 .../streams/CoordinatorStreamsRecordHelpers.java   | 31 ++++++++++++----------
 .../StreamsGroupCurrentMemberAssignmentKey.json    |  6 ++---
 .../message/StreamsGroupMemberMetadataKey.json     |  6 ++---
 .../common/message/StreamsGroupMetadataKey.json    |  4 +--
 .../message/StreamsGroupPartitionMetadataKey.json  |  4 +--
 .../StreamsGroupTargetAssignmentMemberKey.json     |  6 ++---
 .../StreamsGroupTargetAssignmentMetadataKey.json   |  4 +--
 .../common/message/StreamsGroupTopologyKey.json    |  4 +--
 .../group/GroupCoordinatorServiceTest.java         | 30 ++++++++++++++-------
 .../CoordinatorStreamsRecordHelpersTest.java       |  2 +-
 .../group/streams/StreamsGroupTest.java            |  6 ++---
 .../assignment/TaskAssignorConvergenceTest.java    |  4 +--
 19 files changed, 78 insertions(+), 64 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c6aa70d805e..049944ed09f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -1270,6 +1270,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     protected void updateMemberEpoch(int newEpoch) {
         boolean newEpochReceived = this.memberEpoch != newEpoch;
+        log.info("Updating member epoch to {}", newEpoch);
         this.memberEpoch = newEpoch;
         // Simply notify based on epoch changes only, since the member will 
generate a member ID
         // at startup, and it will remain unchanged for its entire lifetime.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 0f7e7cf1178..7cf4a3e67f6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -70,7 +70,7 @@ public class RequestManagers implements Closeable {
                            Optional<CoordinatorRequestManager> 
coordinatorRequestManager,
                            Optional<CommitRequestManager> commitRequestManager,
                            Optional<ConsumerHeartbeatRequestManager> 
heartbeatRequestManager,
-                           Optional<ConsumerMembershipManager> 
membershipManager) {
+                           Optional<ConsumerMembershipManager> 
membershipManager,
                            Optional<StreamsGroupHeartbeatRequestManager> 
streamsHeartbeatRequestManager,
                            Optional<StreamsInitializeRequestManager> 
streamsInitializeRequestManager) {
         this.log = logContext.logger(RequestManagers.class);
@@ -113,6 +113,8 @@ public class RequestManagers implements Closeable {
         this.shareHeartbeatRequestManager = shareHeartbeatRequestManager;
         this.consumerMembershipManager = Optional.empty();
         this.shareMembershipManager = shareMembershipManager;
+        this.streamsHeartbeatRequestManager = Optional.empty();
+        this.streamsInitializeRequestManager = Optional.empty();
         this.offsetsRequestManager = null;
         this.topicMetadataRequestManager = null;
         this.fetchRequestManager = null;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 7b3713b4131..2b3b7c8f248 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -61,7 +61,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     private final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState;
 
-    private final MembershipManager membershipManager;
+    private final ConsumerMembershipManager membershipManager;
 
     private final StreamsInitializeRequestManager 
streamsInitializeRequestManager;
 
@@ -83,7 +83,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         final ConsumerConfig config,
         final CoordinatorRequestManager coordinatorRequestManager,
         final StreamsInitializeRequestManager streamsInitializeRequestManager,
-        final MembershipManager membershipManager,
+        final ConsumerMembershipManager membershipManager,
         final BackgroundEventHandler backgroundEventHandler,
         final Metrics metrics,
         final StreamsAssignmentInterface streamsAssignmentInterface,
@@ -158,7 +158,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                                                                      final 
boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(ignoreResponse);
         heartbeatRequestState.onSendAttempt(currentTimeMs);
-        membershipManager.onHeartbeatRequestSent();
+        membershipManager.onHeartbeatRequestGenerated();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
         return request;
     }
@@ -335,7 +335,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
         this.heartbeatState.reset();
         this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
-        membershipManager.onHeartbeatFailure();
+        membershipManager.onHeartbeatFailure(false);
 
         switch (error) {
             case NOT_COORDINATOR:
@@ -503,7 +503,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     static class HeartbeatState {
 
-        private final MembershipManager membershipManager;
+        private final ConsumerMembershipManager membershipManager;
         private final int rebalanceTimeoutMs;
         private final 
StreamsGroupHeartbeatRequestManager.HeartbeatState.SentFields sentFields;
 
@@ -514,7 +514,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
         public HeartbeatState(
             final StreamsAssignmentInterface streamsInterface,
-            final MembershipManager membershipManager,
+            final ConsumerMembershipManager membershipManager,
             final int rebalanceTimeoutMs) {
             this.membershipManager = membershipManager;
             this.rebalanceTimeoutMs = rebalanceTimeoutMs;
@@ -552,9 +552,9 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             // Immutable -- only sent when joining
             if (joining) {
                 data.setProcessId(streamsInterface.processID().toString());
-//                data.setActiveTasks(Collections.emptyList());
-//                data.setStandbyTasks(Collections.emptyList());
-//                data.setWarmupTasks(Collections.emptyList());
+                data.setActiveTasks(Collections.emptyList());
+                data.setStandbyTasks(Collections.emptyList());
+                data.setWarmupTasks(Collections.emptyList());
                 streamsInterface.endpoint().ifPresent(streamsEndpoint -> {
                     data.setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint()
                         .setHost(streamsEndpoint.host)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f4c67e1d9b9..aed1b4dafd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -55,7 +55,6 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -74,6 +73,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     public static final int TEST_MEMBER_EPOCH = 5;
     public static final String TEST_INSTANCE_ID = "instanceId";
     public static final int TEST_THROTTLE_TIME_MS = 5;
+    private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
     private StreamsGroupHeartbeatRequestManager heartbeatRequestManager;
 
     private Time time;
@@ -89,7 +89,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     private StreamsInitializeRequestManager streamsInitializeRequestManager;
 
     @Mock
-    private MembershipManager membershipManager;
+    private ConsumerMembershipManager membershipManager;
 
     @Mock
     private BackgroundEventHandler backgroundEventHandler;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 28f938c2657..c56bef00c9f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -97,8 +97,6 @@ public class ApplicationEventProcessorTest {
                 withGroupId ? Optional.of(heartbeatRequestManager) : 
Optional.empty(),
                 withGroupId ? Optional.of(membershipManager) : 
Optional.empty(),
                 Optional.empty(),
-                Optional.empty(),
-                Optional.empty(),
                 Optional.empty()
         );
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 63d5523f137..7b7a2d701f8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -271,7 +271,7 @@ public class GroupCoordinatorRecordHelpers {
      * @param memberId The consumer group member id.
      * @return The record.
      */
-    public static CoordinatorRecord newTargetAssignmentTombstoneRecord(
+    public static CoordinatorRecord 
newConsumerGroupTargetAssignmentTombstoneRecord(
         String groupId,
         String memberId
     ) {
@@ -317,7 +317,7 @@ public class GroupCoordinatorRecordHelpers {
      * @param groupId The consumer group id.
      * @return The record.
      */
-    public static CoordinatorRecord 
newConsumerTargetAssignmentEpochTombstoneRecord(
+    public static CoordinatorRecord 
newConsumerGroupTargetAssignmentEpochTombstoneRecord(
         String groupId
     ) {
         return new CoordinatorRecord(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e934304b168..3e51e4b798a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3534,7 +3534,7 @@ public class GroupMetadataManager {
         if (!updatedMember.equals(member)) {
             records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
 
-            if (updatedMember.topologyId().equals(member.topologyId())) {
+            if (!updatedMember.topologyId().equals(member.topologyId())) {
                 log.info("[GroupId {}] Member {} updated its topology ID to: 
{}.",
                     groupId, memberId, updatedMember.topologyId());
                 return true;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index f5cac3026aa..2f48151e137 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -58,7 +58,7 @@ public class CoordinatorStreamsRecordHelpers {
                 new StreamsGroupMemberMetadataKey()
                     .setGroupId(groupId)
                     .setMemberId(member.memberId()),
-                (short) 14
+                (short) 17
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupMemberMetadataValue()
@@ -96,7 +96,7 @@ public class CoordinatorStreamsRecordHelpers {
                 new StreamsGroupMemberMetadataKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 14
+                (short) 17
             ),
             null // Tombstone.
         );
@@ -137,7 +137,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupPartitionMetadataKey()
                     .setGroupId(groupId),
-                (short) 13
+                (short) 16
             ),
             new ApiMessageAndVersion(
                 value,
@@ -159,7 +159,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupPartitionMetadataKey()
                     .setGroupId(groupId),
-                (short) 13
+                (short) 16
             ),
             null // Tombstone.
         );
@@ -173,7 +173,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataKey()
                     .setGroupId(groupId),
-                (short) 12
+                (short) 15
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataValue()
@@ -196,7 +196,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataKey()
                     .setGroupId(groupId),
-                (short) 12
+                (short) 15
             ),
             null // Tombstone.
         );
@@ -239,7 +239,7 @@ public class CoordinatorStreamsRecordHelpers {
                 new StreamsGroupTargetAssignmentMemberKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 16
+                (short) 19
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMemberValue()
@@ -267,7 +267,7 @@ public class CoordinatorStreamsRecordHelpers {
                 new StreamsGroupTargetAssignmentMemberKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 16
+                (short) 19
             ),
             null // Tombstone.
         );
@@ -282,7 +282,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataKey()
                     .setGroupId(groupId),
-                (short) 15
+                (short) 18
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataValue()
@@ -305,7 +305,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataKey()
                     .setGroupId(groupId),
-                (short) 15
+                (short) 18
             ),
             null // Tombstone.
         );
@@ -320,7 +320,7 @@ public class CoordinatorStreamsRecordHelpers {
                 new StreamsGroupCurrentMemberAssignmentKey()
                     .setGroupId(groupId)
                     .setMemberId(member.memberId()),
-                (short) 17
+                (short) 20
             ),
             new ApiMessageAndVersion(
                 new StreamsGroupCurrentMemberAssignmentValue()
@@ -352,7 +352,7 @@ public class CoordinatorStreamsRecordHelpers {
                 new StreamsGroupCurrentMemberAssignmentKey()
                     .setGroupId(groupId)
                     .setMemberId(memberId),
-                (short) 17
+                (short) 20
             ),
             null // Tombstone
         );
@@ -402,7 +402,10 @@ public class CoordinatorStreamsRecordHelpers {
                 
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
         });
 
-        return new CoordinatorRecord(new ApiMessageAndVersion(new 
StreamsGroupTopologyKey().setGroupId(groupId), (short) 18),
+        return new CoordinatorRecord(new ApiMessageAndVersion(
+            new StreamsGroupTopologyKey()
+                .setGroupId(groupId),
+            (short) 21),
             new ApiMessageAndVersion(value, (short) 0));
     }
 
@@ -419,7 +422,7 @@ public class CoordinatorStreamsRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupTopologyKey()
                     .setGroupId(groupId),
-                (short) 18
+                (short) 21
             ),
             null // Tombstone
         );
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
index 771f7324a9e..a99e32754c6 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json
@@ -17,12 +17,12 @@
 {
   "type": "data",
   "name": "StreamsGroupCurrentMemberAssignmentKey",
-  "validVersions": "17",
+  "validVersions": "20",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "17",
+    { "name": "GroupId", "type": "string", "versions": "20",
       "about": "The group id." },
-    { "name": "MemberId", "type": "string", "versions": "17",
+    { "name": "MemberId", "type": "string", "versions": "20",
       "about": "The member id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
index bac9ac247cf..8e0f66deba7 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json
@@ -17,12 +17,12 @@
 {
   "type": "data",
   "name": "StreamsGroupMemberMetadataKey",
-  "validVersions": "14",
+  "validVersions": "17",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "14",
+    { "name": "GroupId", "type": "string", "versions": "17",
       "about": "The group id." },
-    { "name": "MemberId", "type": "string", "versions": "14",
+    { "name": "MemberId", "type": "string", "versions": "17",
       "about": "The member id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
index 26121bf2ba2..6c54fb8bd14 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupMetadataKey",
-  "validVersions": "12",
+  "validVersions": "15",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "12",
+    { "name": "GroupId", "type": "string", "versions": "15",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
index 546a8f80535..0d91a992d0c 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupPartitionMetadataKey",
-  "validVersions": "13",
+  "validVersions": "16",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "13",
+    { "name": "GroupId", "type": "string", "versions": "16",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
index 4fc8231ec3d..f96f6f89e01 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json
@@ -17,12 +17,12 @@
 {
   "type": "data",
   "name": "StreamsGroupTargetAssignmentMemberKey",
-  "validVersions": "16",
+  "validVersions": "19",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "16",
+    { "name": "GroupId", "type": "string", "versions": "19",
       "about": "The group id." },
-    { "name": "MemberId", "type": "string", "versions": "16",
+    { "name": "MemberId", "type": "string", "versions": "19",
       "about": "The member id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
index 02b40f727c4..514885f8ad7 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupTargetAssignmentMetadataKey",
-  "validVersions": "15",
+  "validVersions": "18",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "15",
+    { "name": "GroupId", "type": "string", "versions": "18",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
index 261b755cd51..2e51c117bd7 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json
@@ -17,10 +17,10 @@
 {
   "type": "data",
   "name": "StreamsGroupTopologyKey",
-  "validVersions": "18",
+  "validVersions": "21",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "18",
+    { "name": "GroupId", "type": "string", "versions": "21",
       "about": "The group id." }
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 5b1aad09027..dfd898c225c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -273,7 +273,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
 
         StreamsGroupInitializeRequestData request = new 
StreamsGroupInitializeRequestData()
@@ -298,7 +299,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
 
         StreamsGroupInitializeRequestData request = new 
StreamsGroupInitializeRequestData()
@@ -350,7 +352,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
 
         StreamsGroupInitializeRequestData request = new 
StreamsGroupInitializeRequestData()
@@ -385,7 +388,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
 
         StreamsGroupHeartbeatRequestData request = new 
StreamsGroupHeartbeatRequestData()
@@ -410,7 +414,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
 
         StreamsGroupHeartbeatRequestData request = new 
StreamsGroupHeartbeatRequestData()
@@ -461,7 +466,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
 
         StreamsGroupHeartbeatRequestData request = new 
StreamsGroupHeartbeatRequestData()
@@ -1779,7 +1785,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
         int partitionCount = 2;
         service.startup(() -> partitionCount);
@@ -1821,7 +1828,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
         int partitionCount = 1;
         service.startup(() -> partitionCount);
@@ -1855,7 +1863,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
         int partitionCount = 1;
         service.startup(() -> partitionCount);
@@ -1887,7 +1896,8 @@ public class GroupCoordinatorServiceTest {
             new LogContext(),
             createConfig(),
             runtime,
-            new GroupCoordinatorMetrics()
+            new GroupCoordinatorMetrics(),
+            createConfigManager()
         );
         when(runtime.scheduleReadOperation(
             ArgumentMatchers.eq("streams-group-describe"),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index 44b46421205..cf1a003cd84 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -96,7 +96,7 @@ class CoordinatorStreamsRecordHelpersTest {
             new ApiMessageAndVersion(
                 new StreamsGroupTopologyKey()
                     .setGroupId("group-id"),
-                (short) 18),
+                (short) 21),
             new ApiMessageAndVersion(
                 new StreamsGroupTopologyValue()
                     .setTopology(expectedTopology),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 3da224f3a95..2fd5d461c99 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -649,7 +649,7 @@ public class StreamsGroupTest {
             group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
 
         // Create a member.
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         group.updateMember(new 
StreamsGroupMember.Builder("member-id").build());
 
         // The member does not exist at last committed offset 0.
@@ -716,13 +716,13 @@ public class StreamsGroupTest {
             new TopicPartition("__consumer_offsets", 0)
         );
         StreamsGroup group = new StreamsGroup(snapshotRegistry, "group-foo", 
metricsShard);
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
 
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot((1));
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertTrue(group.isInStates(Collections.singleton("stable"), 1));
         assertFalse(group.isInStates(Collections.singleton("empty"), 1));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 936c9eb6a85..19eb361a1d3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -471,8 +471,8 @@ public class TaskAssignorConvergenceTest {
 
     @ParameterizedTest
     @ValueSource(strings = {
-        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
-        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+//        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+//        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
         StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
     })
     public void randomClusterPerturbationsShouldConverge(final String 
rackAwareStrategy) {

Reply via email to