This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98fbd8afc7f KAFKA-14462; [20/N] Refresh subscription metadata on new
metadata image (#13901)
98fbd8afc7f is described below
commit 98fbd8afc7f3ba806d742690536090936738f1e7
Author: David Jacot <[email protected]>
AuthorDate: Wed Jul 5 18:28:38 2023 +0200
KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
(#13901)
This patch adds (1) the logic to propagate a new MetadataImage to the
running coordinators; and (2) the logic to ensure that all the consumer groups
subscribed to topics with changes will refresh their subscription metadata on
the next heartbeat. In addition, it ensures that freshly loaded consumer
groups also refresh their subscription metadata on the next heartbeat.
Reviewers: Jeff Kim <[email protected]>, Justine Olshan
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../coordinator/group/GroupCoordinatorService.java | 1 +
.../coordinator/group/GroupMetadataManager.java | 223 +++++++--
.../group/ReplicatedGroupCoordinator.java | 24 +
.../coordinator/group/consumer/ConsumerGroup.java | 78 ++-
.../coordinator/group/runtime/Coordinator.java | 16 +-
.../group/runtime/CoordinatorRuntime.java | 40 +-
.../group/GroupMetadataManagerTest.java | 524 +++++++++++++++++++--
.../group/consumer/ConsumerGroupTest.java | 74 ++-
.../group/runtime/CoordinatorRuntimeTest.java | 61 ++-
10 files changed, 965 insertions(+), 78 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d7e36890cdd..6e2a8c0ca4e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -323,7 +323,7 @@
<suppress checks="CyclomaticComplexity"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="MethodLength"
- files="(ConsumerGroupTest|GroupMetadataManagerTest).java"/>
+
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="NPathComplexity"
files="(GroupMetadataManager).java"/>
<suppress checks="ParameterNumber"
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 88a70aca925..0c926bbf658 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -531,6 +531,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
MetadataDelta delta
) {
throwIfNotActive();
+ runtime.onNewMetadataImage(newImage, delta);
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index edda803d445..3f57faa19f7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -29,6 +29,7 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
@@ -50,14 +51,18 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
-import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -88,10 +93,12 @@ public class GroupMetadataManager {
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
+ private Time time = null;
private List<PartitionAssignor> assignors = null;
- private TopicsImage topicsImage = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
+ private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
+ private MetadataImage metadataImage = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -103,6 +110,11 @@ public class GroupMetadataManager {
return this;
}
+ Builder withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
Builder withAssignors(List<PartitionAssignor> assignors) {
this.assignors = assignors;
return this;
@@ -118,15 +130,21 @@ public class GroupMetadataManager {
return this;
}
- Builder withTopicsImage(TopicsImage topicsImage) {
- this.topicsImage = topicsImage;
+ Builder withConsumerGroupMetadataRefreshIntervalMs(int
consumerGroupMetadataRefreshIntervalMs) {
+ this.consumerGroupMetadataRefreshIntervalMs =
consumerGroupMetadataRefreshIntervalMs;
+ return this;
+ }
+
+ Builder withMetadataImage(MetadataImage metadataImage) {
+ this.metadataImage = metadataImage;
return this;
}
GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
- if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+ if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+ if (time == null) time = Time.SYSTEM;
if (assignors == null || assignors.isEmpty()) {
throw new IllegalStateException("Assignors must be set before
building.");
@@ -135,10 +153,12 @@ public class GroupMetadataManager {
return new GroupMetadataManager(
snapshotRegistry,
logContext,
+ time,
assignors,
- topicsImage,
+ metadataImage,
consumerGroupMaxSize,
- consumerGroupHeartbeatIntervalMs
+ consumerGroupHeartbeatIntervalMs,
+ consumerGroupMetadataRefreshIntervalMs
);
}
}
@@ -153,6 +173,11 @@ public class GroupMetadataManager {
*/
private final SnapshotRegistry snapshotRegistry;
+ /**
+ * The system time.
+ */
+ private final Time time;
+
/**
* The supported partition assignors keyed by their name.
*/
@@ -168,6 +193,11 @@ public class GroupMetadataManager {
*/
private final TimelineHashMap<String, Group> groups;
+ /**
+ * The group ids keyed by topic names.
+ */
+ private final TimelineHashMap<String, TimelineHashSet<String>>
groupsByTopics;
+
/**
* The maximum number of members allowed in a single consumer group.
*/
@@ -179,26 +209,43 @@ public class GroupMetadataManager {
private final int consumerGroupHeartbeatIntervalMs;
/**
- * The topics metadata (or image).
+ * The metadata refresh interval.
+ */
+ private final int consumerGroupMetadataRefreshIntervalMs;
+
+ /**
+ * The metadata image.
*/
- private TopicsImage topicsImage;
+ private MetadataImage metadataImage;
private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
+ Time time,
List<PartitionAssignor> assignors,
- TopicsImage topicsImage,
+ MetadataImage metadataImage,
int consumerGroupMaxSize,
- int consumerGroupHeartbeatIntervalMs
+ int consumerGroupHeartbeatIntervalMs,
+ int consumerGroupMetadataRefreshIntervalMs
) {
this.log = logContext.logger(GroupMetadataManager.class);
this.snapshotRegistry = snapshotRegistry;
- this.topicsImage = topicsImage;
+ this.time = time;
+ this.metadataImage = metadataImage;
this.assignors =
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name,
Function.identity()));
this.defaultAssignor = assignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupHeartbeatIntervalMs =
consumerGroupHeartbeatIntervalMs;
+ this.consumerGroupMetadataRefreshIntervalMs =
consumerGroupMetadataRefreshIntervalMs;
+ }
+
+ /**
+ * @return The current metadata image used by the group metadata manager.
+ */
+ public MetadataImage image() {
+ return metadataImage;
}
/**
@@ -472,7 +519,8 @@ public class GroupMetadataManager {
String assignorName,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions
) throws ApiException {
- List<Record> records = new ArrayList<>();
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
// Get or create the consumer group.
boolean createIfNotExists = memberEpoch == 0;
@@ -506,30 +554,47 @@ public class GroupMetadataManager {
.setClientHost(clientHost)
.build();
+ boolean bumpGroupEpoch = false;
if (!updatedMember.equals(member)) {
records.add(newMemberSubscriptionRecord(groupId, updatedMember));
if
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.info("[GroupId " + groupId + "] Member " + memberId + "
updated its subscribed topics to: " +
updatedMember.subscribedTopicNames());
+ bumpGroupEpoch = true;
+ }
- subscriptionMetadata = group.computeSubscriptionMetadata(
- member,
- updatedMember,
- topicsImage
- );
+ if
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+ log.info("[GroupId " + groupId + "] Member " + memberId + "
updated its subscribed regex to: " +
+ updatedMember.subscribedTopicRegex());
+ bumpGroupEpoch = true;
+ }
+ }
- if
(!subscriptionMetadata.equals(group.subscriptionMetadata())) {
- log.info("[GroupId " + groupId + "] Computed new
subscription metadata: "
- + subscriptionMetadata + ".");
- records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
- }
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId " + groupId + "] Computed new subscription
metadata: "
+ + subscriptionMetadata + ".");
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+ if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
-
log.info("[GroupId " + groupId + "] Bumped group epoch to " +
groupEpoch + ".");
}
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
// 2. Update the target assignment if the group epoch is larger than
the target assignment epoch. The
@@ -635,7 +700,7 @@ public class GroupMetadataManager {
Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
member,
null,
- topicsImage
+ metadataImage.topics()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
@@ -709,14 +774,15 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
+ ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId,
value != null);
+ Set<String> oldSubscribedTopicNames = new
HashSet<>(consumerGroup.subscribedTopicNames());
+
if (value != null) {
- ConsumerGroup consumerGroup =
getOrMaybeCreateConsumerGroup(groupId, true);
ConsumerGroupMember oldMember =
consumerGroup.getOrMaybeCreateMember(memberId, true);
consumerGroup.updateMember(new
ConsumerGroupMember.Builder(oldMember)
.updateWith(value)
.build());
} else {
- ConsumerGroup consumerGroup =
getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember oldMember =
consumerGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != -1) {
throw new IllegalStateException("Received a tombstone record
to delete member " + memberId
@@ -728,6 +794,81 @@ public class GroupMetadataManager {
}
consumerGroup.removeMember(memberId);
}
+
+ updateGroupsByTopics(groupId, oldSubscribedTopicNames,
consumerGroup.subscribedTopicNames());
+ }
+
+ /**
+ * @return The set of groups subscribed to the topic.
+ */
+ public Set<String> groupsSubscribedToTopic(String topicName) {
+ Set<String> groups = groupsByTopics.get(topicName);
+ return groups != null ? groups : Collections.emptySet();
+ }
+
+ /**
+ * Subscribes a group to a topic.
+ *
+ * @param groupId The group id.
+ * @param topicName The topic name.
+ */
+ private void subscribeGroupToTopic(
+ String groupId,
+ String topicName
+ ) {
+ groupsByTopics
+ .computeIfAbsent(topicName, __ -> new
TimelineHashSet<>(snapshotRegistry, 1))
+ .add(groupId);
+ }
+
+ /**
+ * Unsubscribes a group from a topic.
+ *
+ * @param groupId The group id.
+ * @param topicName The topic name.
+ */
+ private void unsubscribeGroupFromTopic(
+ String groupId,
+ String topicName
+ ) {
+ groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
+ groupIds.remove(groupId);
+ return groupIds.isEmpty() ? null : groupIds;
+ });
+ }
+
+ /**
+ * Updates the group by topics mapping.
+ *
+ * @param groupId The group id.
+ * @param oldSubscribedTopics The old group subscriptions.
+ * @param newSubscribedTopics The new group subscriptions.
+ */
+ private void updateGroupsByTopics(
+ String groupId,
+ Set<String> oldSubscribedTopics,
+ Set<String> newSubscribedTopics
+ ) {
+ if (oldSubscribedTopics.isEmpty()) {
+ newSubscribedTopics.forEach(topicName ->
+ subscribeGroupToTopic(groupId, topicName)
+ );
+ } else if (newSubscribedTopics.isEmpty()) {
+ oldSubscribedTopics.forEach(topicName ->
+ unsubscribeGroupFromTopic(groupId, topicName)
+ );
+ } else {
+ oldSubscribedTopics.forEach(topicName -> {
+ if (!newSubscribedTopics.contains(topicName)) {
+ unsubscribeGroupFromTopic(groupId, topicName);
+ }
+ });
+ newSubscribedTopics.forEach(topicName -> {
+ if (!oldSubscribedTopics.contains(topicName)) {
+ subscribeGroupToTopic(groupId, topicName);
+ }
+ });
+ }
}
/**
@@ -874,4 +1015,32 @@ public class GroupMetadataManager {
consumerGroup.updateMember(newMember);
}
}
+
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The delta image.
+ */
+ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
+ metadataImage = newImage;
+
+ // Notify all the groups subscribed to the created, updated or
+ // deleted topics.
+ Set<String> allGroupIds = new HashSet<>();
+ delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+ String topicName = topicDelta.name();
+ allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+ });
+ delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+ TopicImage topicImage = delta.image().topics().getTopic(topicId);
+ allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
+ });
+ allGroupIds.forEach(groupId -> {
+ Group group = groups.get(groupId);
+ if (group != null && group.type() == Group.GroupType.CONSUMER) {
+ ((ConsumerGroup) group).requestMetadataRefresh();
+ }
+ });
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
index 89d08d450a2..201da405e6d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
@@ -36,6 +36,8 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.runtime.Coordinator;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -130,6 +132,28 @@ public class ReplicatedGroupCoordinator implements
Coordinator<Record> {
return groupMetadataManager.consumerGroupHeartbeat(context, request);
}
+ /**
+ * The coordinator has been loaded. This is used to apply any
+ * post loading operations (e.g. registering timers).
+ *
+ * @param newImage The metadata image.
+ */
+ @Override
+ public void onLoaded(MetadataImage newImage) {
+ groupMetadataManager.onNewMetadataImage(newImage, new
MetadataDelta(newImage));
+ }
+
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The delta image.
+ */
+ @Override
+ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
+ groupMetadataManager.onNewMetadataImage(newImage, delta);
+ }
+
/**
* @return The ApiMessage or null.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 9df00733b30..6f682d4e82f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -58,6 +58,18 @@ public class ConsumerGroup implements Group {
}
}
+ public static class DeadlineAndEpoch {
+ static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+ public final long deadlineMs;
+ public final int epoch;
+
+ DeadlineAndEpoch(long deadlineMs, int epoch) {
+ this.deadlineMs = deadlineMs;
+ this.epoch = epoch;
+ }
+ }
+
/**
* The snapshot registry.
*/
@@ -119,6 +131,18 @@ public class ConsumerGroup implements Group {
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>>
currentPartitionEpoch;
+ /**
+ * The metadata refresh deadline. It consists of a timestamp in
milliseconds together with
+ * the group epoch at the time of setting it. The metadata refresh time is
considered as a
+ * soft state (read that it is not stored in a timeline data structure).
It is like this
+ * because it is not persisted to the log. The group epoch is here to
ensure that the
+ * metadata refresh deadline is invalidated if the group epoch does not
correspond to
+ * the current group epoch. This can happen if the metadata refresh
deadline is updated
+ * after having refreshed the metadata but the write operation failed. In
this case, the
+ * time is not automatically rolled back.
+ */
+ private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId
@@ -249,8 +273,10 @@ public class ConsumerGroup implements Group {
* @param memberId The member id to remove.
*/
public void removeMember(String memberId) {
- ConsumerGroupMember member = members.remove(memberId);
- maybeRemovePartitionEpoch(member);
+ ConsumerGroupMember oldMember = members.remove(memberId);
+ maybeUpdateSubscribedTopicNames(oldMember, null);
+ maybeUpdateServerAssignors(oldMember, null);
+ maybeRemovePartitionEpoch(oldMember);
maybeUpdateGroupState();
}
@@ -279,6 +305,13 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableMap(members);
}
+ /**
+ * @return An immutable Set containing all the subscribed topic names.
+ */
+ public Set<String> subscribedTopicNames() {
+ return Collections.unmodifiableSet(subscribedTopicNames.keySet());
+ }
+
/**
* Returns the target assignment of the member.
*
@@ -423,6 +456,47 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableMap(newSubscriptionMetadata);
}
+ /**
+ * Updates the metadata refresh deadline.
+ *
+ * @param deadlineMs The deadline in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+ public void setMetadataRefreshDeadline(
+ long deadlineMs,
+ int groupEpoch
+ ) {
+ this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs,
groupEpoch);
+ }
+
+ /**
+ * Requests a metadata refresh.
+ */
+ public void requestMetadataRefresh() {
+ this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ }
+
+ /**
+ * Checks if a metadata refresh is required. A refresh is required in two
cases:
+ * 1) The deadline is smaller or equal to the current time;
+ * 2) The group epoch associated with the deadline is larger than
+ * the current group epoch. This means that the operations which updated
+ * the deadline failed.
+ *
+ * @param currentTimeMs The current time in milliseconds.
+ * @return A boolean indicating whether a refresh is required or not.
+ */
+ public boolean hasMetadataExpired(long currentTimeMs) {
+ return currentTimeMs >= metadataRefreshDeadline.deadlineMs ||
groupEpoch() < metadataRefreshDeadline.epoch;
+ }
+
+ /**
+ * @return The metadata refresh deadline.
+ */
+ public DeadlineAndEpoch metadataRefreshDeadline() {
+ return metadataRefreshDeadline;
+ }
+
/**
* Updates the current state of the group.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
index 5ab88a0efa6..8189e1ab89b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.coordinator.group.runtime;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+
/**
* Coordinator is basically a replicated state machine managed by the
* {@link CoordinatorRuntime}.
@@ -25,8 +28,19 @@ public interface Coordinator<U> extends
CoordinatorPlayback<U> {
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
+ *
+ * @param newImage The metadata image.
+ */
+ default void onLoaded(MetadataImage newImage) {}
+
+ /**
+ * A new metadata image is available. This is only called after {@link
Coordinator#onLoaded(MetadataImage)}
+ * is called to signal that the coordinator has been fully loaded.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The delta image.
*/
- default void onLoaded() {}
+ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {}
/**
* The coordinator has been unloaded. This is used to apply
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index a18964cc3bb..00c8937f436 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
@@ -349,7 +351,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>,
U> implements AutoClos
state = CoordinatorState.ACTIVE;
snapshotRegistry.getOrCreateSnapshot(0);
partitionWriter.registerListener(tp,
highWatermarklistener);
- coordinator.onLoaded();
+ coordinator.onLoaded(metadataImage);
break;
case FAILED:
@@ -807,6 +809,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>,
U> implements AutoClos
*/
private final AtomicBoolean isRunning = new AtomicBoolean(true);
+ /**
+ * The latest known metadata image.
+ */
+ private volatile MetadataImage metadataImage = MetadataImage.EMPTY;
+
/**
* Constructor.
*
@@ -1083,6 +1090,37 @@ public class CoordinatorRuntime<S extends
Coordinator<U>, U> implements AutoClos
});
}
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The metadata delta.
+ */
+ public void onNewMetadataImage(
+ MetadataImage newImage,
+ MetadataDelta delta
+ ) {
+ throwIfNotRunning();
+ log.debug("Scheduling applying of a new metadata image with offset
{}.", newImage.offset());
+
+ // Update global image.
+ metadataImage = newImage;
+
+ // Push an event for each coordinator.
+ coordinators.keySet().forEach(tp -> {
+ scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" +
newImage.offset() + ")", tp, () -> {
+ CoordinatorContext context = contextOrThrow(tp);
+ if (context.state == CoordinatorState.ACTIVE) {
+ log.debug("Applying new metadata image with offset {} to
{}.", newImage.offset(), tp);
+ context.coordinator.onNewMetadataImage(newImage, delta);
+ } else {
+ log.debug("Ignoring new metadata image with offset {} for
{} because the coordinator is not active.",
+ newImage.offset(), tp);
+ }
+ });
+ });
+ }
+
/**
* Closes the runtime. This closes all the coordinators currently
registered
* in the runtime.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2296d58ecd7..41c2b1cb8be 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@@ -59,8 +62,10 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -78,13 +83,16 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -113,10 +121,10 @@ public class GroupMetadataManagerTest {
}
}
- public static class TopicsImageBuilder {
- private TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+ public static class MetadataImageBuilder {
+ private MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
- public TopicsImageBuilder addTopic(
+ public MetadataImageBuilder addTopic(
Uuid topicId,
String topicName,
int numPartitions
@@ -130,8 +138,8 @@ public class GroupMetadataManagerTest {
return this;
}
- public TopicsImage build() {
- return delta.apply();
+ public MetadataImage build() {
+ return delta.apply(MetadataProvenance.EMPTY);
}
}
@@ -141,6 +149,7 @@ public class GroupMetadataManagerTest {
private int assignmentEpoch;
private final Map<String, ConsumerGroupMember> members = new
HashMap<>();
private final Map<String, Assignment> assignments = new HashMap<>();
+ private Map<String, TopicMetadata> subscriptionMetadata;
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@@ -153,6 +162,11 @@ public class GroupMetadataManagerTest {
return this;
}
+ public ConsumerGroupBuilder withSubscriptionMetadata(Map<String,
TopicMetadata> subscriptionMetadata) {
+ this.subscriptionMetadata = subscriptionMetadata;
+ return this;
+ }
+
public ConsumerGroupBuilder withAssignment(String memberId, Map<Uuid,
Set<Integer>> assignment) {
this.assignments.put(memberId, new Assignment(assignment));
return this;
@@ -172,19 +186,21 @@ public class GroupMetadataManagerTest {
});
// Add subscription metadata.
- Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
- members.forEach((memberId, member) -> {
- member.subscribedTopicNames().forEach(topicName -> {
- TopicImage topicImage = topicsImage.getTopic(topicName);
- if (topicImage != null) {
- subscriptionMetadata.put(topicName, new TopicMetadata(
- topicImage.id(),
- topicImage.name(),
- topicImage.partitions().size()
- ));
- }
+ if (subscriptionMetadata == null) {
+ subscriptionMetadata = new HashMap<>();
+ members.forEach((memberId, member) -> {
+ member.subscribedTopicNames().forEach(topicName -> {
+ TopicImage topicImage =
topicsImage.getTopic(topicName);
+ if (topicImage != null) {
+ subscriptionMetadata.put(topicName, new
TopicMetadata(
+ topicImage.id(),
+ topicImage.name(),
+ topicImage.partitions().size()
+ ));
+ }
+ });
});
- });
+ }
if (!subscriptionMetadata.isEmpty()) {
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
@@ -212,15 +228,17 @@ public class GroupMetadataManagerTest {
static class GroupMetadataManagerTestContext {
static class Builder {
+ final private Time time = new MockTime();
final private LogContext logContext = new LogContext();
final private SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
- private TopicsImage topicsImage;
+ private MetadataImage metadataImage;
private List<PartitionAssignor> assignors;
private List<ConsumerGroupBuilder> consumerGroupBuilders = new
ArrayList<>();
private int consumerGroupMaxSize = Integer.MAX_VALUE;
+ private int consumerGroupMetadataRefreshIntervalMs =
Integer.MAX_VALUE;
- public Builder withTopicsImage(TopicsImage topicsImage) {
- this.topicsImage = topicsImage;
+ public Builder withMetadataImage(MetadataImage metadataImage) {
+ this.metadataImage = metadataImage;
return this;
}
@@ -239,24 +257,32 @@ public class GroupMetadataManagerTest {
return this;
}
+ public Builder withConsumerGroupMetadataRefreshIntervalMs(int
consumerGroupMetadataRefreshIntervalMs) {
+ this.consumerGroupMetadataRefreshIntervalMs =
consumerGroupMetadataRefreshIntervalMs;
+ return this;
+ }
+
public GroupMetadataManagerTestContext build() {
- if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+ if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (assignors == null) assignors = Collections.emptyList();
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext(
+ time,
snapshotRegistry,
new GroupMetadataManager.Builder()
.withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext)
- .withTopicsImage(topicsImage)
+ .withTime(time)
+ .withMetadataImage(metadataImage)
.withConsumerGroupHeartbeatInterval(5000)
.withConsumerGroupMaxSize(consumerGroupMaxSize)
.withAssignors(assignors)
+
.withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs)
.build()
);
consumerGroupBuilders.forEach(builder -> {
- builder.build(topicsImage).forEach(context::replay);
+
builder.build(metadataImage.topics()).forEach(context::replay);
});
context.commit();
@@ -265,6 +291,7 @@ public class GroupMetadataManagerTest {
}
}
+ final Time time;
final SnapshotRegistry snapshotRegistry;
final GroupMetadataManager groupMetadataManager;
@@ -272,9 +299,11 @@ public class GroupMetadataManagerTest {
long lastWrittenOffset = 0L;
public GroupMetadataManagerTestContext(
+ Time time,
SnapshotRegistry snapshotRegistry,
GroupMetadataManager groupMetadataManager
) {
+ this.time = time;
this.snapshotRegistry = snapshotRegistry;
this.groupMetadataManager = groupMetadataManager;
}
@@ -486,7 +515,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(TopicsImage.EMPTY)
+ .withMetadataImage(MetadataImage.EMPTY)
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
@@ -675,7 +704,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -764,7 +793,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -865,7 +894,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1009,7 +1038,7 @@ public class GroupMetadataManagerTest {
// Consumer group with two members.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addTopic(zarTopicId, zarTopicName, 1)
@@ -1100,7 +1129,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1546,7 +1575,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
@@ -1794,7 +1823,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1912,7 +1941,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1932,6 +1961,435 @@ public class GroupMetadataManagerTest {
.setTopicPartitions(Collections.emptyList())));
}
+ @Test
+ public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ // Create a context with one consumer group containing one member.
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ // foo only has 3 partitions stored in the metadata
but foo has
+ // 6 partitions in the metadata image.
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 3));
+ }
+ }))
+ .build();
+
+ // The metadata refresh flag should be true.
+ ConsumerGroup consumerGroup = context.groupMetadataManager
+ .getOrMaybeCreateConsumerGroup(groupId, false);
+
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+ // Prepare the assignment result.
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Collections.singletonMap(memberId, new
MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10));
+
+ // The member gets partitions 3, 4 and 5 assigned.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setAssignedTopicPartitions(Arrays.asList(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+ ))),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
+ // Check next refresh time.
+
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+ assertEquals(context.time.milliseconds() + 5 * 60 * 1000,
consumerGroup.metadataRefreshDeadline().deadlineMs);
+ assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+ }
+
+ @Test
+ public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ // Create a context with one consumer group containing one member.
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ // foo only has 3 partitions stored in the metadata
but foo has
+ // 6 partitions in the metadata image.
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 3));
+ }
+ }))
+ .build();
+
+ // The metadata refresh flag should be true.
+ ConsumerGroup consumerGroup = context.groupMetadataManager
+ .getOrMaybeCreateConsumerGroup(groupId, false);
+
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+ // Prepare the assignment result.
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Collections.singletonMap(memberId, new
MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Heartbeat.
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10));
+
+ // The metadata refresh flag is set to a future time.
+
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+ assertEquals(context.time.milliseconds() + 5 * 60 * 1000,
consumerGroup.metadataRefreshDeadline().deadlineMs);
+ assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+
+ // Roll back the uncommitted changes. This does not roll back the
metadata flag
+ // because it is not using a timeline data structure.
+ context.rollback();
+
+ // However, the next heartbeat should detect the divergence based on
the epoch and trigger
+ // a metadata refresh.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10));
+
+
+ // The member gets partitions 3, 4 and 5 assigned.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setAssignedTopicPartitions(Arrays.asList(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+ ))),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId,
mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
+ // Check next refresh time.
+
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+ assertEquals(context.time.milliseconds() + 5 * 60 * 1000,
consumerGroup.metadataRefreshDeadline().deadlineMs);
+ assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+ }
+
+ @Test
+ public void testGroupIdsByTopics() {
+ String groupId1 = "group1";
+ String groupId2 = "group2";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .build();
+
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 1 subscribes to foo and bar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+ new ConsumerGroupMember.Builder("group1-m1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 2 subscribes to foo, bar and zar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 1 subscribes to bar and zar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+ new ConsumerGroupMember.Builder("group1-m2")
+ .setSubscribedTopicNames(Arrays.asList("bar", "zar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 2 subscribes to foo and bar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m2")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 1 is removed.
+
context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1,
"group1-m1"));
+
context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1,
"group1-m1"));
+
+ assertEquals(mkSet(groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 2 subscribes to nothing.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m1")
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()));
+
+ assertEquals(mkSet(groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 2 subscribes to foo.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m2")
+ .setSubscribedTopicNames(Arrays.asList("foo"))
+ .build()));
+
+ assertEquals(mkSet(groupId2),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 2 subscribes to nothing.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m2")
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()));
+
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 1 subscribes to nothing.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+ new ConsumerGroupMember.Builder("group1-m2")
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()));
+
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(Collections.emptySet(),
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+ }
+
+ @Test
+ public void testOnNewMetadataImage() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .build();
+
+ // M1 in group 1 subscribes to a and b.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group1",
+ new ConsumerGroupMember.Builder("group1-m1")
+ .setSubscribedTopicNames(Arrays.asList("a", "b"))
+ .build()));
+
+ // M1 in group 2 subscribes to b and c.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group2",
+ new ConsumerGroupMember.Builder("group2-m1")
+ .setSubscribedTopicNames(Arrays.asList("b", "c"))
+ .build()));
+
+ // M1 in group 3 subscribes to d.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group3",
+ new ConsumerGroupMember.Builder("group3-m1")
+ .setSubscribedTopicNames(Arrays.asList("d"))
+ .build()));
+
+ // M1 in group 4 subscribes to e.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group4",
+ new ConsumerGroupMember.Builder("group4-m1")
+ .setSubscribedTopicNames(Arrays.asList("e"))
+ .build()));
+
+ // M1 in group 5 subscribes to f.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group5",
+ new ConsumerGroupMember.Builder("group5-m1")
+ .setSubscribedTopicNames(Arrays.asList("f"))
+ .build()));
+
+ // Ensures that all refresh flags are set to the future.
+ Arrays.asList("group1", "group2", "group3", "group4",
"group5").forEach(groupId -> {
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ group.setMetadataRefreshDeadline(context.time.milliseconds() +
5000L, 0);
+ assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
+ });
+
+ // Update the metadata image.
+ Uuid topicA = Uuid.randomUuid();
+ Uuid topicB = Uuid.randomUuid();
+ Uuid topicC = Uuid.randomUuid();
+ Uuid topicD = Uuid.randomUuid();
+ Uuid topicE = Uuid.randomUuid();
+
+ // Create a first base image with topic a, b, c and d.
+ MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
+ delta.replay(new
PartitionRecord().setTopicId(topicA).setPartitionId(0));
+ delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
+ delta.replay(new
PartitionRecord().setTopicId(topicB).setPartitionId(0));
+ delta.replay(new TopicRecord().setTopicId(topicC).setName("c"));
+ delta.replay(new
PartitionRecord().setTopicId(topicC).setPartitionId(0));
+ delta.replay(new TopicRecord().setTopicId(topicD).setName("d"));
+ delta.replay(new
PartitionRecord().setTopicId(topicD).setPartitionId(0));
+ MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+ // Create a delta which updates topic B, deletes topic D and creates
topic E.
+ delta = new MetadataDelta(image);
+ delta.replay(new
PartitionRecord().setTopicId(topicB).setPartitionId(2));
+ delta.replay(new RemoveTopicRecord().setTopicId(topicD));
+ delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
+ delta.replay(new
PartitionRecord().setTopicId(topicE).setPartitionId(1));
+ image = delta.apply(MetadataProvenance.EMPTY);
+
+ // Update metadata image with the delta.
+ context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+ // Verify the groups.
+ Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId
-> {
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
+ });
+
+ Arrays.asList("group5").forEach(groupId -> {
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
+ });
+
+ // Verify image.
+ assertEquals(image, context.groupMetadataManager.image());
+ }
+
private <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 2454188ed94..7306e3a3c5d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -19,8 +19,9 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
-import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@@ -392,7 +393,7 @@ public class ConsumerGroupTest {
Uuid barTopicId = Uuid.randomUuid();
Uuid zarTopicId = Uuid.randomUuid();
- TopicsImage image = new GroupMetadataManagerTest.TopicsImageBuilder()
+ MetadataImage image = new
GroupMetadataManagerTest.MetadataImageBuilder()
.addTopic(fooTopicId, "foo", 1)
.addTopic(barTopicId, "bar", 2)
.addTopic(zarTopicId, "zar", 3)
@@ -416,7 +417,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
@@ -428,7 +429,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
member1,
- image
+ image.topics()
)
);
@@ -443,7 +444,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
@@ -453,7 +454,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member1,
null,
- image
+ image.topics()
)
);
@@ -466,7 +467,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
member2,
- image
+ image.topics()
)
);
@@ -482,7 +483,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
@@ -494,7 +495,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member2,
null,
- image
+ image.topics()
)
);
@@ -506,7 +507,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member1,
null,
- image
+ image.topics()
)
);
@@ -520,7 +521,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
member3,
- image
+ image.topics()
)
);
@@ -537,8 +538,57 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
}
+
+ @Test
+ public void testMetadataRefreshDeadline() {
+ MockTime time = new MockTime();
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ // Group epoch starts at 0.
+ assertEquals(0, group.groupEpoch());
+
+ // The refresh time deadline should be empty when the group is created
or loaded.
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+
+ // Set the refresh deadline. The metadata remains valid because the
deadline
+ // has not passed and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000,
group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000,
group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(),
group.metadataRefreshDeadline().epoch);
+
+ // Advance past the deadline. The metadata should have expired.
+ time.sleep(1001L);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+
+ // Set the refresh time deadline with a higher group epoch. The
metadata is considered
+ // as expired because the group epoch attached to the deadline is
higher than the
+ // current group epoch.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000,
group.groupEpoch() + 1);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000,
group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch() + 1,
group.metadataRefreshDeadline().epoch);
+
+ // Advance the group epoch.
+ group.setGroupEpoch(group.groupEpoch() + 1);
+
+ // Set the refresh deadline. The metadata remains valid because the
deadline
+ // has not passed and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000,
group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000,
group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(),
group.metadataRefreshDeadline().epoch);
+
+ // Request metadata refresh. The metadata expires immediately.
+ group.requestMetadataRefresh();
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 7c40721d5ae..c130fa474ad 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Test;
@@ -222,7 +225,7 @@ public class CoordinatorRuntimeTest {
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
// Verify that onLoaded is called.
- verify(coordinator, times(1)).onLoaded();
+ verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY);
// Verify that the listener is registered.
verify(writer, times(1)).registerListener(
@@ -834,4 +837,60 @@ public class CoordinatorRuntimeTest {
// Verify that the loader was closed.
verify(loader).close();
}
+
+ @Test
+ public void testOnNewMetadataImage() {
+ TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+ TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+ MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorBuilderSupplier supplier =
mock(MockCoordinatorBuilderSupplier.class);
+ MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+ CoordinatorRuntime<MockCoordinator, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinator, String>()
+ .withLoader(loader)
+ .withEventProcessor(new MockEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorBuilderSupplier(supplier)
+ .build();
+
+ MockCoordinator coordinator0 = mock(MockCoordinator.class);
+ MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+ when(supplier.get()).thenReturn(builder);
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.build())
+ .thenReturn(coordinator0)
+ .thenReturn(coordinator1);
+
+ CompletableFuture<Void> future0 = new CompletableFuture<>();
+ when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+ CompletableFuture<Void> future1 = new CompletableFuture<>();
+ when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+ runtime.scheduleLoadOperation(tp0, 0);
+ runtime.scheduleLoadOperation(tp1, 0);
+
+ // Coordinator 0 finishes loading before any image is published, so
+ // onLoaded must receive the current image: MetadataImage.EMPTY.
+ future0.complete(null);
+ verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+ // Publish a new image while coordinator 1 is still loading.
+ MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+ runtime.onNewMetadataImage(newImage, delta);
+
+ // Coordinator 0 is already loaded, so it should be notified about it.
+ verify(coordinator0).onNewMetadataImage(newImage, delta);
+
+ // Coordinator 1 finishes loading after the publication, so
+ // onLoaded must receive the current image: the new image.
+ future1.complete(null);
+ verify(coordinator1).onLoaded(newImage);
+ }
}