This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36f17cfee55 KAFKA-19765: Store the last used assignment configuration 
in the group metadata (#20702)
36f17cfee55 is described below

commit 36f17cfee5586497b067047eb3a8c5603a203271
Author: lucliu1108 <[email protected]>
AuthorDate: Fri Oct 17 06:53:30 2025 -0500

    KAFKA-19765: Store the last used assignment configuration in the group 
metadata (#20702)
    
    ## What
    Ticket: https://issues.apache.org/jira/browse/KAFKA-19765
    KIP-1071 defines configurations that affect the assignment and that
    can be changed dynamically.
    - The last used assignment configuration of a streams group is stored
    in the StreamsGroupMetadata record as a collection of key-value pairs.
    - If the current assignment configuration differs from the stored
    one, the group epoch is bumped to trigger a rebalance.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  33 ++++-
 .../streams/StreamsCoordinatorRecordHelpers.java   |  13 +-
 .../coordinator/group/streams/StreamsGroup.java    |  26 ++++
 .../common/message/StreamsGroupMetadataValue.json  |  12 +-
 .../group/GroupMetadataManagerTest.java            | 160 ++++++++++++++++++---
 .../StreamsCoordinatorRecordHelpersTest.java       |  22 ++-
 .../group/streams/StreamsGroupBuilder.java         |   8 +-
 7 files changed, 239 insertions(+), 35 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 95fc9bbb466..d05fe4c94eb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1974,11 +1974,20 @@ public class GroupMetadataManager {
             bumpGroupEpoch = true;
         }
 
+        // Check if assignment configurations have changed
+        Map<String, String> currentAssignmentConfigs = 
streamsGroupAssignmentConfigs(groupId);
+        Map<String, String> storedAssignmentConfigs = 
group.lastAssignmentConfigs();
+        if (!bumpGroupEpoch && 
!currentAssignmentConfigs.equals(storedAssignmentConfigs)) {
+            log.info("[GroupId {}][MemberId {}] Assignment configurations 
changed to {}. Triggering rebalance.",
+                groupId, memberId, currentAssignmentConfigs);
+            bumpGroupEpoch = true;
+        }
+
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch));
+            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId, 
groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
@@ -1996,7 +2005,8 @@ public class GroupMetadataManager {
                 updatedMember,
                 updatedConfiguredTopology,
                 metadataImage,
-                records
+                records,
+                currentAssignmentConfigs
             );
             targetAssignmentEpoch = groupEpoch;
         } else {
@@ -3950,10 +3960,10 @@ public class GroupMetadataManager {
         StreamsGroupMember updatedMember,
         ConfiguredTopology configuredTopology,
         CoordinatorMetadataImage metadataImage,
-        List<CoordinatorRecord> records
+        List<CoordinatorRecord> records,
+        Map<String, String> assignmentConfigs
     ) {
         TaskAssignor assignor = streamsGroupAssignor(group.groupId());
-        Map<String, String> assignmentConfigs = 
streamsGroupAssignmentConfigs(group.groupId());
         try {
             org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder 
assignmentResultBuilder =
                 new 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(
@@ -4298,7 +4308,7 @@ public class GroupMetadataManager {
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, 
group.metadataHash(), group.validatedTopologyEpoch()));
+        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, 
group.metadataHash(), group.validatedTopologyEpoch(), 
group.lastAssignmentConfigs()));
 
         cancelTimers(group.groupId(), member.memberId());
 
@@ -5419,6 +5429,19 @@ public class GroupMetadataManager {
             streamsGroup.setGroupEpoch(value.epoch());
             streamsGroup.setMetadataHash(value.metadataHash());
             
streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
+
+            if (value.lastAssignmentConfigs() != null) {
+                streamsGroup.setLastAssignmentConfigs(
+                    value.lastAssignmentConfigs().stream()
+                        .collect(Collectors.toMap(
+                            
StreamsGroupMetadataValue.LastAssignmentConfig::key,
+                            
StreamsGroupMetadataValue.LastAssignmentConfig::value
+                        ))
+                );
+            } else {
+                streamsGroup.setLastAssignmentConfigs(Map.of());
+            }
+
         } else {
             StreamsGroup streamsGroup;
             try {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index 4302d65b6c7..dc0f4dd7d2f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -100,9 +100,17 @@ public class StreamsCoordinatorRecordHelpers {
         String groupId,
         int newGroupEpoch,
         long metadataHash,
-        int validatedTopologyEpoch
+        int validatedTopologyEpoch,
+        Map<String, String> assignmentConfigs
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
+        Objects.requireNonNull(assignmentConfigs, "assignmentConfigs should 
not be null here");
+
+        List<StreamsGroupMetadataValue.LastAssignmentConfig> 
assignmentConfigList = assignmentConfigs.entrySet().stream()
+            .map(entry -> new StreamsGroupMetadataValue.LastAssignmentConfig()
+                .setKey(entry.getKey())
+                .setValue(entry.getValue()))
+            .toList();
 
         return CoordinatorRecord.record(
             new StreamsGroupMetadataKey()
@@ -111,7 +119,8 @@ public class StreamsCoordinatorRecordHelpers {
                 new StreamsGroupMetadataValue()
                     .setEpoch(newGroupEpoch)
                     .setMetadataHash(metadataHash)
-                    .setValidatedTopologyEpoch(validatedTopologyEpoch),
+                    .setValidatedTopologyEpoch(validatedTopologyEpoch)
+                    .setLastAssignmentConfigs(assignmentConfigList),
                 (short) 0
             )
         );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 0a4a739ec6d..ad43be02401 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -208,6 +208,12 @@ public class StreamsGroup implements Group {
      */
     private int endpointInformationEpoch = -1;
 
+    /**
+     * The last used assignment configurations for this streams group.
+     * This is used to determine when assignment configuration changes should 
trigger a rebalance.
+     */
+    private TimelineHashMap<String, String> lastAssignmentConfigs;
+
     public StreamsGroup(
         LogContext logContext,
         SnapshotRegistry snapshotRegistry,
@@ -229,6 +235,7 @@ public class StreamsGroup implements Group {
         this.currentWarmupTaskToProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
         this.topology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
         this.configuredTopology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+        this.lastAssignmentConfigs = new TimelineHashMap<>(snapshotRegistry, 
0);
     }
 
     /**
@@ -1091,4 +1098,23 @@ public class StreamsGroup implements Group {
     public void setEndpointInformationEpoch(int endpointInformationEpoch) {
         this.endpointInformationEpoch = endpointInformationEpoch;
     }
+
+    /**
+     * @return The assignment configurations for this streams group.
+     */
+    public Map<String, String> lastAssignmentConfigs() {
+        return Collections.unmodifiableMap(lastAssignmentConfigs);
+    }
+
+    /**
+     * Sets last assignment configurations.
+     *
+     * @param lastAssignmentConfigs The last assignment configurations to set.
+     */
+    public void setLastAssignmentConfigs(Map<String, String> 
lastAssignmentConfigs) {
+        this.lastAssignmentConfigs.clear();
+        if (lastAssignmentConfigs != null) {
+            this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
+        }
+    }
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
index b66f8181af9..d7bd377daa9 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
@@ -26,6 +26,16 @@
     { "name": "MetadataHash", "versions": "0+", "type": "int64",
       "about": "The hash of all topics in the group." },
     { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": 
"0+", "tag": 0, "default": -1, "type": "int32",
-      "about": "The topology epoch whose topics where validated to be present 
in a valid configuration in the metadata." }
+      "about": "The topology epoch whose topics are validated to be present in 
a valid configuration in the metadata." },
+    { "name": "LastAssignmentConfigs","taggedVersions": "0+", 
"nullableVersions": "0+","tag": 1, "default": null,
+      "type": "[]LastAssignmentConfig", "about": "The last used configuration 
parameters as key-value pairs." }
+  ],
+  "commonStructs": [
+    { "name": "LastAssignmentConfig", "versions": "0+", "fields": [
+      { "name": "Key", "type": "string", "versions": "0+",
+        "about": "Key of the config." },
+      { "name": "Value", "type": "string", "versions": "0+",
+        "about": "Value of the config." }
+    ]}
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 65e7e87e4fd..02fc987e101 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -113,6 +113,8 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
@@ -9739,7 +9741,7 @@ public class GroupMetadataManagerTest {
         StreamsGroupMember.Builder memberBuilder1 = 
streamsGroupMemberBuilderWithDefaults(memberId1);
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder1.build()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
 memberBuilder1.build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 1, 0, -1));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 1, 0, -1, Map.of()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId,
 topology));
 
         TasksTuple assignment = new TasksTuple(
@@ -9752,7 +9754,7 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder2.build()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
 memberId2, assignment));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
 memberBuilder2.build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 2, 0, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 2, 0, 0, Map.of()));
 
         List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), 
context.lastCommittedOffset);
         StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
@@ -16038,7 +16040,7 @@ public class GroupMetadataManagerTest {
 
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member));
 
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 100, 0, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 100, 0, 0, Map.of()));
 
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
 
@@ -16286,7 +16288,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
groupMetadataHash, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -16461,7 +16463,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), -1),
+            )), -1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16547,7 +16549,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), -1),
+            )), -1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16631,7 +16633,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage),
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
-            )), -1),
+            )), -1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16728,7 +16730,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage),
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
-            )), 1),
+            )), 1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16781,6 +16783,7 @@ public class GroupMetadataManagerTest {
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
                 .withMetadataHash(groupMetadataHash)
                 .withValidatedTopologyEpoch(0)
+                .withLastAssignmentConfigs(getDefaultAssignmentConfigs())
             )
             .build();
 
@@ -16870,6 +16873,7 @@ public class GroupMetadataManagerTest {
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
                 .withValidatedTopologyEpoch(0)
                 .withMetadataHash(metadataHash)
+                .withLastAssignmentConfigs(getDefaultAssignmentConfigs())
             )
             .build();
 
@@ -16894,7 +16898,7 @@ public class GroupMetadataManagerTest {
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId1),
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
metadataHash, 0)
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
metadataHash, 0, getDefaultAssignmentConfigs())
             ),
             result1.records()
         );
@@ -17013,7 +17017,7 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
groupMetadataHash, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -17122,7 +17126,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
                 barTopicName, computeTopicHash(barTopicName, newMetadataImage)
-            )), 0),
+            )), 0,  Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -17303,7 +17307,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId2),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId2),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId2),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0, 
-1)
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0, 
-1, Map.of())
         );
 
         assertRecordsEquals(expectedRecords, result.records());
@@ -17874,7 +17878,7 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberId1)
             .build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 11, groupMetadataHash, -1));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 11, groupMetadataHash, -1, Map.of()));
 
         assertEquals(StreamsGroupState.NOT_READY, 
context.streamsGroupState(groupId));
 
@@ -18038,7 +18042,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), 0),
+            )), 0, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -18160,7 +18164,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), 0),
+            )), 0, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -18309,7 +18313,7 @@ public class GroupMetadataManagerTest {
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId),
-                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
groupMetadataHash, 0)
+                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
                     )
                 )
             )),
@@ -18613,7 +18617,7 @@ public class GroupMetadataManagerTest {
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId1),
-                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, 
groupMetadataHash, 0)
+                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
                     )
                 )
             )),
@@ -18815,6 +18819,109 @@ public class GroupMetadataManagerTest {
         assertNull(result.response().data().partitionsByUserEndpoint());
     }
 
+    @Test
+    public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 0)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 
3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withValidatedTopologyEpoch(0)
+            )
+            .build();
+
+        // Change the assignment config
+        Properties newConfig = new Properties();
+        newConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+        context.groupConfigManager.updateGroupConfig(groupId, newConfig);
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+        );
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10)
+                .setActiveTasks(List.of(new 
StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(subtopology1)
+                    .setPartitions(List.of(0, 1, 2))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        // Find the StreamsGroupMetadata record
+        CoordinatorRecord metadataRecord = result.records().stream()
+            .filter(record -> record.key() instanceof StreamsGroupMetadataKey)
+            .findFirst()
+            .orElse(null);
+
+        assertNotNull(metadataRecord, "Expected a StreamsGroupMetadata 
record");
+        // Verify the metadata record contains the updated assignment config
+        StreamsGroupMetadataValue metadataValue = (StreamsGroupMetadataValue) 
metadataRecord.value().message();
+        assertEquals(11, metadataValue.epoch());
+
+        // Verify the assignment config contains the updated value
+        List<StreamsGroupMetadataValue.LastAssignmentConfig> assignmentConfigs 
= metadataValue.lastAssignmentConfigs();
+        assertFalse(assignmentConfigs.isEmpty(), "Expected assignment configs 
to be present");
+
+        StreamsGroupMetadataValue.LastAssignmentConfig standbyReplicasConfig = 
assignmentConfigs.stream()
+            .filter(config -> "num.standby.replicas".equals(config.key()))
+            .findFirst()
+            .orElse(null);
+
+        assertNotNull(standbyReplicasConfig, "Expected num.standby.replicas 
config to be present");
+        assertEquals("2", standbyReplicasConfig.value());
+
+        // Verify that group epoch was bumped
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        int newGroupEpoch = group.groupEpoch();
+        assertEquals(11, newGroupEpoch);
+        assertEquals("2", 
group.lastAssignmentConfigs().get("num.standby.replicas"));
+    }
+
     @Test
     public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() {
         String classicGroupId = "classic-group-id";
@@ -18901,7 +19008,7 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, 
expectedMember),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, 
topology),
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 
1, 0, -1),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 
1, 0, -1, Map.of("num.standby.replicas", "0")),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
 memberId, TasksTuple.EMPTY),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
 1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
@@ -19485,7 +19592,7 @@ public class GroupMetadataManagerTest {
 
         // The group still exists but the member is already gone. Replaying the
         // StreamsGroupMemberMetadata tombstone should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0, Map.of("num.standby.replicas", "0")));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
         assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
 
@@ -19541,7 +19648,7 @@ public class GroupMetadataManagerTest {
             .build();
 
         // The group is created if it does not exist.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0, Map.of("num.standby.replicas", "0")));
         assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
     }
 
@@ -19733,7 +19840,7 @@ public class GroupMetadataManagerTest {
 
         // The group still exists, but the member is already gone. Replaying 
the
         // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0, Map.of("num.standby.replicas", "0")));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
 "m1"));
         assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
 
@@ -19809,7 +19916,7 @@ public class GroupMetadataManagerTest {
 
         // The group still exists, but the member is already gone. Replaying 
the
         // StreamsGroupTopology tombstone should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0, Map.of("num.standby.replicas", "0")));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
         
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
 
@@ -24057,4 +24164,13 @@ public class GroupMetadataManagerTest {
         return responseTopics.stream()
             
.collect(Collectors.toMap(DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic::topicId,
 Function.identity()));
     }
+
+    /**
+     * Returns the default assignment configurations that would be used by the 
system.
+     * This matches what streamsGroupAssignmentConfigs() would return.
+     */
+    private Map<String, String> getDefaultAssignmentConfigs() {
+        // Use the same default value as 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT
+        return Map.of("num.standby.replicas", 
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT));
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 457bd55c602..c57f8cbd60d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -249,8 +249,19 @@ class StreamsCoordinatorRecordHelpersTest {
         assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID, 
MEMBER_ID));
     }
 
+    @Test
+    public void testNewStreamsGroupMetadataRecordWithNullAssignmentConfig() {
+        assertThrows(NullPointerException.class, () ->
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 
44, null));
+    }
+
     @Test
     public void testNewStreamsGroupMetadataRecord() {
+        List<StreamsGroupMetadataValue.LastAssignmentConfig> 
expectedAssignmentConfigs = List.of(
+            new StreamsGroupMetadataValue.LastAssignmentConfig()
+                .setKey("num.standby.replicas")
+                .setValue("2")
+        );
         CoordinatorRecord expectedRecord = CoordinatorRecord.record(
             new StreamsGroupMetadataKey()
                 .setGroupId(GROUP_ID),
@@ -258,12 +269,15 @@ class StreamsCoordinatorRecordHelpersTest {
                 new StreamsGroupMetadataValue()
                     .setEpoch(42)
                     .setMetadataHash(43)
-                    .setValidatedTopologyEpoch(44),
+                    .setValidatedTopologyEpoch(44)
+                    .setLastAssignmentConfigs(expectedAssignmentConfigs),
                 (short) 0
             )
         );
 
-        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 
44));
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 
44, Map.of(
+            "num.standby.replicas", "2"
+        )));
     }
 
     @Test
@@ -675,9 +689,9 @@ class StreamsCoordinatorRecordHelpersTest {
     }
 
     @Test
-    public void testNewStreamsGroupEpochRecordNullGroupId() {
+    public void testNewStreamsGroupMetadataRecordNullGroupId() {
         NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1));
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1, 
Map.of()));
         assertEquals("groupId should not be null here", 
exception.getMessage());
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index b7ffdd82def..aa2f75ec846 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -36,6 +36,7 @@ public class StreamsGroupBuilder {
     private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
     private long metadataHash = 0L;
     private int validatedTopologyEpoch = -1;
+    private final Map<String, String> lastAssignmentConfigs = new HashMap<>();
 
     public StreamsGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
@@ -74,6 +75,11 @@ public class StreamsGroupBuilder {
         return this;
     }
 
+    public StreamsGroupBuilder withLastAssignmentConfigs(Map<String, String> 
lastAssignmentConfigs) {
+        this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
+        return this;
+    }
+
     public List<CoordinatorRecord> build() {
         List<CoordinatorRecord> records = new ArrayList<>();
 
@@ -85,7 +91,7 @@ public class StreamsGroupBuilder {
 
         // Add group epoch record.
         records.add(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 
groupEpoch, metadataHash, validatedTopologyEpoch));
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 
groupEpoch, metadataHash, validatedTopologyEpoch, lastAssignmentConfigs));
 
         // Add target assignment records.
         targetAssignments.forEach((memberId, assignment) ->


Reply via email to