This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 425f0285569 KAFKA-17747: [5/N] Replace subscription metadata with
metadata hash in stream group (#19802)
425f0285569 is described below
commit 425f0285569c67e6553d33822cb42370a4948010
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Jun 3 19:21:34 2025 +0800
KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in
stream group (#19802)
* Use a metadata hash to replace the subscription metadata.
* Remove `StreamsGroupPartitionMetadataKey` and
`StreamsGroupPartitionMetadataValue`.
* Check whether `configuredTopology` is empty. If it is, call
`InternalTopicManager.configureTopics` and set the result on the group.
Reviewers: Lucas Brutschy <[email protected]>
---------
Signed-off-by: PoAn Yang <[email protected]>
---
.../coordinator/group/GroupCoordinatorShard.java | 9 -
.../coordinator/group/GroupMetadataManager.java | 75 ++---
.../streams/StreamsCoordinatorRecordHelpers.java | 54 ----
.../coordinator/group/streams/StreamsGroup.java | 81 +++---
.../group/streams/TargetAssignmentBuilder.java | 17 +-
.../coordinator/group/streams/TopicMetadata.java | 8 -
.../group/streams/TopologyMetadata.java | 26 +-
.../group/streams/topics/ConfiguredTopology.java | 2 +
.../topics/EndpointToPartitionsManager.java | 22 +-
.../group/streams/topics/InternalTopicManager.java | 57 ++--
.../message/StreamsGroupPartitionMetadataKey.json | 27 --
.../StreamsGroupPartitionMetadataValue.json | 34 ---
.../group/GroupCoordinatorShardTest.java | 56 ----
.../group/GroupMetadataManagerTest.java | 305 ++++++++++-----------
.../group/GroupMetadataManagerTestContext.java | 23 +-
.../StreamsCoordinatorRecordHelpersTest.java | 65 -----
.../group/streams/StreamsGroupBuilder.java | 18 +-
.../group/streams/StreamsGroupTest.java | 178 +++---------
.../group/streams/TargetAssignmentBuilderTest.java | 14 +-
.../group/streams/TopicMetadataTest.java | 15 -
.../group/streams/TopologyMetadataTest.java | 24 +-
.../streams/topics/ConfiguredTopologyTest.java | 11 +-
.../topics/EndpointToPartitionsManagerTest.java | 23 +-
.../streams/topics/InternalTopicManagerTest.java | 23 +-
24 files changed, 380 insertions(+), 787 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 27d2183b5a0..8016378943c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -109,8 +109,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -1299,13 +1297,6 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
);
break;
- case STREAMS_GROUP_PARTITION_METADATA:
- groupMetadataManager.replay(
- (StreamsGroupPartitionMetadataKey) key,
- (StreamsGroupPartitionMetadataValue)
Utils.messageOrNull(value)
- );
- break;
-
case STREAMS_GROUP_MEMBER_METADATA:
groupMetadataManager.replay(
(StreamsGroupMemberMetadataKey) key,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index bb2106d6ce0..9a5fdf2bdab 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -129,8 +129,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -260,7 +258,6 @@ import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecor
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
-import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -1894,43 +1891,42 @@ public class GroupMetadataManager {
StreamsTopology updatedTopology = maybeUpdateTopology(groupId,
memberId, topology, group, records);
maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);
- // 3. Determine the partition metadata and any internal topics if
needed.
+ // 3. Determine any internal topics if needed.
ConfiguredTopology updatedConfiguredTopology;
- Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
updatedPartitionMetadata;
boolean reconfigureTopology = group.topology().isEmpty();
- if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
+ long metadataHash = group.metadataHash();
+ if (reconfigureTopology || group.configuredTopology().isEmpty() ||
group.hasMetadataExpired(currentTimeMs)) {
- updatedPartitionMetadata = group.computePartitionMetadata(
- metadataImage.topics(),
+ metadataHash = group.computeMetadataHash(
+ metadataImage,
+ topicHashCache,
updatedTopology
);
- if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
- log.info("[GroupId {}][MemberId {}] Computed new partition
metadata: {}.",
- groupId, memberId, updatedPartitionMetadata);
+ if (metadataHash != group.metadataHash()) {
+ log.info("[GroupId {}][MemberId {}] Computed new metadata
hash: {}.",
+ groupId, memberId, metadataHash);
bumpGroupEpoch = true;
reconfigureTopology = true;
- records.add(newStreamsGroupPartitionMetadataRecord(groupId,
updatedPartitionMetadata));
- group.setPartitionMetadata(updatedPartitionMetadata);
}
if (reconfigureTopology || group.configuredTopology().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Configuring the topology
{}", groupId, memberId, updatedTopology);
- updatedConfiguredTopology =
InternalTopicManager.configureTopics(logContext, updatedTopology,
updatedPartitionMetadata);
+ updatedConfiguredTopology =
InternalTopicManager.configureTopics(logContext, metadataHash, updatedTopology,
metadataImage.topics());
+ group.setConfiguredTopology(updatedConfiguredTopology);
} else {
updatedConfiguredTopology = group.configuredTopology().get();
}
} else {
updatedConfiguredTopology = group.configuredTopology().get();
- updatedPartitionMetadata = group.partitionMetadata();
}
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
- records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
- log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{}.", groupId, memberId, groupEpoch);
+ records.add(newStreamsGroupEpochRecord(groupId, groupEpoch,
metadataHash));
+ log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs +
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
@@ -1946,7 +1942,7 @@ public class GroupMetadataManager {
groupEpoch,
updatedMember,
updatedConfiguredTopology,
- updatedPartitionMetadata,
+ metadataImage,
records
);
targetAssignmentEpoch = groupEpoch;
@@ -2111,7 +2107,7 @@ public class GroupMetadataManager {
final StreamsGroupHeartbeatResponseData.Endpoint
responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
responseEndpoint.setHost(endpoint.host());
responseEndpoint.setPort(endpoint.port());
- StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint,
group);
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint,
group, metadataImage);
endpointToPartitionsList.add(endpointToPartitions);
}
}
@@ -3795,12 +3791,12 @@ public class GroupMetadataManager {
}
/**
- * Updates the target assignment according to the updated member and
subscription metadata.
+ * Updates the target assignment according to the updated member and
metadata image.
*
* @param group The StreamsGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member.
- * @param subscriptionMetadata The subscription metadata.
+ * @param metadataImage The metadata image.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
@@ -3809,7 +3805,7 @@ public class GroupMetadataManager {
int groupEpoch,
StreamsGroupMember updatedMember,
ConfiguredTopology configuredTopology,
- Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
subscriptionMetadata,
+ MetadataImage metadataImage,
List<CoordinatorRecord> records
) {
TaskAssignor assignor = streamsGroupAssignor(group.groupId());
@@ -3825,7 +3821,7 @@ public class GroupMetadataManager {
.withMembers(group.members())
.withTopology(configuredTopology)
.withStaticMembers(group.staticMembers())
- .withPartitionMetadata(subscriptionMetadata)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(group.targetAssignment())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
@@ -5282,6 +5278,7 @@ public class GroupMetadataManager {
if (value != null) {
StreamsGroup streamsGroup =
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
streamsGroup.setGroupEpoch(value.epoch());
+ streamsGroup.setMetadataHash(value.metadataHash());
} else {
StreamsGroup streamsGroup;
try {
@@ -5304,38 +5301,6 @@ public class GroupMetadataManager {
}
- /**
- * Replays StreamsGroupPartitionMetadataKey/Value to update the hard state
of
- * the streams group. It updates the subscription metadata of the streams
- * group.
- *
- * @param key A StreamsGroupPartitionMetadataKey key.
- * @param value A StreamsGroupPartitionMetadataValue record.
- */
- public void replay(
- StreamsGroupPartitionMetadataKey key,
- StreamsGroupPartitionMetadataValue value
- ) {
- String groupId = key.groupId();
- StreamsGroup streamsGroup;
- try {
- streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId,
value != null);
- } catch (GroupIdNotFoundException ex) {
- // If the group does not exist, we can ignore the tombstone.
- return;
- }
-
- if (value != null) {
- Map<String,
org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata =
new HashMap<>();
- value.topics().forEach(topicMetadata -> {
- partitionMetadata.put(topicMetadata.topicName(),
org.apache.kafka.coordinator.group.streams.TopicMetadata.fromRecord(topicMetadata));
- });
- streamsGroup.setPartitionMetadata(partitionMetadata);
- } else {
- streamsGroup.setPartitionMetadata(Map.of());
- }
- }
-
/**
* Replays ShareGroupMemberMetadataKey/Value to update the hard state of
* the share group. It updates the subscription part of the member or
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index ea002d2e130..d54f7273eb0 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -24,8 +24,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -98,58 +96,6 @@ public class StreamsCoordinatorRecordHelpers {
);
}
- /**
- * Creates a StreamsGroupPartitionMetadata record.
- *
- * @param groupId The streams group id.
- * @param newPartitionMetadata The partition metadata.
- * @return The record.
- */
- public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
- String groupId,
- Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
newPartitionMetadata
- ) {
- Objects.requireNonNull(groupId, "groupId should not be null here");
- Objects.requireNonNull(newPartitionMetadata, "newPartitionMetadata
should not be null here");
-
- StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
- newPartitionMetadata.forEach((topicName, topicMetadata) -> {
- value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
- .setTopicId(topicMetadata.id())
- .setTopicName(topicMetadata.name())
- .setNumPartitions(topicMetadata.numPartitions())
- );
- });
-
-
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
-
- return CoordinatorRecord.record(
- new StreamsGroupPartitionMetadataKey()
- .setGroupId(groupId),
- new ApiMessageAndVersion(
- value,
- (short) 0
- )
- );
- }
-
- /**
- * Creates a StreamsGroupPartitionMetadata tombstone.
- *
- * @param groupId The streams group id.
- * @return The record.
- */
- public static CoordinatorRecord
newStreamsGroupPartitionMetadataTombstoneRecord(
- String groupId
- ) {
- Objects.requireNonNull(groupId, "groupId should not be null here");
-
- return CoordinatorRecord.tombstone(
- new StreamsGroupPartitionMetadataKey()
- .setGroupId(groupId)
- );
- }
-
public static CoordinatorRecord newStreamsGroupEpochRecord(
String groupId,
int newGroupEpoch,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 3a38e1d0a1d..061f816296a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -29,15 +29,16 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
@@ -152,6 +153,11 @@ public class StreamsGroup implements Group {
*/
private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+ /**
+ * The metadata hash which is computed based on the all subscribed topics.
+ */
+ protected final TimelineLong metadataHash;
+
/**
* The target assignment epoch. An assignment epoch smaller than the group
epoch means that a new assignment is required. The assignment
* epoch is updated when a new assignment is installed.
@@ -226,6 +232,7 @@ public class StreamsGroup implements Group {
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.metadataHash = new TimelineLong(snapshotRegistry);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentActiveTaskToProcessId = new
TimelineHashMap<>(snapshotRegistry, 0);
@@ -280,7 +287,11 @@ public class StreamsGroup implements Group {
public void setTopology(StreamsTopology topology) {
this.topology.set(Optional.ofNullable(topology));
- maybeUpdateConfiguredTopology();
+ maybeUpdateGroupState();
+ }
+
+ public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
+ this.configuredTopology.set(Optional.ofNullable(configuredTopology));
maybeUpdateGroupState();
}
@@ -582,54 +593,47 @@ public class StreamsGroup implements Group {
}
/**
- * @return An immutable map of partition metadata for each topic that are
inputs for this streams group.
+ * @return The metadata hash.
*/
- public Map<String, TopicMetadata> partitionMetadata() {
- return Collections.unmodifiableMap(partitionMetadata);
+ public long metadataHash() {
+ return metadataHash.get();
}
/**
- * Updates the partition metadata. This replaces the previous one.
+ * Updates the metadata hash.
*
- * @param partitionMetadata The new partition metadata.
+ * @param metadataHash The new metadata hash.
*/
- public void setPartitionMetadata(
- Map<String, TopicMetadata> partitionMetadata
- ) {
- this.partitionMetadata.clear();
- this.partitionMetadata.putAll(partitionMetadata);
- maybeUpdateConfiguredTopology();
- maybeUpdateGroupState();
+ public void setMetadataHash(long metadataHash) {
+ this.metadataHash.set(metadataHash);
}
/**
- * Computes the partition metadata based on the current topology and the
current topics image.
+ * Computes the metadata hash based on the current topology and the
current metadata image.
*
- * @param topicsImage The current metadata for all available topics.
- * @param topology The current metadata for the Streams topology
- * @return An immutable map of partition metadata for each topic that the
Streams topology is using (besides non-repartition sink topics)
- */
- public Map<String, TopicMetadata> computePartitionMetadata(
- TopicsImage topicsImage,
+ * @param metadataImage The current metadata image.
+ * @param topicHashCache The cache for the topic hashes.
+ * @param topology The current metadata for the Streams topology
+ * @return The metadata hash.
+ */
+ public long computeMetadataHash(
+ MetadataImage metadataImage,
+ Map<String, Long> topicHashCache,
StreamsTopology topology
) {
Set<String> requiredTopicNames = topology.requiredTopics();
- // Create the topic metadata for each subscribed topic.
- Map<String, TopicMetadata> newPartitionMetadata = new
HashMap<>(requiredTopicNames.size());
-
+ Map<String, Long> topicHash = new HashMap<>(requiredTopicNames.size());
requiredTopicNames.forEach(topicName -> {
- TopicImage topicImage = topicsImage.getTopic(topicName);
+ TopicImage topicImage = metadataImage.topics().getTopic(topicName);
if (topicImage != null) {
- newPartitionMetadata.put(topicName, new TopicMetadata(
- topicImage.id(),
- topicImage.name(),
- topicImage.partitions().size())
+ topicHash.put(
+ topicName,
+ topicHashCache.computeIfAbsent(topicName, k ->
Utils.computeTopicHash(topicName, metadataImage))
);
}
});
-
- return Collections.unmodifiableMap(newPartitionMetadata);
+ return Utils.computeGroupHash(topicHash);
}
/**
@@ -793,7 +797,6 @@ public class StreamsGroup implements Group {
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(),
memberId))
);
-
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(groupId()));
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
}
@@ -855,18 +858,6 @@ public class StreamsGroup implements Group {
state.set(newState);
}
- private void maybeUpdateConfiguredTopology() {
- if (topology.get().isPresent()) {
- final StreamsTopology streamsTopology = topology.get().get();
-
- log.info("[GroupId {}] Configuring the topology {}", groupId,
streamsTopology);
-
this.configuredTopology.set(Optional.of(InternalTopicManager.configureTopics(logContext,
streamsTopology, partitionMetadata)));
-
- } else {
- configuredTopology.set(Optional.empty());
- }
- }
-
/**
* Updates the tasks process IDs based on the old and the new member.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index 4c1adeec839..7f8b504bab9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.image.MetadataImage;
import java.util.ArrayList;
import java.util.Collections;
@@ -75,9 +76,9 @@ public class TargetAssignmentBuilder {
private Map<String, StreamsGroupMember> members = Map.of();
/**
- * The partition metadata.
+ * The metadata image.
*/
- private Map<String,
org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata =
Map.of();
+ private MetadataImage metadataImage = MetadataImage.EMPTY;
/**
* The existing target assignment.
@@ -157,15 +158,15 @@ public class TargetAssignmentBuilder {
}
/**
- * Adds the partition metadata to use.
+ * Adds the metadata image to use.
*
- * @param partitionMetadata The partition metadata.
+ * @param metadataImage The metadata image.
* @return This object.
*/
- public TargetAssignmentBuilder withPartitionMetadata(
- Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
partitionMetadata
+ public TargetAssignmentBuilder withMetadataImage(
+ MetadataImage metadataImage
) {
- this.partitionMetadata = partitionMetadata;
+ this.metadataImage = metadataImage;
return this;
}
@@ -273,7 +274,7 @@ public class TargetAssignmentBuilder {
Collections.unmodifiableMap(memberSpecs),
assignmentConfigs
),
- new TopologyMetadata(partitionMetadata,
topology.subtopologies().get())
+ new TopologyMetadata(metadataImage,
topology.subtopologies().get())
);
} else {
newGroupAssignment = new GroupAssignment(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
index f4fa3dc7aa7..bd07156041f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import java.util.Objects;
@@ -46,11 +45,4 @@ public record TopicMetadata(Uuid id, String name, int
numPartitions) {
throw new IllegalArgumentException("Number of partitions must be
positive.");
}
}
-
- public static TopicMetadata
fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {
- return new TopicMetadata(
- record.topicId(),
- record.topicName(),
- record.numPartitions());
- }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
index d1119cfe011..0241083233b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -18,10 +18,11 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedMap;
@@ -31,25 +32,22 @@ import java.util.stream.Stream;
* The topology metadata class is used by the {@link
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic
and
* partition metadata for the topology that the streams group using.
*
- * @param topicMetadata The topic Ids mapped to their corresponding {@link
TopicMetadata} object, which contains topic and partition
- * metadata.
+ * @param metadataImage The metadata image
* @param subtopologyMap The configured subtopologies
*/
-public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata,
SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements
TopologyDescriber {
+public record TopologyMetadata(MetadataImage metadataImage, SortedMap<String,
ConfiguredSubtopology> subtopologyMap) implements TopologyDescriber {
public TopologyMetadata {
- topicMetadata =
Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
+ metadataImage = Objects.requireNonNull(metadataImage);
subtopologyMap =
Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
}
/**
- * Map of topic names to topic metadata.
- *
- * @return The map of topic Ids to topic metadata.
+ * @return The metadata image in topology metadata.
*/
@Override
- public Map<String, TopicMetadata> topicMetadata() {
- return this.topicMetadata;
+ public MetadataImage metadataImage() {
+ return this.metadataImage;
}
/**
@@ -90,7 +88,13 @@ public record TopologyMetadata(Map<String, TopicMetadata>
topicMetadata, SortedM
return Stream.concat(
subtopology.sourceTopics().stream(),
subtopology.repartitionSourceTopics().keySet().stream()
- ).map(topic ->
this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
+ ).map(topic -> {
+ TopicImage topicImage = metadataImage.topics().getTopic(topic);
+ if (topicImage == null) {
+ throw new IllegalStateException("Topic " + topic + " not found
in metadata image");
+ }
+ return topicImage.partitions().size();
+ }).max(Integer::compareTo).orElseThrow(
() -> new IllegalStateException("Subtopology does not contain any
source topics")
);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
index 93e03050b4d..85e3ba53db4 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -31,6 +31,7 @@ import java.util.SortedMap;
*
* @param topologyEpoch The epoch of the topology. Same as the
topology epoch in the heartbeat request that last initialized
* the topology.
+ * @param metadataHash The metadata hash of the group.
* @param subtopologies Contains the subtopologies that have
been configured. This can be used by the task assignors, since it
* specifies the number of tasks available
for every subtopology. Undefined if topology configuration
* failed.
@@ -41,6 +42,7 @@ import java.util.SortedMap;
* reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
+ long metadataHash,
Optional<SortedMap<String,
ConfiguredSubtopology>> subtopologies,
Map<String, CreatableTopic>
internalTopicsToBeCreated,
Optional<TopicConfigurationException>
topicConfigurationException) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
index ea3eca20935..09876efd804 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
@@ -20,7 +20,8 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,14 +37,15 @@ public class EndpointToPartitionsManager {
public static StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions(final StreamsGroupMember streamsGroupMember,
final StreamsGroupHeartbeatResponseData.Endpoint
responseEndpoint,
-
final StreamsGroup streamsGroup) {
+
final StreamsGroup streamsGroup,
+
final MetadataImage metadataImage) {
StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions = new
StreamsGroupHeartbeatResponseData.EndpointToPartitions();
Map<String, Set<Integer>> activeTasks =
streamsGroupMember.assignedTasks().activeTasks();
Map<String, Set<Integer>> standbyTasks =
streamsGroupMember.assignedTasks().standbyTasks();
endpointToPartitions.setUserEndpoint(responseEndpoint);
Map<String, ConfiguredSubtopology> configuredSubtopologies =
streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get();
- List<StreamsGroupHeartbeatResponseData.TopicPartition>
activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies,
streamsGroup.partitionMetadata());
- List<StreamsGroupHeartbeatResponseData.TopicPartition>
standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies,
streamsGroup.partitionMetadata());
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies,
metadataImage);
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies,
metadataImage);
endpointToPartitions.setActivePartitions(activeTopicPartitions);
endpointToPartitions.setStandbyPartitions(standbyTopicPartitions);
return endpointToPartitions;
@@ -51,7 +53,7 @@ public class EndpointToPartitionsManager {
private static List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitions(final Map<String, Set<Integer>> tasks,
final Map<String, ConfiguredSubtopology> configuredSubtopologies,
-
final Map<String, TopicMetadata> groupTopicMetadata) {
+
final MetadataImage metadataImage) {
List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionsForTasks = new ArrayList<>();
for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) {
String subtopologyId = taskEntry.getKey();
@@ -60,7 +62,7 @@ public class EndpointToPartitionsManager {
Set<String> repartitionSourceTopics =
configuredSubtopology.repartitionSourceTopics().keySet();
Set<String> allSourceTopic = new HashSet<>(sourceTopics);
allSourceTopic.addAll(repartitionSourceTopics);
- List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionList = topicPartitionListForTask(taskEntry.getValue(),
allSourceTopic, groupTopicMetadata);
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionList = topicPartitionListForTask(taskEntry.getValue(),
allSourceTopic, metadataImage);
topicPartitionsForTasks.addAll(topicPartitionList);
}
return topicPartitionsForTasks;
@@ -68,9 +70,13 @@ public class EndpointToPartitionsManager {
private static List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionListForTask(final Set<Integer> taskSet,
final Set<String> topicNames,
-
final Map<String, TopicMetadata> groupTopicMetadata) {
+
final MetadataImage metadataImage) {
return topicNames.stream().map(topic -> {
- int numPartitionsForTopic =
groupTopicMetadata.get(topic).numPartitions();
+ TopicImage topicImage = metadataImage.topics().getTopic(topic);
+ if (topicImage == null) {
+ throw new IllegalStateException("Topic " + topic + " not found in metadata image");
+ }
+ int numPartitionsForTopic = topicImage.partitions().size();
StreamsGroupHeartbeatResponseData.TopicPartition tp = new
StreamsGroupHeartbeatResponseData.TopicPartition();
tp.setTopic(topic);
List<Integer> tpPartitions = new ArrayList<>(taskSet);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 1d14d9a8477..490289c2c85 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -22,7 +22,8 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
import org.slf4j.Logger;
@@ -46,17 +47,19 @@ import java.util.stream.Stream;
public class InternalTopicManager {
/**
- * Configures the internal topics for the given topology. Given a topology
and the topic metadata, this method determines the number of
+ * Configures the internal topics for the given topology. Given a topology
and the topics image, this method determines the number of
* partitions for all internal topics and returns a {@link
ConfiguredTopology} object.
*
- * @param logContext The log context.
- * @param topology The topology.
- * @param topicMetadata The topic metadata.
+ * @param logContext The log context.
+ * @param metadataHash The metadata hash of the group.
+ * @param topology The topology.
+ * @param topicsImage The topics image.
* @return The configured topology.
*/
public static ConfiguredTopology configureTopics(LogContext logContext,
+ long metadataHash,
StreamsTopology topology,
- Map<String,
TopicMetadata> topicMetadata) {
+ TopicsImage topicsImage) {
final Logger log = logContext.logger(InternalTopicManager.class);
final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies
= topology.subtopologies().values();
@@ -70,10 +73,10 @@ public class InternalTopicManager {
try {
Optional<TopicConfigurationException> topicConfigurationException
= Optional.empty();
- throwOnMissingSourceTopics(topology, topicMetadata);
+ throwOnMissingSourceTopics(topology, topicsImage);
Map<String, Integer> decidedPartitionCountsForInternalTopics =
- decidePartitionCounts(logContext, topology, topicMetadata,
copartitionGroupsBySubtopology, log);
+ decidePartitionCounts(logContext, topology, topicsImage,
copartitionGroupsBySubtopology, log);
final SortedMap<String, ConfiguredSubtopology>
configuredSubtopologies =
subtopologies.stream()
@@ -86,7 +89,7 @@ public class InternalTopicManager {
TreeMap::new
));
- Map<String, CreatableTopic> internalTopicsToCreate =
missingInternalTopics(configuredSubtopologies, topicMetadata);
+ Map<String, CreatableTopic> internalTopicsToCreate =
missingInternalTopics(configuredSubtopologies, topology, topicsImage);
if (!internalTopicsToCreate.isEmpty()) {
topicConfigurationException =
Optional.of(TopicConfigurationException.missingInternalTopics(
"Internal topics are missing: " +
internalTopicsToCreate.keySet()
@@ -99,6 +102,7 @@ public class InternalTopicManager {
return new ConfiguredTopology(
topology.topologyEpoch(),
+ metadataHash,
Optional.of(configuredSubtopologies),
internalTopicsToCreate,
topicConfigurationException
@@ -109,6 +113,7 @@ public class InternalTopicManager {
topology.topologyEpoch(), e.toString());
return new ConfiguredTopology(
topology.topologyEpoch(),
+ metadataHash,
Optional.empty(),
Map.of(),
Optional.of(e)
@@ -117,11 +122,11 @@ public class InternalTopicManager {
}
private static void throwOnMissingSourceTopics(final StreamsTopology
topology,
- final Map<String,
TopicMetadata> topicMetadata) {
+ final TopicsImage
topicsImage) {
TreeSet<String> sortedMissingTopics = new TreeSet<>();
for (StreamsGroupTopologyValue.Subtopology subtopology :
topology.subtopologies().values()) {
for (String sourceTopic : subtopology.sourceTopics()) {
- if (!topicMetadata.containsKey(sourceTopic)) {
+ if (topicsImage.getTopic(sourceTopic) == null) {
sortedMissingTopics.add(sourceTopic);
}
}
@@ -134,12 +139,12 @@ public class InternalTopicManager {
private static Map<String, Integer> decidePartitionCounts(final LogContext
logContext,
final
StreamsTopology topology,
- final
Map<String, TopicMetadata> topicMetadata,
+ final
TopicsImage topicsImage,
final
Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
final Logger
log) {
final Map<String, Integer> decidedPartitionCountsForInternalTopics =
new HashMap<>();
final Function<String, OptionalInt> topicPartitionCountProvider =
- topic -> getPartitionCount(topicMetadata, topic,
decidedPartitionCountsForInternalTopics);
+ topic -> getPartitionCount(topicsImage, topic,
decidedPartitionCountsForInternalTopics);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
logContext,
topology.subtopologies().values(),
@@ -190,7 +195,8 @@ public class InternalTopicManager {
}
private static Map<String, CreatableTopic>
missingInternalTopics(Map<String, ConfiguredSubtopology> subtopologyMap,
-
Map<String, TopicMetadata> topicMetadata) {
+
StreamsTopology topology,
+
TopicsImage topicsImage) {
final Map<String, CreatableTopic> topicsToCreate = new HashMap<>();
for (ConfiguredSubtopology subtopology : subtopologyMap.values()) {
@@ -199,31 +205,34 @@ public class InternalTopicManager {
subtopology.stateChangelogTopics().values()
.forEach(x -> topicsToCreate.put(x.name(),
toCreatableTopic(x)));
}
- for (Map.Entry<String, TopicMetadata> topic :
topicMetadata.entrySet()) {
- final TopicMetadata existingTopic = topic.getValue();
- final CreatableTopic expectedTopic =
topicsToCreate.remove(topic.getKey());
+ for (String topic : topology.requiredTopics()) {
+ TopicImage topicImage = topicsImage.getTopic(topic);
+ if (topicImage == null) {
+ continue;
+ }
+ final CreatableTopic expectedTopic = topicsToCreate.remove(topic);
if (expectedTopic != null) {
- if (existingTopic.numPartitions() !=
expectedTopic.numPartitions()) {
- throw
TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " +
topic.getKey() + " has different"
- + " number of partitions: expected " +
expectedTopic.numPartitions() + ", found " + existingTopic.numPartitions());
+ if (topicImage.partitions().size() !=
expectedTopic.numPartitions()) {
+ throw
TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " +
topic + " has different"
+ + " number of partitions: expected " +
expectedTopic.numPartitions() + ", found " + topicImage.partitions().size());
}
}
}
return topicsToCreate;
}
- private static OptionalInt getPartitionCount(Map<String, TopicMetadata>
topicMetadata,
+ private static OptionalInt getPartitionCount(TopicsImage topicsImage,
String topic,
Map<String, Integer>
decidedPartitionCountsForInternalTopics) {
- final TopicMetadata metadata = topicMetadata.get(topic);
- if (metadata == null) {
+ final TopicImage topicImage = topicsImage.getTopic(topic);
+ if (topicImage == null) {
if (decidedPartitionCountsForInternalTopics.containsKey(topic)) {
return
OptionalInt.of(decidedPartitionCountsForInternalTopics.get(topic));
} else {
return OptionalInt.empty();
}
} else {
- return OptionalInt.of(metadata.numPartitions());
+ return OptionalInt.of(topicImage.partitions().size());
}
}
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
deleted file mode 100644
index cb82e930a09..00000000000
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
+++ /dev/null
@@ -1,27 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes.
-{
- "apiKey": 18,
- "type": "coordinator-key",
- "name": "StreamsGroupPartitionMetadataKey",
- "validVersions": "0",
- "flexibleVersions": "none",
- "fields": [
- { "name": "GroupId", "type": "string", "versions": "0",
- "about": "The group ID." }
- ]
-}
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
deleted file mode 100644
index f9be55b9e42..00000000000
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes.
-{
- "apiKey": 18,
- "type": "coordinator-value",
- "name": "StreamsGroupPartitionMetadataValue",
- "validVersions": "0",
- "flexibleVersions": "0+",
- "fields": [
- { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
- "about": "The list of topic metadata.", "fields": [
- { "name": "TopicId", "versions": "0+", "type": "uuid",
- "about": "The topic ID." },
- { "name": "TopicName", "versions": "0+", "type": "string",
- "about": "The topic name." },
- { "name": "NumPartitions", "versions": "0+", "type": "int32",
- "about": "The number of partitions of the topic." }
- ]}
- ]
-}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index a90ffd73a04..d58c6b6b1ac 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -77,8 +77,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -1006,60 +1004,6 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager).replay(key, null);
}
- @Test
- public void testReplayStreamsGroupPartitionMetadata() {
- GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
- OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
- CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
- CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
- GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
- new LogContext(),
- groupMetadataManager,
- offsetMetadataManager,
- Time.SYSTEM,
- new MockCoordinatorTimer<>(Time.SYSTEM),
- mock(GroupCoordinatorConfig.class),
- coordinatorMetrics,
- metricsShard
- );
-
- StreamsGroupPartitionMetadataKey key = new
StreamsGroupPartitionMetadataKey();
- StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
-
- coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
- key,
- new ApiMessageAndVersion(value, (short) 0)
- ));
-
- verify(groupMetadataManager).replay(key, value);
- }
-
- @Test
- public void testReplayStreamsGroupPartitionMetadataWithNullValue() {
- GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
- OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
- CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
- CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
- GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
- new LogContext(),
- groupMetadataManager,
- offsetMetadataManager,
- Time.SYSTEM,
- new MockCoordinatorTimer<>(Time.SYSTEM),
- mock(GroupCoordinatorConfig.class),
- coordinatorMetrics,
- metricsShard
- );
-
- StreamsGroupPartitionMetadataKey key = new
StreamsGroupPartitionMetadataKey();
-
- coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
- key
- ));
-
- verify(groupMetadataManager).replay(key, null);
- }
-
@Test
public void testReplayStreamsGroupMemberMetadata() {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index fc95e78e9eb..55cc5f7e48a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -125,6 +125,7 @@ import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRol
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -16030,7 +16031,6 @@ public class GroupMetadataManagerTest {
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of())
)
.build();
@@ -16066,12 +16066,17 @@ public class GroupMetadataManagerTest {
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+ long groupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ));
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .build())
+ .withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -16127,11 +16132,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- )),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
groupMetadataHash),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -16158,12 +16159,14 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build();
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .build())
+ .withMetadataImage(metadataImage)
.build();
// Member joins the streams group.
@@ -16210,12 +16213,9 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
- Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
- )
- ),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16240,12 +16240,14 @@ public class GroupMetadataManagerTest {
)
);
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build();
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .build())
+ .withMetadataImage(metadataImage)
.build();
// Member joins the streams group.
@@ -16297,10 +16299,9 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
- )),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16326,13 +16327,14 @@ public class GroupMetadataManagerTest {
)
);
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .build())
+ .withMetadataImage(metadataImage)
.build();
// Member joins the streams group.
@@ -16379,11 +16381,10 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- )),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16414,13 +16415,14 @@ public class GroupMetadataManagerTest {
)
);
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(
new StreamsGroupBuilder(groupId, 10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
@@ -16474,11 +16476,10 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- )),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -16499,12 +16500,15 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build();
+ long groupMetadataHash = computeGroupHash(Map.of(fooTopicName,
computeTopicHash(fooTopicName, metadataImage)));
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16526,9 +16530,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
- ))
+ .withMetadataHash(groupMetadataHash)
)
.build();
@@ -16593,12 +16595,14 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .build();
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 2)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16612,9 +16616,7 @@ public class GroupMetadataManagerTest {
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
- ))
+ .withMetadataHash(computeGroupHash(Map.of(fooTopicName,
computeTopicHash(fooTopicName, metadataImage))))
)
.build();
@@ -16688,13 +16690,19 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+ long groupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ));
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16707,10 +16715,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(groupMetadataHash)
)
.build();
@@ -16759,7 +16764,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11,
groupMetadataHash),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -16788,13 +16793,24 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
+ MetadataImage newMetadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, changedPartitionCount)
+ .build();
+
+ MetadataImage oldMetadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+ long oldGroupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, oldMetadataImage),
+ barTopicName, computeTopicHash(barTopicName, oldMetadataImage)
+ ));
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, changedPartitionCount)
- .build())
+ .withMetadataImage(newMetadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16807,10 +16823,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(oldGroupMetadataHash)
)
.build();
@@ -16857,11 +16870,10 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, changedPartitionCount)
- )),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
+ barTopicName, computeTopicHash(barTopicName, newMetadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -16891,6 +16903,11 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
@@ -16922,10 +16939,10 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ )))
)
.build();
@@ -17128,13 +17145,19 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+ long groupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ));
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .addTopic(barTopicId, barTopicName, 3)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setMemberEpoch(10)
@@ -17158,10 +17181,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(groupMetadataHash)
)
.build();
@@ -17582,9 +17602,19 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+ long groupMetadataHash = computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ));
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10))
.build();
@@ -17593,16 +17623,16 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
11, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
11, groupMetadataHash));
assertEquals(StreamsGroupState.NOT_READY,
context.streamsGroupState(groupId));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
- Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- )
- ));
+ context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
+ .setConfiguredTopology(InternalTopicManager.configureTopics(
+ new LogContext(),
+ groupMetadataHash,
+
StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)),
+ metadataImage.topics()));
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
context.streamsGroupState(groupId));
@@ -17688,12 +17718,14 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build();
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setMemberEpoch(10)
@@ -17705,11 +17737,11 @@ public class GroupMetadataManagerTest {
.withTargetAssignment(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
.withTargetAssignmentEpoch(10)
- .withPartitionMetadata(
+ .withMetadataHash(computeGroupHash(Map.of(
// foo only has 3 tasks stored in the metadata but foo has
// 6 partitions the metadata image.
- Map.of(fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 3))
- ))
+ fooTopicName, computeTopicHash(fooTopicName, new
MetadataImageBuilder().addTopic(fooTopicId, fooTopicName, 3).build())
+ ))))
.build();
// The metadata refresh flag should be true.
@@ -17753,10 +17785,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
- Map.of(fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6))
- ),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -17784,12 +17815,14 @@ public class GroupMetadataManagerTest {
new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build();
+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
- .withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
- .build())
+ .withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setMemberEpoch(10)
@@ -17801,11 +17834,11 @@ public class GroupMetadataManagerTest {
.withTargetAssignment(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
.withTargetAssignmentEpoch(10)
- .withPartitionMetadata(
+ .withMetadataHash(computeGroupHash(Map.of(
// foo only has 3 partitions stored in the metadata but
foo has
// 6 partitions the metadata image.
- Map.of(fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 3))
- ))
+ fooTopicName, computeTopicHash(fooTopicName, new
MetadataImageBuilder().addTopic(fooTopicId, fooTopicName, 3).build())
+ ))))
.build();
// The metadata refresh flag should be true.
@@ -17870,10 +17903,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
- Map.of(fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6))
- ),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -19156,45 +19188,6 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
}
- @Test
- public void testReplayStreamsGroupPartitionMetadata() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
-
- Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
- "bar",
- new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
- );
-
- // The group is created if it does not exist.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
- assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
- }
-
- @Test
- public void testReplayStreamsGroupPartitionMetadataTombstoneNotExisting() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
-
- // The group may not exist at all. Replaying the
StreamsGroupPartitionMetadata tombstone
- // should be a no-op.
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
- assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
- }
-
- @Test
- public void testReplayStreamsGroupPartitionMetadataTombstoneExisting() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .withStreamsGroup(new StreamsGroupBuilder("foo",
10).withPartitionMetadata(
- Map.of("topic1", new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"topic1", 10))
- ))
- .build();
-
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
-
-
assertTrue(context.groupMetadataManager.streamsGroup("foo").partitionMetadata().isEmpty());
- }
-
@Test
public void testReplayStreamsGroupTargetAssignmentMember() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index d1bbc012717..c71225f86fd 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -97,8 +97,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -118,6 +116,7 @@ import
org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -558,7 +557,18 @@ public class GroupMetadataManagerTestContext {
consumerGroupBuilders.forEach(builder ->
builder.build().forEach(context::replay));
shareGroupBuilders.forEach(builder ->
builder.build(metadataImage.topics()).forEach(context::replay));
- streamsGroupBuilders.forEach(builder ->
builder.build().forEach(context::replay));
+ streamsGroupBuilders.forEach(builder -> {
+ builder.build().forEach(context::replay);
+ StreamsGroup group =
context.groupMetadataManager.getStreamsGroupOrThrow(builder.groupId());
+ if (group.topology().isPresent()) {
+
group.setConfiguredTopology(InternalTopicManager.configureTopics(
+ new LogContext(),
+ 0,
+ group.topology().get(),
+ metadataImage.topics())
+ );
+ }
+ });
context.commit();
@@ -1744,13 +1754,6 @@ public class GroupMetadataManagerTestContext {
);
break;
- case STREAMS_GROUP_PARTITION_METADATA:
- groupMetadataManager.replay(
- (StreamsGroupPartitionMetadataKey) key,
- (StreamsGroupPartitionMetadataValue) messageOrNull(value)
- );
- break;
-
case STREAMS_GROUP_TARGET_ASSIGNMENT_MEMBER:
groupMetadataManager.replay(
(StreamsGroupTargetAssignmentMemberKey) key,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 0509b74d330..2485cb65e6f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group.streams;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
@@ -26,8 +25,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue.TaskIds;
@@ -252,47 +249,6 @@ class StreamsCoordinatorRecordHelpersTest {
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID,
MEMBER_ID));
}
- @Test
- public void testNewStreamsGroupPartitionMetadataRecord() {
- Uuid uuid1 = Uuid.randomUuid();
- Uuid uuid2 = Uuid.randomUuid();
- Map<String, TopicMetadata> newPartitionMetadata = Map.of(
- TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
- TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
- );
-
- StreamsGroupPartitionMetadataValue value = new
StreamsGroupPartitionMetadataValue();
- value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
- .setTopicId(uuid1)
- .setTopicName(TOPIC_1)
- .setNumPartitions(1)
- );
- value.topics().add(new
StreamsGroupPartitionMetadataValue.TopicMetadata()
- .setTopicId(uuid2)
- .setTopicName(TOPIC_2)
- .setNumPartitions(2)
- );
-
- CoordinatorRecord expectedRecord = CoordinatorRecord.record(
- new StreamsGroupPartitionMetadataKey()
- .setGroupId(GROUP_ID),
- new ApiMessageAndVersion(value, (short) 0)
- );
-
- assertEquals(expectedRecord,
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(GROUP_ID,
newPartitionMetadata));
- }
-
- @Test
- public void testNewStreamsGroupPartitionMetadataTombstoneRecord() {
- CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
- new StreamsGroupPartitionMetadataKey()
- .setGroupId(GROUP_ID)
- );
-
- assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(GROUP_ID));
- }
-
@Test
public void testNewStreamsGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
@@ -717,27 +673,6 @@ class StreamsCoordinatorRecordHelpersTest {
assertEquals("memberId should not be null here",
exception.getMessage());
}
- @Test
- public void testNewStreamsGroupPartitionMetadataRecordNullGroupId() {
- NullPointerException exception =
assertThrows(NullPointerException.class, () ->
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(null,
Map.of()));
- assertEquals("groupId should not be null here",
exception.getMessage());
- }
-
- @Test
- public void
testNewStreamsGroupPartitionMetadataRecordNullNewPartitionMetadata() {
- NullPointerException exception =
assertThrows(NullPointerException.class, () ->
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("groupId",
null));
- assertEquals("newPartitionMetadata should not be null here",
exception.getMessage());
- }
-
- @Test
- public void
testNewStreamsGroupPartitionMetadataTombstoneRecordNullGroupId() {
- NullPointerException exception =
assertThrows(NullPointerException.class, () ->
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(null));
- assertEquals("groupId should not be null here",
exception.getMessage());
- }
-
@Test
public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception =
assertThrows(NullPointerException.class, () ->
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index a3dc088badc..5d291d9884d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -34,7 +34,7 @@ public class StreamsGroupBuilder {
private StreamsTopology topology;
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
- private Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
+ private long metadataHash = 0L;
public StreamsGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@@ -48,8 +48,8 @@ public class StreamsGroupBuilder {
return this;
}
- public StreamsGroupBuilder withPartitionMetadata(Map<String,
TopicMetadata> partitionMetadata) {
- this.partitionMetadata = partitionMetadata;
+ public StreamsGroupBuilder withMetadataHash(long metadataHash) {
+ this.metadataHash = metadataHash;
return this;
}
@@ -77,15 +77,9 @@ public class StreamsGroupBuilder {
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member))
);
- if (!partitionMetadata.isEmpty()) {
- records.add(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
- partitionMetadata));
- }
-
// Add group epoch record.
records.add(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch,
0));
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch,
metadataHash));
// Add target assignment records.
targetAssignments.forEach((memberId, assignment) ->
@@ -115,4 +109,8 @@ public class StreamsGroupBuilder {
return records;
}
+
+ public String groupId() {
+ return groupId;
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 99e13bbf155..0bd8caf3bbd 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@@ -37,7 +38,6 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAss
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
@@ -47,18 +47,15 @@ import
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState
import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
-import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.MockedStatic;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,6 +63,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
@@ -79,11 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
public class StreamsGroupTest {
@@ -505,6 +499,7 @@ public class StreamsGroupTest {
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY,
streamsGroup.state());
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
+ streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
assertEquals(MemberState.STABLE, member1.state());
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING,
streamsGroup.state());
@@ -702,6 +697,7 @@ public class StreamsGroupTest {
);
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
+ group.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
@@ -767,6 +763,7 @@ public class StreamsGroupTest {
assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
+ streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING,
streamsGroup.state());
assertThrows(GroupNotEmptyException.class,
streamsGroup::validateDeleteGroup);
@@ -811,6 +808,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
+ group.setConfiguredTopology(new ConfiguredTopology(1, 0,
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
@@ -907,109 +905,7 @@ public class StreamsGroupTest {
}
@Test
- public void testSetTopologyUpdatesStateAndConfiguredTopology() {
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
- GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
- StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT,
snapshotRegistry, "test-group", metricsShard);
-
- StreamsTopology topology = new StreamsTopology(1, Map.of());
-
- ConfiguredTopology topo = mock(ConfiguredTopology.class);
- when(topo.isReady()).thenReturn(true);
-
- try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
- mocked.when(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(Map.of()))).thenReturn(topo);
- streamsGroup.setTopology(topology);
- mocked.verify(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(Map.of())));
- }
-
- Optional<ConfiguredTopology> configuredTopology =
streamsGroup.configuredTopology();
- assertTrue(configuredTopology.isPresent(), "Configured topology should
be present");
- assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
-
- streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
- .setMemberEpoch(1)
- .build());
-
- assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
- }
-
- @Test
- public void
testSetTopologyUpdatesStateAndConfiguredTopologyWithPreviousCallToSetMetadata()
{
- Uuid topicUuid = Uuid.randomUuid();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
- GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
- StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT,
snapshotRegistry, "test-group", metricsShard);
-
- assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
-
- Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
- partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1",
1));
-
- try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
- streamsGroup.setPartitionMetadata(partitionMetadata);
- mocked.verify(() -> InternalTopicManager.configureTopics(any(),
any(), any()), never());
- }
-
- assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured
topology should not be present");
- assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
-
- StreamsTopology topology = new StreamsTopology(1, Map.of());
- ConfiguredTopology topo = mock(ConfiguredTopology.class);
- when(topo.isReady()).thenReturn(true);
- try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
- mocked.when(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(partitionMetadata))).thenReturn(topo);
- streamsGroup.setTopology(topology);
- mocked.verify(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(partitionMetadata)));
- }
- }
-
- @Test
- public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
- Uuid topicUuid = Uuid.randomUuid();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
- GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
- StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT,
snapshotRegistry, "test-group", metricsShard);
-
- assertEquals(StreamsGroup.StreamsGroupState.EMPTY,
streamsGroup.state());
-
- Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
- partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1",
1));
-
- try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
- streamsGroup.setPartitionMetadata(partitionMetadata);
- mocked.verify(() -> InternalTopicManager.configureTopics(any(),
any(), any()), never());
- }
-
- assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured
topology should not be present");
- assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
-
- StreamsTopology topology = new StreamsTopology(1, Map.of());
- streamsGroup.setTopology(topology);
- ConfiguredTopology topo = mock(ConfiguredTopology.class);
- when(topo.isReady()).thenReturn(true);
-
- try (MockedStatic<InternalTopicManager> mocked =
mockStatic(InternalTopicManager.class)) {
- mocked.when(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(partitionMetadata))).thenReturn(topo);
- streamsGroup.setPartitionMetadata(partitionMetadata);
- mocked.verify(() -> InternalTopicManager.configureTopics(any(),
eq(topology), eq(partitionMetadata)));
- }
-
- Optional<ConfiguredTopology> configuredTopology =
streamsGroup.configuredTopology();
- assertTrue(configuredTopology.isPresent(), "Configured topology should
be present");
- assertEquals(topo, configuredTopology.get());
- assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
- assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
-
- streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
- .setMemberEpoch(1)
- .build());
-
- assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
- }
-
- @Test
- public void testComputePartitionMetadata() {
+ public void testComputeMetadataHash() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
StreamsGroup streamsGroup = new StreamsGroup(
LOG_CONTEXT,
@@ -1017,24 +913,17 @@ public class StreamsGroupTest {
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
);
- TopicsImage topicsImage = mock(TopicsImage.class);
- TopicImage topicImage = mock(TopicImage.class);
- when(topicImage.id()).thenReturn(Uuid.randomUuid());
- when(topicImage.name()).thenReturn("topic1");
- when(topicImage.partitions()).thenReturn(Collections.singletonMap(0,
null));
- when(topicsImage.getTopic("topic1")).thenReturn(topicImage);
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "topic1", 1)
+ .build();
+
StreamsTopology topology = mock(StreamsTopology.class);
when(topology.requiredTopics()).thenReturn(Set.of("topic1"));
- Map<String, TopicMetadata> partitionMetadata =
streamsGroup.computePartitionMetadata(topicsImage, topology);
-
- assertEquals(1, partitionMetadata.size());
- assertTrue(partitionMetadata.containsKey("topic1"));
- TopicMetadata topicMetadata = partitionMetadata.get("topic1");
- assertNotNull(topicMetadata);
- assertEquals(topicImage.id(), topicMetadata.id());
- assertEquals("topic1", topicMetadata.name());
- assertEquals(1, topicMetadata.numPartitions());
+ long metadataHash = streamsGroup.computeMetadataHash(metadataImage,
new HashMap<>(), topology);
+ // A metadata hash of 0 would mean no topics; topic1 exists, so expect non-zero.
+ assertNotEquals(0, metadataHash);
}
@Test
@@ -1053,7 +942,7 @@ public class StreamsGroupTest {
streamsGroup.createGroupTombstoneRecords(records);
- assertEquals(7, records.size());
+ assertEquals(6, records.size());
for (CoordinatorRecord record : records) {
assertNotNull(record.key());
assertNull(record.value());
@@ -1061,7 +950,6 @@ public class StreamsGroupTest {
final Set<ApiMessage> keys =
records.stream().map(CoordinatorRecord::key).collect(Collectors.toSet());
assertTrue(keys.contains(new
StreamsGroupMetadataKey().setGroupId("test-group")));
assertTrue(keys.contains(new
StreamsGroupTargetAssignmentMetadataKey().setGroupId("test-group")));
- assertTrue(keys.contains(new
StreamsGroupPartitionMetadataKey().setGroupId("test-group")));
assertTrue(keys.contains(new
StreamsGroupTopologyKey().setGroupId("test-group")));
assertTrue(keys.contains(new
StreamsGroupMemberMetadataKey().setGroupId("test-group").setMemberId("member1")));
assertTrue(keys.contains(new
StreamsGroupTargetAssignmentMemberKey().setGroupId("test-group").setMemberId("member1")));
@@ -1079,28 +967,26 @@ public class StreamsGroupTest {
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
- streamsGroup.setTopology(
- new StreamsTopology(1,
- Map.of("test-subtopology",
- new StreamsGroupTopologyValue.Subtopology()
- .setSubtopologyId("test-subtopology")
- .setSourceTopics(List.of("test-topic1"))
- .setRepartitionSourceTopics(List.of(new
StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
- .setRepartitionSinkTopics(List.of("test-topic2"))
- )
- )
- );
+ StreamsTopology topology = new StreamsTopology(1,
+ Map.of("test-subtopology",
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("test-subtopology")
+ .setSourceTopics(List.of("test-topic1"))
+ .setRepartitionSourceTopics(List.of(new
StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
+ .setRepartitionSinkTopics(List.of("test-topic2"))
+ ));
+ streamsGroup.setTopology(topology);
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
- streamsGroup.setPartitionMetadata(
- Map.of(
- "test-topic1", new TopicMetadata(Uuid.randomUuid(),
"test-topic1", 1),
- "test-topic2", new TopicMetadata(Uuid.randomUuid(),
"test-topic2", 1)
- )
- );
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "test-topic1", 1)
+ .addTopic(Uuid.randomUuid(), "test-topic2", 1)
+ .build();
+
+
streamsGroup.setConfiguredTopology(InternalTopicManager.configureTopics(logContext,
0, topology, metadataImage.topics()));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 8b51b1b58b4..b55b05d30d9 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -659,7 +660,7 @@ public class TargetAssignmentBuilderTest {
private final int groupEpoch;
private final TaskAssignor assignor = mock(TaskAssignor.class);
private final SortedMap<String, ConfiguredSubtopology> subtopologies =
new TreeMap<>();
- private final ConfiguredTopology topology = new ConfiguredTopology(0,
Optional.of(subtopologies), new HashMap<>(),
+ private final ConfiguredTopology topology = new ConfiguredTopology(0,
0, Optional.of(subtopologies), new HashMap<>(),
Optional.empty());
private final Map<String, StreamsGroupMember> members = new
HashMap<>();
private final Map<String,
org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata
= new HashMap<>();
@@ -711,11 +712,6 @@ public class TargetAssignmentBuilderTest {
) {
String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid();
- subscriptionMetadata.put(topicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(
- topicId,
- topicName,
- numTasks
- ));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId,
topicName, numTasks);
subtopologies.put(subtopologyId, new
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(),
Map.of()));
@@ -805,8 +801,10 @@ public class TargetAssignmentBuilderTest {
}
});
+ MetadataImage metadataImage = topicsImageBuilder.build();
+
// Prepare the expected topology metadata.
- TopologyMetadata topologyMetadata = new
TopologyMetadata(subscriptionMetadata, subtopologies);
+ TopologyMetadata topologyMetadata = new
TopologyMetadata(metadataImage, subtopologies);
// Prepare the expected assignment spec.
GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new
HashMap<>());
@@ -822,7 +820,7 @@ public class TargetAssignmentBuilderTest {
.withMembers(members)
.withTopology(topology)
.withStaticMembers(staticMembers)
- .withPartitionMetadata(subscriptionMetadata)
+ .withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignment);
// Add the updated members or delete the deleted members.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
index 59712d5c954..6105a4dfe3b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
-import
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.junit.jupiter.api.Test;
@@ -72,18 +71,4 @@ public class TopicMetadataTest {
new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
assertEquals("Number of partitions must be positive.",
exception.getMessage());
}
-
- @Test
- public void testFromRecord() {
- StreamsGroupPartitionMetadataValue.TopicMetadata record = new
StreamsGroupPartitionMetadataValue.TopicMetadata()
- .setTopicId(Uuid.randomUuid())
- .setTopicName("test-topic")
- .setNumPartitions(3);
-
- TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
-
- assertEquals(record.topicId(), topicMetadata.id());
- assertEquals(record.topicName(), topicMetadata.name());
- assertEquals(record.numPartitions(), topicMetadata.numPartitions());
- }
}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
index a5c18a6f0f2..a39914db1bc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
@@ -16,13 +16,15 @@
*/
package org.apache.kafka.coordinator.group.streams;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import
org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -40,20 +42,23 @@ import static org.mockito.Mockito.when;
class TopologyMetadataTest {
- private Map<String, TopicMetadata> topicMetadata;
+ private MetadataImage metadataImage;
private SortedMap<String, ConfiguredSubtopology> subtopologyMap;
private TopologyMetadata topologyMetadata;
@BeforeEach
void setUp() {
- topicMetadata = new HashMap<>();
+ metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "source_topic", 3)
+ .addTopic(Uuid.randomUuid(), "repartition_source_topic", 4)
+ .build();
subtopologyMap = new TreeMap<>();
- topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap);
+ topologyMetadata = new TopologyMetadata(metadataImage, subtopologyMap);
}
@Test
- void testTopicMetadata() {
- assertEquals(topicMetadata, topologyMetadata.topicMetadata());
+ void testMetadataImage() {
+ assertEquals(metadataImage, topologyMetadata.metadataImage());
}
@Test
@@ -83,13 +88,6 @@ class TopologyMetadataTest {
when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
internalTopic));
- TopicMetadata topicMeta1 = mock(TopicMetadata.class);
- TopicMetadata topicMeta2 = mock(TopicMetadata.class);
- topicMetadata.put("source_topic", topicMeta1);
- topicMetadata.put("repartition_source_topic", topicMeta2);
- when(topicMeta1.numPartitions()).thenReturn(3);
- when(topicMeta2.numPartitions()).thenReturn(4);
-
assertEquals(4,
topologyMetadata.maxNumInputPartitions("subtopology1"));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
index b298be7300a..eadaaeb1338 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -41,6 +41,7 @@ public class ConfiguredTopologyTest {
public void testConstructorWithNullSubtopologies() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
+ 0,
0,
null,
Map.of(),
@@ -53,6 +54,7 @@ public class ConfiguredTopologyTest {
public void testConstructorWithNullInternalTopicsToBeCreated() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
+ 0,
0,
Optional.of(new TreeMap<>()),
null,
@@ -65,6 +67,7 @@ public class ConfiguredTopologyTest {
public void testConstructorWithNullTopicConfigurationException() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
+ 0,
0,
Optional.empty(),
Map.of(),
@@ -78,6 +81,7 @@ public class ConfiguredTopologyTest {
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
-1,
+ 0,
Optional.of(new TreeMap<>()),
Map.of(),
Optional.empty()
@@ -90,6 +94,7 @@ public class ConfiguredTopologyTest {
final IllegalArgumentException ex =
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
1,
+ 0,
Optional.empty(),
Map.of(),
Optional.empty()
@@ -101,11 +106,11 @@ public class ConfiguredTopologyTest {
@Test
public void testIsReady() {
ConfiguredTopology readyTopology = new ConfiguredTopology(
- 1, Optional.of(new TreeMap<>()), new HashMap<>(),
Optional.empty());
+ 1, 0, Optional.of(new TreeMap<>()), new HashMap<>(),
Optional.empty());
assertTrue(readyTopology.isReady());
ConfiguredTopology notReadyTopology = new ConfiguredTopology(
- 1, Optional.empty(), new HashMap<>(),
Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
+ 1, 0, Optional.empty(), new HashMap<>(),
Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
assertFalse(notReadyTopology.isReady());
}
@@ -120,7 +125,7 @@ public class ConfiguredTopologyTest {
Map<String, CreatableTopic> internalTopicsToBeCreated = new
HashMap<>();
Optional<TopicConfigurationException> topicConfigurationException =
Optional.empty();
ConfiguredTopology configuredTopology = new ConfiguredTopology(
- topologyEpoch, Optional.of(subtopologies),
internalTopicsToBeCreated, topicConfigurationException);
+ topologyEpoch, 0, Optional.of(subtopologies),
internalTopicsToBeCreated, topicConfigurationException);
StreamsGroupDescribeResponseData.Topology topology =
configuredTopology.asStreamsGroupDescribeTopology();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
index 2002774b60b..cba28e0163d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
@@ -19,10 +19,11 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -79,16 +80,16 @@ class EndpointToPartitionsManagerTest {
@Test
void testEndpointToPartitionsWithStandbyTaskAssignments() {
- Map<String, TopicMetadata> topicMetadata = new HashMap<>();
- topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(),
"Topic-A", 3));
- topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(),
"Topic-B", 3));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "Topic-A", 3)
+ .addTopic(Uuid.randomUuid(), "Topic-B", 3)
+ .build();
activeTasks.put("0", Set.of(0, 1, 2));
standbyTasks.put("1", Set.of(0, 1, 2));
tasksTuple = new TasksTuple(activeTasks, standbyTasks,
Collections.emptyMap());
when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple);
//when(streamsGroupMember.assignedTasks().standbyTasks()).thenReturn(tasksTuple.standbyTasks());
- when((streamsGroup.partitionMetadata())).thenReturn(topicMetadata);
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap =
new TreeMap<>();
configuredSubtopologyMap.put("0", configuredSubtopologyOne);
@@ -96,7 +97,7 @@ class EndpointToPartitionsManagerTest {
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap));
StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
-
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup);
+
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup, metadataImage);
assertEquals(responseEndpoint, result.userEndpoint());
assertEquals(1, result.activePartitions().size());
@@ -123,20 +124,20 @@ class EndpointToPartitionsManagerTest {
List<Integer> topicBExpectedPartitions,
String
testName
) {
- Map<String, TopicMetadata> topicMetadata = new HashMap<>();
- topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(),
"Topic-A", topicAPartitions));
- topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(),
"Topic-B", topicBPartitions));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "Topic-A", topicAPartitions)
+ .addTopic(Uuid.randomUuid(), "Topic-B", topicBPartitions)
+ .build();
configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A",
"Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
when(streamsGroupMember.assignedTasks()).thenReturn(new
TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
- when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap =
new TreeMap<>();
configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
- StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup);
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup, metadataImage);
assertEquals(responseEndpoint, result.userEndpoint());
assertEquals(2, result.activePartitions().size());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index bc013ed875d..f3b40dc282b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -22,14 +22,14 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -54,12 +54,13 @@ class InternalTopicManagerTest {
@Test
void
testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
- Map<String, TopicMetadata> topicMetadata = new HashMap<>();
- topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_1, 2));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), SOURCE_TOPIC_1, 2)
+ .build();
// SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology();
- final ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);
+ final ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology,
metadataImage.topics());
assertEquals(Optional.empty(), configuredTopology.subtopologies());
assertTrue(configuredTopology.topicConfigurationException().isPresent());
@@ -69,14 +70,14 @@ class InternalTopicManagerTest {
@Test
void testConfigureTopics() {
- Map<String, TopicMetadata> topicMetadata = new HashMap<>();
- topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_1, 2));
- topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(),
SOURCE_TOPIC_2, 2));
- topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
- new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), SOURCE_TOPIC_1, 2)
+ .addTopic(Uuid.randomUuid(), SOURCE_TOPIC_2, 2)
+ .addTopic(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2)
+ .build();
StreamsTopology topology = makeTestTopology();
- ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);
+ ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology,
metadataImage.topics());
final Map<String, CreatableTopic> internalTopicsToBeCreated =
configuredTopology.internalTopicsToBeCreated();
assertEquals(2, internalTopicsToBeCreated.size());