This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 754e924ab09 MINOR: Set unequal assignment epochs fallback to default
epoch for streams group (#21655)
754e924ab09 is described below
commit 754e924ab090aa6a288755eb826337fa2c242ce9
Author: Lucy Liu <[email protected]>
AuthorDate: Tue Mar 10 05:33:00 2026 -0400
MINOR: Set unequal assignment epochs fallback to default epoch for streams
group (#21655)
## Summary
This PR changes how a mismatch between the assignmentEpoch and
partition array lengths is handled: instead of throwing an
`IllegalStateException`, an error is logged and the code falls back to
the default member epoch.
Reviewers: Lucas Brutschy <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 2 +-
.../group/streams/StreamsGroupMember.java | 8 +++++++-
.../group/streams/TasksTupleWithEpochs.java | 23 ++++++++++++----------
.../group/streams/StreamsGroupMemberTest.java | 6 +++++-
.../group/streams/TasksTupleWithEpochsTest.java | 22 ++++++++++++++++-----
5 files changed, 43 insertions(+), 18 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a03e8900555..1967089ebd5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -5734,7 +5734,7 @@ public class GroupMetadataManager {
StreamsGroup streamsGroup =
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
StreamsGroupMember oldMember =
streamsGroup.getOrCreateUninitializedMember(memberId);
StreamsGroupMember newMember = new
StreamsGroupMember.Builder(oldMember)
- .updateWith(value)
+ .updateWith(log, groupId, value)
.build();
streamsGroup.updateMember(newMember);
} else {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 47e05e72688..887472132a6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -20,6 +20,8 @@ import
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.slf4j.Logger;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -249,12 +251,14 @@ public record StreamsGroupMember(String memberId,
return this;
}
- public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue
record) {
+ public Builder updateWith(Logger log, String groupId,
StreamsGroupCurrentMemberAssignmentValue record) {
setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state()));
setAssignedTasks(
TasksTupleWithEpochs.fromCurrentAssignmentRecord(
+ log,
+ groupId,
record.activeTasks(),
record.standbyTasks(),
record.warmupTasks(),
@@ -263,6 +267,8 @@ public record StreamsGroupMember(String memberId,
);
setTasksPendingRevocation(
TasksTupleWithEpochs.fromCurrentAssignmentRecord(
+ log,
+ groupId,
record.activeTasksPendingRevocation(),
record.standbyTasksPendingRevocation(),
record.warmupTasksPendingRevocation(),
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
index e5d726b0e64..99220d466bc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
@@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.streams;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import org.slf4j.Logger;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -102,13 +104,15 @@ public record TasksTupleWithEpochs(Map<String,
Map<Integer, Integer>> activeTask
* @return The TasksTupleWithEpochs
*/
public static TasksTupleWithEpochs fromCurrentAssignmentRecord(
+ Logger log,
+ String groupId,
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> activeTasks,
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> standbyTasks,
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> warmupTasks,
int memberEpoch
) {
return new TasksTupleWithEpochs(
- parseActiveTasksWithEpochs(activeTasks, memberEpoch),
+ parseActiveTasksWithEpochs(log, groupId, activeTasks, memberEpoch),
parseSimpleTasks(standbyTasks),
parseSimpleTasks(warmupTasks)
);
@@ -125,6 +129,8 @@ public record TasksTupleWithEpochs(Map<String, Map<Integer,
Integer>> activeTask
}
private static Map<String, Map<Integer, Integer>>
parseActiveTasksWithEpochs(
+ Logger log,
+ String groupId,
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIdsList,
int memberEpoch
) {
@@ -137,19 +143,16 @@ public record TasksTupleWithEpochs(Map<String,
Map<Integer, Integer>> activeTask
Map<Integer, Integer> partitionsWithEpochs = new HashMap<>();
- if (epochs != null && !epochs.isEmpty()) {
- if (epochs.size() != partitions.size()) {
- throw new IllegalStateException(
- "Assignment epochs must be provided for all
partitions. " +
- "Subtopology " + subtopologyId + " has " +
partitions.size() +
- " partitions but " + epochs.size() + " epochs"
- );
- }
-
+ if (epochs != null && epochs.size() == partitions.size()) {
for (int i = 0; i < partitions.size(); i++) {
partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
}
} else {
+ if (epochs != null) {
+ log.error("[GroupId {}] Size of assignment epochs {} is
not equal to partitions {} for subtopology {}. " +
+ "Using default epoch {} for all partitions.",
+ groupId, epochs.size(), partitions.size(),
subtopologyId, memberEpoch);
+ }
// Legacy record without epochs: use member epoch as default
for (Integer partition : partitions) {
partitionsWithEpochs.put(partition, memberEpoch);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 5fd33272857..d332ffb632a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -23,6 +23,8 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
@@ -46,6 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class StreamsGroupMemberTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamsGroupMemberTest.class);
+ private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "member-id";
private static final int MEMBER_EPOCH = 10;
private static final int PREVIOUS_MEMBER_EPOCH = 9;
@@ -226,7 +230,7 @@ public class StreamsGroupMemberTest {
.setWarmupTasksPendingRevocation(List.of(new
TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS6)));
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
- .updateWith(record)
+ .updateWith(LOG, GROUP_ID, record)
.build();
assertEquals(MEMBER_ID, member.memberId());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
index f92a23847fa..d31db67af7a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.kafka.coordinator.group.streams;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TasksTupleWithEpochsTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(TasksTupleWithEpochs.class);
+ private static final String GROUP_ID = "test-group";
private static final String SUBTOPOLOGY_1 = "1";
private static final String SUBTOPOLOGY_2 = "2";
private static final String SUBTOPOLOGY_3 = "3";
@@ -97,7 +102,7 @@ public class TasksTupleWithEpochsTest {
.setPartitions(Arrays.asList(7, 8, 9)));
TasksTupleWithEpochs tuple =
TasksTupleWithEpochs.fromCurrentAssignmentRecord(
- activeTasks, standbyTasks, warmupTasks, 100
+ LOG, GROUP_ID, activeTasks, standbyTasks, warmupTasks, 100
);
assertEquals(
@@ -133,7 +138,7 @@ public class TasksTupleWithEpochsTest {
int memberEpoch = 100;
TasksTupleWithEpochs tuple =
TasksTupleWithEpochs.fromCurrentAssignmentRecord(
- activeTasks, List.of(), List.of(), memberEpoch
+ LOG, GROUP_ID, activeTasks, List.of(), List.of(), memberEpoch
);
// Should use member epoch as default
@@ -152,9 +157,16 @@ public class TasksTupleWithEpochsTest {
.setPartitions(Arrays.asList(1, 2, 3))
.setAssignmentEpochs(Arrays.asList(10, 11))); // Only 2 epochs for
3 partitions
- assertThrows(IllegalStateException.class, () ->
- TasksTupleWithEpochs.fromCurrentAssignmentRecord(activeTasks,
List.of(), List.of(), 100)
- );
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(TasksTupleWithEpochs.class)) {
+ TasksTupleWithEpochs tuple =
TasksTupleWithEpochs.fromCurrentAssignmentRecord(LOG, GROUP_ID, activeTasks,
List.of(), List.of(), 100);
+ assertEquals(
+ Map.of(SUBTOPOLOGY_1, Map.of(1, 100, 2, 100, 3, 100)),
+ tuple.activeTasksWithEpochs()
+ );
+ assertEquals(1, appender.getMessages("ERROR").stream()
+ .filter(msg -> msg.contains("[GroupId " + GROUP_ID + "] Size
of assignment epochs 2 is not equal to partitions 3 for subtopology 1."))
+ .count());
+ }
}
@Test