This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4cb5f1ca124 KAFKA-19661: Streams groups sometimes describe as
NOT_READY when STABLE (#20600)
4cb5f1ca124 is described below
commit 4cb5f1ca124b810b32fcef1ca23f00faf70603b6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 14 17:34:30 2025 +0200
KAFKA-19661: Streams groups sometimes describe as NOT_READY when STABLE
(#20600)
Streams groups are sometimes described as NOT_READY when they are
actually STABLE. That is, the group is configured and all topics exist,
but LIST_GROUP and STREAMS_GROUP_DESCRIBE still report the group as not
ready.
The root cause seems to be that
https://github.com/apache/kafka/pull/19802 moved the creation of the
soft-state configured topology from the replay path to the heartbeat.
As a result, LIST_GROUP and STREAMS_GROUP_DESCRIBE may not show the
configured topology, because the configured topology that is created in
the heartbeat is "thrown away", and the new group is recreated on the
replay path.
To reflect a consistent view of the topology via LIST_GROUP and
STREAMS_GROUP_DESCRIBE, we need to store additional information in the
consumer offset topic. In particular, we need to store at least whether
a topology was validated against the current topic metadata, as this
determines whether a group is STABLE rather than NOT_READY.
This change adds a new field `validatedTopologyEpoch` to the metadata of
the group, which stores precisely this information.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/api/IntegrationTestHarness.scala | 9 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 77 ++++++--
.../coordinator/group/GroupMetadataManager.java | 16 +-
.../streams/StreamsCoordinatorRecordHelpers.java | 8 +-
.../coordinator/group/streams/StreamsGroup.java | 26 ++-
.../common/message/StreamsGroupMetadataValue.json | 4 +-
.../group/GroupMetadataManagerTest.java | 202 ++++++++++++++++-----
.../StreamsCoordinatorRecordHelpersTest.java | 9 +-
.../group/streams/StreamsGroupBuilder.java | 8 +-
.../group/streams/StreamsGroupTest.java | 4 +
10 files changed, 280 insertions(+), 83 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 303e989e9b4..eb14858a944 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
- inputTopic: String,
+ inputTopics: Set[String],
+ changelogTopics: Set[String] = Set(),
streamsGroupId: String): AsyncKafkaConsumer[K,
V] = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
@@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
- util.Set.of(inputTopic),
+ inputTopics.asJava,
util.Set.of(),
util.Map.of(),
- util.Map.of(inputTopic + "-store-changelog", new
StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(),
util.Map.of())),
+ changelogTopics.map(c => (c, new
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(),
util.Map.of()))).toMap.asJava,
util.Set.of()
)),
Map.empty[String, String].asJava
@@ -270,7 +271,7 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
configOverrides = props,
streamsRebalanceData = streamsRebalanceData
)
- consumer.subscribe(util.Set.of(inputTopic),
+ consumer.subscribe(inputTopics.asJava,
new StreamsRebalanceListener {
override def onTasksRevoked(tasks:
util.Set[StreamsRebalanceData.TaskId]): Unit = ()
override def onTasksAssigned(assignment:
StreamsRebalanceData.Assignment): Unit = ()
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c8e26445922..d48ddaf58ad 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2318,7 +2318,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
-
/**
* Test the consumer group APIs for member removal.
*/
@@ -2587,7 +2586,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
val streamsGroup = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId
)
@@ -4412,7 +4412,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
prepareRecords(testTopicName)
val streams = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))
@@ -4422,7 +4423,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
- }, "Stream group not stable yet")
+ }, "Streams group did not transition to STABLE before timeout")
// Verify the describe call works correctly
val describedGroups =
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
@@ -4458,7 +4459,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client = Admin.create(config)
val streams = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))
@@ -4468,7 +4470,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.NOT_READY &&
firstGroup.groupId() == streamsGroupId
- }, "Stream group not NOT_READY yet")
+ }, "Streams group did not transition to NOT_READY before timeout")
// Verify the describe call works correctly
val describedGroups =
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
@@ -4490,6 +4492,55 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ @Test
+ def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streams = createStreamsGroup(
+ inputTopics = Set(testTopicName),
+ streamsGroupId = streamsGroupId
+ )
+ streams.poll(JDuration.ofMillis(500L))
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ }, "Streams group did not transition to STABLE before timeout")
+
+ // Verify the describe call works correctly
+ val describedGroups =
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+ val group = describedGroups.get(streamsGroupId)
+ assertNotNull(group)
+ assertEquals(streamsGroupId, group.groupId())
+ assertFalse(group.members().isEmpty)
+ assertNotNull(group.subtopologies())
+ assertFalse(group.subtopologies().isEmpty)
+
+ // Verify the topology contains the expected source and sink topics
+ val subtopologies = group.subtopologies().asScala
+ assertTrue(subtopologies.exists(subtopology =>
+ subtopology.sourceTopics().contains(testTopicName)))
+
+ // Test describing a non-existing group
+ val nonExistingGroup = "non_existing_stream_group"
+ val describedNonExistingGroupResponse =
client.describeStreamsGroups(util.List.of(nonExistingGroup))
+ assertFutureThrows(classOf[GroupIdNotFoundException],
describedNonExistingGroupResponse.all())
+
+ } finally {
+ Utils.closeQuietly(streams, "streams")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
@Test
def testDeleteStreamsGroups(): Unit = {
val testTopicName = "test_topic"
@@ -4512,7 +4563,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val streamsGroupId = s"stream_group_id_$i"
val streams = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
streams.poll(JDuration.ofMillis(500L))
@@ -4595,7 +4647,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
val streams = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
@@ -4611,7 +4664,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
- }, "Stream group not stable yet")
+ }, "Streams group did not transition to STABLE before timeout")
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
@@ -4655,7 +4708,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
val streams = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
@@ -4732,7 +4786,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
val streams = createStreamsGroup(
- inputTopic = testTopicName,
+ inputTopics = Set(testTopicName),
+ changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index be9c4f9abc3..95fc9bbb466 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -250,9 +250,9 @@ import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
-import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
+import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -1960,19 +1960,26 @@ public class GroupMetadataManager {
updatedConfiguredTopology = group.configuredTopology().get();
}
+ // 3b. If the topology is validated, persist the fact that it is
validated.
+ int validatedTopologyEpoch = -1;
if (updatedConfiguredTopology.isReady()) {
+ validatedTopologyEpoch = updatedTopology.topologyEpoch();
SortedMap<String, ConfiguredSubtopology> subtopologySortedMap =
updatedConfiguredTopology.subtopologies().get();
throwIfRequestContainsInvalidTasks(subtopologySortedMap,
ownedActiveTasks);
throwIfRequestContainsInvalidTasks(subtopologySortedMap,
ownedStandbyTasks);
throwIfRequestContainsInvalidTasks(subtopologySortedMap,
ownedWarmupTasks);
}
+ // We validated a topology that was not validated before, so bump the
group epoch as we may have to reassign tasks.
+ if (validatedTopologyEpoch != group.validatedTopologyEpoch()) {
+ bumpGroupEpoch = true;
+ }
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
- records.add(newStreamsGroupEpochRecord(groupId, groupEpoch,
metadataHash));
- log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
+ records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch));
+ log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId,
groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs +
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
@@ -4291,7 +4298,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
- records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch,
0));
+ records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch,
group.metadataHash(), group.validatedTopologyEpoch()));
cancelTimers(group.groupId(), member.memberId());
@@ -5411,6 +5418,7 @@ public class GroupMetadataManager {
StreamsGroup streamsGroup =
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
streamsGroup.setGroupEpoch(value.epoch());
streamsGroup.setMetadataHash(value.metadataHash());
+
streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
} else {
StreamsGroup streamsGroup;
try {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index d54f7273eb0..4302d65b6c7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -96,10 +96,11 @@ public class StreamsCoordinatorRecordHelpers {
);
}
- public static CoordinatorRecord newStreamsGroupEpochRecord(
+ public static CoordinatorRecord newStreamsGroupMetadataRecord(
String groupId,
int newGroupEpoch,
- long metadataHash
+ long metadataHash,
+ int validatedTopologyEpoch
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
@@ -109,7 +110,8 @@ public class StreamsCoordinatorRecordHelpers {
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch)
- .setMetadataHash(metadataHash),
+ .setMetadataHash(metadataHash)
+ .setValidatedTopologyEpoch(validatedTopologyEpoch),
(short) 0
)
);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 7ec3596628e..c0a4fb1f13b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -147,9 +147,9 @@ public class StreamsGroup implements Group {
private final TimelineHashMap<String, String> staticMembers;
/**
- * The metadata associated with each subscribed topic name.
+ * The topology epoch for which the subscribed topics identified by
metadataHash are validated.
*/
- private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+ private final TimelineInteger validatedTopologyEpoch;
/**
* The metadata hash which is computed based on the all subscribed topics.
@@ -222,7 +222,7 @@ public class StreamsGroup implements Group {
this.groupEpoch = new TimelineInteger(snapshotRegistry);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
- this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
this.metadataHash = new TimelineLong(snapshotRegistry);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -282,7 +282,6 @@ public class StreamsGroup implements Group {
public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
this.configuredTopology.set(Optional.ofNullable(configuredTopology));
- maybeUpdateGroupState();
}
/**
@@ -598,6 +597,23 @@ public class StreamsGroup implements Group {
this.metadataHash.set(metadataHash);
}
+ /**
+ * @return The validated topology epoch.
+ */
+ public int validatedTopologyEpoch() {
+ return validatedTopologyEpoch.get();
+ }
+
+ /**
+ * Updates the validated topology epoch.
+ *
+ * @param validatedTopologyEpoch The validated topology epoch
+ */
+ public void setValidatedTopologyEpoch(int validatedTopologyEpoch) {
+ this.validatedTopologyEpoch.set(validatedTopologyEpoch);
+ maybeUpdateGroupState();
+ }
+
/**
* Computes the metadata hash based on the current topology and the
current metadata image.
*
@@ -835,7 +851,7 @@ public class StreamsGroup implements Group {
if (members.isEmpty()) {
newState = EMPTY;
clearShutdownRequestMemberId();
- } else if (topology().isEmpty() || configuredTopology().isEmpty() ||
!configuredTopology().get().isReady()) {
+ } else if (topology().filter(t -> t.topologyEpoch() ==
validatedTopologyEpoch.get()).isEmpty()) {
newState = NOT_READY;
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
newState = ASSIGNING;
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
index 0d06b0c7f49..b66f8181af9 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
@@ -24,6 +24,8 @@
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
- "about": "The hash of all topics in the group." }
+ "about": "The hash of all topics in the group." },
+ { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions":
"0+", "tag": 0, "default": -1, "type": "int32",
+ "about": "The topology epoch whose topics where validated to be present
in a valid configuration in the metadata." }
]
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6521c48532c..65e7e87e4fd 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -139,7 +139,6 @@ import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRol
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
-import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -4434,23 +4433,27 @@ public class GroupMetadataManagerTest {
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(1), 10) //
Stable group
.withTargetAssignmentEpoch(10)
.withTopology(new StreamsTopology(1, Map.of()))
+ .withValidatedTopologyEpoch(1)
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(0))
.setMemberEpoch(10)
.build()))
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(2), 10) //
Assigning group
.withTargetAssignmentEpoch(9)
.withTopology(new StreamsTopology(1, Map.of()))
+ .withValidatedTopologyEpoch(1)
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(1))
.setMemberEpoch(9)
.build()))
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(3), 10) //
Reconciling group
.withTargetAssignmentEpoch(10)
.withTopology(new StreamsTopology(1, Map.of()))
+ .withValidatedTopologyEpoch(1)
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(2))
.setMemberEpoch(9)
.build()))
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(4), 10) //
NotReady group
.withTargetAssignmentEpoch(10)
+ .withTopology(new StreamsTopology(1, Map.of()))
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(3))
.build()))
.build();
@@ -9736,7 +9739,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember.Builder memberBuilder1 =
streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
memberBuilder1.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
epoch + 1, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 1, 0, -1));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId,
topology));
TasksTuple assignment = new TasksTuple(
@@ -9749,7 +9752,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
memberId2, assignment));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
memberBuilder2.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
epoch + 2, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 2, 0, 0));
List<StreamsGroupDescribeResponseData.DescribedGroup> actual =
context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId),
context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new
StreamsGroupDescribeResponseData.DescribedGroup()
@@ -9780,7 +9783,7 @@ public class GroupMetadataManagerTest {
)
)
)
- .setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
+ .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
.setGroupEpoch(epoch + 2);
assertEquals(1, actual.size());
assertEquals(describedGroup, actual.get(0));
@@ -16035,7 +16038,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
member));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
100, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
100, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
@@ -16283,7 +16286,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
groupMetadataHash),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
groupMetadataHash, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -16296,6 +16299,98 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testJoinEmptyStreamsGroupAndDescribe() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+ )));
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.groupMetadataManager.streamsGroup(groupId));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setProcessId("process-id")
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4,
5)))
+ .build();
+
+ // Commit the offset, so that the latest state will be described below
+ context.commit();
+
+ List<StreamsGroupDescribeResponseData.DescribedGroup>
actualDescribedGroups =
context.groupMetadataManager.streamsGroupDescribe(List.of(groupId),
context.lastCommittedOffset);
+ StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup
= new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setAssignmentEpoch(1)
+ .setTopology(
+ new StreamsGroupDescribeResponseData.Topology()
+ .setEpoch(0)
+ .setSubtopologies(List.of(
+ new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId(subtopology1)
+ .setSourceTopics(List.of(fooTopicName))
+ ))
+ )
+ .setMembers(Collections.singletonList(
+
expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
+ ))
+ .setGroupState(StreamsGroupState.STABLE.toString())
+ .setGroupEpoch(1);
+ assertEquals(1, actualDescribedGroups.size());
+ assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0));
+ }
+
@Test
public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
String groupId = "fooup";
@@ -16364,9 +16459,9 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- ))),
+ )), -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16450,14 +16545,15 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- ))),
+ )), -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
+ assertEquals(StreamsGroupState.NOT_READY,
context.streamsGroupState(groupId));
assertRecordsEquals(expectedRecords, result.records());
}
@@ -16532,15 +16628,16 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
- ))),
+ )), -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
+ assertEquals(StreamsGroupState.NOT_READY,
context.streamsGroupState(groupId));
assertRecordsEquals(expectedRecords, result.records());
}
@@ -16577,6 +16674,7 @@ public class GroupMetadataManagerTest {
.withStreamsGroup(
new StreamsGroupBuilder(groupId, 10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
+ .withValidatedTopologyEpoch(1)
)
.build();
@@ -16627,10 +16725,10 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
- ))),
+ )), 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16682,6 +16780,7 @@ public class GroupMetadataManagerTest {
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withMetadataHash(groupMetadataHash)
+ .withValidatedTopologyEpoch(0)
)
.build();
@@ -16751,6 +16850,8 @@ public class GroupMetadataManagerTest {
.buildCoordinatorMetadataImage();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ long metadataHash = computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage)));
+
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
@@ -16767,7 +16868,8 @@ public class GroupMetadataManagerTest {
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withMetadataHash(computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage))))
+ .withValidatedTopologyEpoch(0)
+ .withMetadataHash(metadataHash)
)
.build();
@@ -16792,17 +16894,12 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId1),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, metadataHash, 0)
),
result1.records()
);
- for (CoordinatorRecord record : result1.records()) {
- context.replay(record);
- }
- assignor.prepareGroupAssignment(
- Map.of(memberId1, TasksTuple.EMPTY)
- );
+ assignor.prepareGroupAssignment(Map.of());
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result2 = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
@@ -16814,7 +16911,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2)
- .setMemberEpoch(12)
+ .setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of(
new StreamsGroupHeartbeatResponseData.Status()
@@ -16916,7 +17013,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, groupMetadataHash, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -17022,10 +17119,10 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
barTopicName, computeTopicHash(barTopicName, newMetadataImage)
- ))),
+ )), 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -17206,7 +17303,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId2),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0, -1)
);
assertRecordsEquals(expectedRecords, result.records());
@@ -17334,6 +17431,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
.withMetadataHash(groupMetadataHash)
+ .withValidatedTopologyEpoch(0)
)
.build();
@@ -17776,16 +17874,12 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
- context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, groupMetadataHash, -1));
assertEquals(StreamsGroupState.NOT_READY,
context.streamsGroupState(groupId));
context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
- .setConfiguredTopology(InternalTopicManager.configureTopics(
- new LogContext(),
- groupMetadataHash,
- StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)),
- metadataImage));
+ .setValidatedTopologyEpoch(0);
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
context.streamsGroupState(groupId));
@@ -17942,9 +18036,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- ))),
+ )), 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -18064,9 +18158,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- ))),
+ )), 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -18169,13 +18263,17 @@ public class GroupMetadataManagerTest {
Topology topology = new Topology().setSubtopologies(List.of(
new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ long groupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .buildCoordinatorMetadataImage())
+ .withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -18211,7 +18309,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0)
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0)
)
)
)),
@@ -18393,13 +18491,17 @@ public class GroupMetadataManagerTest {
Topology topology = new Topology().setSubtopologies(List.of(
new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ long groupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 3)
- .buildCoordinatorMetadataImage())
+ .withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(
@@ -18511,7 +18613,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId1),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0)
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0)
)
)
)),
@@ -18799,7 +18901,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId,
topology),
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 0),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 1, 0, -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
@@ -19383,7 +19485,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the
// StreamsGroupMemberMetadata tombstone should be a no-op.
- context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@@ -19439,7 +19541,7 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
- context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
}
@@ -19631,7 +19733,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying
the
// StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
- context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
"m1"));
assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@@ -19707,7 +19809,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying
the
// StreamsGroupTopology tombstone should be a no-op.
- context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 2485cb65e6f..457bd55c602 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -250,19 +250,20 @@ class StreamsCoordinatorRecordHelpersTest {
}
@Test
- public void testNewStreamsGroupEpochRecord() {
+ public void testNewStreamsGroupMetadataRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(42)
- .setMetadataHash(42),
+ .setMetadataHash(43)
+ .setValidatedTopologyEpoch(44),
(short) 0
)
);
- assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
+ assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 44));
}
@Test
@@ -676,7 +677,7 @@ class StreamsCoordinatorRecordHelpersTest {
@Test
public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception =
assertThrows(NullPointerException.class, () ->
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1, 1));
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1));
assertEquals("groupId should not be null here",
exception.getMessage());
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index 5d291d9884d..b7ffdd82def 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -35,6 +35,7 @@ public class StreamsGroupBuilder {
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
private long metadataHash = 0L;
+ private int validatedTopologyEpoch = -1;
public StreamsGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@@ -53,6 +54,11 @@ public class StreamsGroupBuilder {
return this;
}
+ public StreamsGroupBuilder withValidatedTopologyEpoch(int validatedTopologyEpoch) {
+ this.validatedTopologyEpoch = validatedTopologyEpoch;
+ return this;
+ }
+
public StreamsGroupBuilder withTopology(StreamsTopology streamsTopology) {
this.topology = streamsTopology;
return this;
@@ -79,7 +85,7 @@ public class StreamsGroupBuilder {
// Add group epoch record.
records.add(
- StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash));
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
// Add target assignment records.
targetAssignments.forEach((memberId, assignment) ->
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index ba24abd2b80..94f78006426 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -498,6 +498,7 @@ public class StreamsGroupTest {
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
+ streamsGroup.setValidatedTopologyEpoch(1);
assertEquals(MemberState.STABLE, member1.state());
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
@@ -694,6 +695,7 @@ public class StreamsGroupTest {
);
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
+ group.setValidatedTopologyEpoch(1);
group.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
@@ -760,6 +762,7 @@ public class StreamsGroupTest {
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
+ streamsGroup.setValidatedTopologyEpoch(1);
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
@@ -805,6 +808,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
group.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
+ group.setValidatedTopologyEpoch(1);
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)