This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36f17cfee55 KAFKA-19765: Store the last used assignment configuration
in the group metadata (#20702)
36f17cfee55 is described below
commit 36f17cfee5586497b067047eb3a8c5603a203271
Author: lucliu1108 <[email protected]>
AuthorDate: Fri Oct 17 06:53:30 2025 -0500
KAFKA-19765: Store the last used assignment configuration in the group
metadata (#20702)
## What
Ticket: https://issues.apache.org/jira/browse/KAFKA-19765
In KIP-1071, some configurations affect the task assignment and can be
changed dynamically.
- The assignment configuration last used by a streams group is now stored
in the StreamsGroupMetadata record as a collection of key-value pairs.
- If the current assignment configuration differs from the stored one, the
group epoch is bumped, which triggers a rebalance.
Reviewers: Lucas Brutschy <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 33 ++++-
.../streams/StreamsCoordinatorRecordHelpers.java | 13 +-
.../coordinator/group/streams/StreamsGroup.java | 26 ++++
.../common/message/StreamsGroupMetadataValue.json | 12 +-
.../group/GroupMetadataManagerTest.java | 160 ++++++++++++++++++---
.../StreamsCoordinatorRecordHelpersTest.java | 22 ++-
.../group/streams/StreamsGroupBuilder.java | 8 +-
7 files changed, 239 insertions(+), 35 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 95fc9bbb466..d05fe4c94eb 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1974,11 +1974,20 @@ public class GroupMetadataManager {
bumpGroupEpoch = true;
}
+ // Check if assignment configurations have changed
+ Map<String, String> currentAssignmentConfigs =
streamsGroupAssignmentConfigs(groupId);
+ Map<String, String> storedAssignmentConfigs =
group.lastAssignmentConfigs();
+ if (!bumpGroupEpoch &&
!currentAssignmentConfigs.equals(storedAssignmentConfigs)) {
+ log.info("[GroupId {}][MemberId {}] Assignment configurations
changed to {}. Triggering rebalance.",
+ groupId, memberId, currentAssignmentConfigs);
+ bumpGroupEpoch = true;
+ }
+
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
- records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch));
+ records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId,
groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs +
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
@@ -1996,7 +2005,8 @@ public class GroupMetadataManager {
updatedMember,
updatedConfiguredTopology,
metadataImage,
- records
+ records,
+ currentAssignmentConfigs
);
targetAssignmentEpoch = groupEpoch;
} else {
@@ -3950,10 +3960,10 @@ public class GroupMetadataManager {
StreamsGroupMember updatedMember,
ConfiguredTopology configuredTopology,
CoordinatorMetadataImage metadataImage,
- List<CoordinatorRecord> records
+ List<CoordinatorRecord> records,
+ Map<String, String> assignmentConfigs
) {
TaskAssignor assignor = streamsGroupAssignor(group.groupId());
- Map<String, String> assignmentConfigs =
streamsGroupAssignmentConfigs(group.groupId());
try {
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder
assignmentResultBuilder =
new
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(
@@ -4298,7 +4308,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
- records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch,
group.metadataHash(), group.validatedTopologyEpoch()));
+ records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch,
group.metadataHash(), group.validatedTopologyEpoch(),
group.lastAssignmentConfigs()));
cancelTimers(group.groupId(), member.memberId());
@@ -5419,6 +5429,19 @@ public class GroupMetadataManager {
streamsGroup.setGroupEpoch(value.epoch());
streamsGroup.setMetadataHash(value.metadataHash());
streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
+
+ if (value.lastAssignmentConfigs() != null) {
+ streamsGroup.setLastAssignmentConfigs(
+ value.lastAssignmentConfigs().stream()
+ .collect(Collectors.toMap(
+
StreamsGroupMetadataValue.LastAssignmentConfig::key,
+
StreamsGroupMetadataValue.LastAssignmentConfig::value
+ ))
+ );
+ } else {
+ streamsGroup.setLastAssignmentConfigs(Map.of());
+ }
+
} else {
StreamsGroup streamsGroup;
try {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index 4302d65b6c7..dc0f4dd7d2f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -100,9 +100,17 @@ public class StreamsCoordinatorRecordHelpers {
String groupId,
int newGroupEpoch,
long metadataHash,
- int validatedTopologyEpoch
+ int validatedTopologyEpoch,
+ Map<String, String> assignmentConfigs
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
+ Objects.requireNonNull(assignmentConfigs, "assignmentConfigs should
not be null here");
+
+ List<StreamsGroupMetadataValue.LastAssignmentConfig>
assignmentConfigList = assignmentConfigs.entrySet().stream()
+ .map(entry -> new StreamsGroupMetadataValue.LastAssignmentConfig()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue()))
+ .toList();
return CoordinatorRecord.record(
new StreamsGroupMetadataKey()
@@ -111,7 +119,8 @@ public class StreamsCoordinatorRecordHelpers {
new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash)
- .setValidatedTopologyEpoch(validatedTopologyEpoch),
+ .setValidatedTopologyEpoch(validatedTopologyEpoch)
+ .setLastAssignmentConfigs(assignmentConfigList),
(short) 0
)
);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 0a4a739ec6d..ad43be02401 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -208,6 +208,12 @@ public class StreamsGroup implements Group {
*/
private int endpointInformationEpoch = -1;
+ /**
+ * The last used assignment configurations for this streams group.
+ * This is used to determine when assignment configuration changes should
trigger a rebalance.
+ */
+ private TimelineHashMap<String, String> lastAssignmentConfigs;
+
public StreamsGroup(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
@@ -229,6 +235,7 @@ public class StreamsGroup implements Group {
this.currentWarmupTaskToProcessIds = new
TimelineHashMap<>(snapshotRegistry, 0);
this.topology = new TimelineObject<>(snapshotRegistry,
Optional.empty());
this.configuredTopology = new TimelineObject<>(snapshotRegistry,
Optional.empty());
+ this.lastAssignmentConfigs = new TimelineHashMap<>(snapshotRegistry,
0);
}
/**
@@ -1091,4 +1098,23 @@ public class StreamsGroup implements Group {
public void setEndpointInformationEpoch(int endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+
+ /**
+ * @return An unmodifiable view of the last used assignment configurations for this streams group.
+ */
+ public Map<String, String> lastAssignmentConfigs() {
+ return Collections.unmodifiableMap(lastAssignmentConfigs);
+ }
+
+ /**
+ * Sets last assignment configurations.
+ *
+ * @param lastAssignmentConfigs The last assignment configurations to set.
+ */
+ public void setLastAssignmentConfigs(Map<String, String>
lastAssignmentConfigs) {
+ this.lastAssignmentConfigs.clear();
+ if (lastAssignmentConfigs != null) {
+ this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
+ }
+ }
}
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
index b66f8181af9..d7bd377daa9 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
@@ -26,6 +26,16 @@
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." },
{ "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions":
"0+", "tag": 0, "default": -1, "type": "int32",
- "about": "The topology epoch whose topics where validated to be present
in a valid configuration in the metadata." }
+ "about": "The topology epoch whose topics are validated to be present in
a valid configuration in the metadata." },
+ { "name": "LastAssignmentConfigs","taggedVersions": "0+",
"nullableVersions": "0+","tag": 1, "default": null,
+ "type": "[]LastAssignmentConfig", "about": "The last used configuration
parameters as key-value pairs." }
+ ],
+ "commonStructs": [
+ { "name": "LastAssignmentConfig", "versions": "0+", "fields": [
+ { "name": "Key", "type": "string", "versions": "0+",
+ "about": "Key of the config." },
+ { "name": "Value", "type": "string", "versions": "0+",
+ "about": "Value of the config." }
+ ]}
]
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 65e7e87e4fd..02fc987e101 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -113,6 +113,8 @@ import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
@@ -9739,7 +9741,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember.Builder memberBuilder1 =
streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
memberBuilder1.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 1, 0, -1));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 1, 0, -1, Map.of()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId,
topology));
TasksTuple assignment = new TasksTuple(
@@ -9752,7 +9754,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
memberId2, assignment));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
memberBuilder2.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 2, 0, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 2, 0, 0, Map.of()));
List<StreamsGroupDescribeResponseData.DescribedGroup> actual =
context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId),
context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new
StreamsGroupDescribeResponseData.DescribedGroup()
@@ -16038,7 +16040,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
member));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
100, 0, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
100, 0, 0, Map.of()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
@@ -16286,7 +16288,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
groupMetadataHash, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -16461,7 +16463,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), -1),
+ )), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16547,7 +16549,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), -1),
+ )), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16631,7 +16633,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
- )), -1),
+ )), -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16728,7 +16730,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
- )), 1),
+ )), 1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16781,6 +16783,7 @@ public class GroupMetadataManagerTest {
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withMetadataHash(groupMetadataHash)
.withValidatedTopologyEpoch(0)
+ .withLastAssignmentConfigs(getDefaultAssignmentConfigs())
)
.build();
@@ -16870,6 +16873,7 @@ public class GroupMetadataManagerTest {
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withValidatedTopologyEpoch(0)
.withMetadataHash(metadataHash)
+ .withLastAssignmentConfigs(getDefaultAssignmentConfigs())
)
.build();
@@ -16894,7 +16898,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId1),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
metadataHash, 0)
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
metadataHash, 0, getDefaultAssignmentConfigs())
),
result1.records()
);
@@ -17013,7 +17017,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
groupMetadataHash, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -17122,7 +17126,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
barTopicName, computeTopicHash(barTopicName, newMetadataImage)
- )), 0),
+ )), 0, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -17303,7 +17307,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId2),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0,
-1)
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0,
-1, Map.of())
);
assertRecordsEquals(expectedRecords, result.records());
@@ -17874,7 +17878,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
11, groupMetadataHash, -1));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
11, groupMetadataHash, -1, Map.of()));
assertEquals(StreamsGroupState.NOT_READY,
context.streamsGroupState(groupId));
@@ -18038,7 +18042,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), 0),
+ )), 0, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -18160,7 +18164,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), 0),
+ )), 0, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -18309,7 +18313,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
groupMetadataHash, 0)
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
)
)
)),
@@ -18613,7 +18617,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId1),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3,
groupMetadataHash, 0)
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
)
)
)),
@@ -18815,6 +18819,109 @@ public class GroupMetadataManagerTest {
assertNull(result.response().data().partitionsByUserEndpoint());
}
+ @Test
+ public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 0)
+ .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+ .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2,
3, 4, 5)))
+ .build())
+ .withTargetAssignment(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
+ .withTargetAssignmentEpoch(10)
+ .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+ .withValidatedTopologyEpoch(0)
+ )
+ .build();
+
+ // Change the assignment config
+ Properties newConfig = new Properties();
+ newConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+ context.groupConfigManager.updateGroupConfig(groupId, newConfig);
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+ );
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10)
+ .setActiveTasks(List.of(new
StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2))))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2))))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+ // Find the StreamsGroupMetadata record
+ CoordinatorRecord metadataRecord = result.records().stream()
+ .filter(record -> record.key() instanceof StreamsGroupMetadataKey)
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(metadataRecord, "Expected a StreamsGroupMetadata
record");
+ // Verify the metadata record contains the updated assignment config
+ StreamsGroupMetadataValue metadataValue = (StreamsGroupMetadataValue)
metadataRecord.value().message();
+ assertEquals(11, metadataValue.epoch());
+
+ // Verify the assignment config contains the updated value
+ List<StreamsGroupMetadataValue.LastAssignmentConfig> assignmentConfigs
= metadataValue.lastAssignmentConfigs();
+ assertFalse(assignmentConfigs.isEmpty(), "Expected assignment configs
to be present");
+
+ StreamsGroupMetadataValue.LastAssignmentConfig standbyReplicasConfig =
assignmentConfigs.stream()
+ .filter(config -> "num.standby.replicas".equals(config.key()))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(standbyReplicasConfig, "Expected num.standby.replicas
config to be present");
+ assertEquals("2", standbyReplicasConfig.value());
+
+ // Verify that group epoch was bumped
+ StreamsGroup group =
context.groupMetadataManager.streamsGroup(groupId);
+ int newGroupEpoch = group.groupEpoch();
+ assertEquals(11, newGroupEpoch);
+ assertEquals("2",
group.lastAssignmentConfigs().get("num.standby.replicas"));
+ }
+
@Test
public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() {
String classicGroupId = "classic-group-id";
@@ -18901,7 +19008,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId,
1, 0, -1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId,
1, 0, -1, Map.of("num.standby.replicas", "0")),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
@@ -19485,7 +19592,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the
// StreamsGroupMemberMetadata tombstone should be a no-op.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0, Map.of("num.standby.replicas", "0")));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@@ -19541,7 +19648,7 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0, Map.of("num.standby.replicas", "0")));
assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
}
@@ -19733,7 +19840,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying
the
// StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0, Map.of("num.standby.replicas", "0")));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
"m1"));
assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@@ -19809,7 +19916,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying
the
// StreamsGroupTopology tombstone should be a no-op.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
10, 0, 0, Map.of("num.standby.replicas", "0")));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
@@ -24057,4 +24164,13 @@ public class GroupMetadataManagerTest {
return responseTopics.stream()
.collect(Collectors.toMap(DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic::topicId,
Function.identity()));
}
+
+ /**
+ * Returns the default assignment configurations that would be used by the
system.
+ * This matches what streamsGroupAssignmentConfigs() would return.
+ */
+ private Map<String, String> getDefaultAssignmentConfigs() {
+ // Use the same default value as
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT
+ return Map.of("num.standby.replicas",
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT));
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 457bd55c602..c57f8cbd60d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -249,8 +249,19 @@ class StreamsCoordinatorRecordHelpersTest {
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID,
MEMBER_ID));
}
+ @Test
+ public void testNewStreamsGroupMetadataRecordWithNullAssignmentConfig() {
+ assertThrows(NullPointerException.class, () ->
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43,
44, null));
+ }
+
@Test
public void testNewStreamsGroupMetadataRecord() {
+ List<StreamsGroupMetadataValue.LastAssignmentConfig>
expectedAssignmentConfigs = List.of(
+ new StreamsGroupMetadataValue.LastAssignmentConfig()
+ .setKey("num.standby.replicas")
+ .setValue("2")
+ );
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMetadataKey()
.setGroupId(GROUP_ID),
@@ -258,12 +269,15 @@ class StreamsCoordinatorRecordHelpersTest {
new StreamsGroupMetadataValue()
.setEpoch(42)
.setMetadataHash(43)
- .setValidatedTopologyEpoch(44),
+ .setValidatedTopologyEpoch(44)
+ .setLastAssignmentConfigs(expectedAssignmentConfigs),
(short) 0
)
);
- assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43,
44));
+ assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43,
44, Map.of(
+ "num.standby.replicas", "2"
+ )));
}
@Test
@@ -675,9 +689,9 @@ class StreamsCoordinatorRecordHelpersTest {
}
@Test
- public void testNewStreamsGroupEpochRecordNullGroupId() {
+ public void testNewStreamsGroupMetadataRecordNullGroupId() {
NullPointerException exception =
assertThrows(NullPointerException.class, () ->
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1));
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1,
Map.of()));
assertEquals("groupId should not be null here",
exception.getMessage());
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index b7ffdd82def..aa2f75ec846 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -36,6 +36,7 @@ public class StreamsGroupBuilder {
private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
private long metadataHash = 0L;
private int validatedTopologyEpoch = -1;
+ private final Map<String, String> lastAssignmentConfigs = new HashMap<>();
public StreamsGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@@ -74,6 +75,11 @@ public class StreamsGroupBuilder {
return this;
}
+ public StreamsGroupBuilder withLastAssignmentConfigs(Map<String, String>
lastAssignmentConfigs) {
+ this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
+ return this;
+ }
+
public List<CoordinatorRecord> build() {
List<CoordinatorRecord> records = new ArrayList<>();
@@ -85,7 +91,7 @@ public class StreamsGroupBuilder {
// Add group epoch record.
records.add(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
groupEpoch, metadataHash, validatedTopologyEpoch));
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
groupEpoch, metadataHash, validatedTopologyEpoch, lastAssignmentConfigs));
// Add target assignment records.
targetAssignments.forEach((memberId, assignment) ->