This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new 6ec17cfc6d1 Add warm-up tasks to Streams membership manager (#18214)
6ec17cfc6d1 is described below
commit 6ec17cfc6d121d48f2d4f139da46dcbff62228fb
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Dec 17 13:07:14 2024 +0100
Add warm-up tasks to Streams membership manager (#18214)
This commit adds warm-up tasks assignment to the
Streams membership manager.
---
.../internals/StreamsMembershipManager.java | 27 +-
.../internals/StreamsMembershipManagerTest.java | 422 ++++++++++++++++++---
2 files changed, 379 insertions(+), 70 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index e14b02ee406..0c213aefc29 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -747,13 +747,14 @@ public class StreamsMembershipManager implements
RequestManager {
markReconciliationInProgress();
- // ToDo: add standby and warmup tasks
SortedSet<StreamsAssignmentInterface.TaskId> assignedActiveTasks =
toTaskIdSet(targetAssignment.activeTasks);
SortedSet<StreamsAssignmentInterface.TaskId> ownedActiveTasks =
toTaskIdSet(currentAssignment.activeTasks);
SortedSet<StreamsAssignmentInterface.TaskId> activeTasksToRevoke = new
TreeSet<>(ownedActiveTasks);
activeTasksToRevoke.removeAll(assignedActiveTasks);
SortedSet<StreamsAssignmentInterface.TaskId> assignedStandbyTasks =
toTaskIdSet(targetAssignment.standbyTasks);
SortedSet<StreamsAssignmentInterface.TaskId> ownedStandbyTasks =
toTaskIdSet(currentAssignment.standbyTasks);
+ SortedSet<StreamsAssignmentInterface.TaskId> assignedWarmupTasks =
toTaskIdSet(targetAssignment.warmupTasks);
+ SortedSet<StreamsAssignmentInterface.TaskId> ownedWarmupTasks =
toTaskIdSet(currentAssignment.warmupTasks);
log.info("Assigned tasks with local epoch {}\n" +
"\tMember: {}\n" +
@@ -761,14 +762,18 @@ public class StreamsMembershipManager implements
RequestManager {
"\tOwned active tasks: {}\n" +
"\tActive tasks to revoke: {}\n" +
"\tAssigned standby tasks: {}\n" +
- "\tOwned standby tasks: {}\n",
+ "\tOwned standby tasks: {}\n" +
+ "\tAssigned warm-up tasks: {}\n" +
+ "\tOwned warm-up tasks: {}\n",
targetAssignment.localEpoch,
memberId,
assignedActiveTasks,
ownedActiveTasks,
activeTasksToRevoke,
assignedStandbyTasks,
- ownedStandbyTasks
+ ownedStandbyTasks,
+ assignedWarmupTasks,
+ ownedWarmupTasks
);
SortedSet<TopicPartition> ownedTopicPartitionsFromSubscriptionState =
new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
@@ -791,7 +796,7 @@ public class StreamsMembershipManager implements
RequestManager {
final CompletableFuture<Void>
onTasksRevokedAndAssignedCallbacksExecuted =
onTasksRevokedCallbackExecuted.thenCompose(__ -> {
if (!maybeAbortReconciliation()) {
- return assignTasks(assignedActiveTasks, ownedActiveTasks,
assignedStandbyTasks);
+ return assignTasks(assignedActiveTasks, ownedActiveTasks,
assignedStandbyTasks, assignedWarmupTasks);
}
return CompletableFuture.completedFuture(null);
});
@@ -833,12 +838,20 @@ public class StreamsMembershipManager implements
RequestManager {
private CompletableFuture<Void> assignTasks(final
SortedSet<StreamsAssignmentInterface.TaskId> activeTasksToAssign,
final
SortedSet<StreamsAssignmentInterface.TaskId> ownedActiveTasks,
- final
SortedSet<StreamsAssignmentInterface.TaskId> standbyTasksToAssign) {
- log.info("Assigning active tasks {} and standby tasks {}",
+ final
SortedSet<StreamsAssignmentInterface.TaskId> standbyTasksToAssign,
+ final
SortedSet<StreamsAssignmentInterface.TaskId> warmupTasksToAssign) {
+ log.info("Assigning " +
+ (activeTasksToAssign.isEmpty() ? "no active tasks, " : "active
tasks {}, ") +
+ (standbyTasksToAssign.isEmpty() ? "no standby tasks, " :
"standby tasks {}, and ") +
+ (warmupTasksToAssign.isEmpty() ? "no warm-up tasks. " :
"warm-up tasks {}.") +
+ "to the member.",
activeTasksToAssign.stream()
.map(StreamsAssignmentInterface.TaskId::toString)
.collect(Collectors.joining(", ")),
standbyTasksToAssign.stream()
+ .map(StreamsAssignmentInterface.TaskId::toString)
+ .collect(Collectors.joining(", ")),
+ warmupTasksToAssign.stream()
.map(StreamsAssignmentInterface.TaskId::toString)
.collect(Collectors.joining(", "))
);
@@ -856,7 +869,7 @@ public class StreamsMembershipManager implements
RequestManager {
new StreamsAssignmentInterface.Assignment(
activeTasksToAssign,
standbyTasksToAssign,
- Collections.emptySet()
+ warmupTasksToAssign
)
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 8d799c279d7..c9d87ef5197 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -164,7 +164,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecuted);
joining();
@@ -190,11 +190,11 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
.thenReturn(onTasksRevokedCallbackExecuted);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecuted);
when(subscriptionState.assignedPartitions())
.thenReturn(Collections.emptySet())
@@ -231,9 +231,9 @@ public class StreamsMembershipManagerTest {
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0),
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecuted);
when(subscriptionState.assignedPartitions())
.thenReturn(Collections.emptySet())
@@ -271,11 +271,11 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke))
.thenReturn(onTasksRevokedCallbackExecuted);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecuted);
when(subscriptionState.assignedPartitions())
.thenReturn(Collections.emptySet())
@@ -313,12 +313,12 @@ public class StreamsMembershipManagerTest {
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(activeTasks, Collections.emptySet())
+ makeTaskAssignment(activeTasks, Collections.emptySet(),
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecuted);
joining();
- reconcile(makeHeartbeatResponse(
+ reconcile(makeHeartbeatResponseWithActiveTasks(
SUB_TOPOLOGY_ID_0, List.of(PARTITION_0),
SUB_TOPOLOGY_ID_1, List.of(PARTITION_0))
);
@@ -334,6 +334,98 @@ public class StreamsMembershipManagerTest {
verifyThatNoTasksHaveBeenRevoked();
}
+ @Test
+ public void testReconcilingActiveTaskToStandbyTask() {
+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
TOPIC_0);
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ );
+ final Set<StreamsAssignmentInterface.TaskId> standbyTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasksSetup, Collections.emptySet(),
Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks,
Collections.emptySet()))
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
+ .thenReturn(Collections.emptySet());
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
+
+ final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new
TopicPartition(TOPIC_0, PARTITION_0));
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ }
+
+ @Test
+ public void testReconcilingActiveTaskToWarmupTask() {
+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
TOPIC_0);
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ );
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasksSetup, Collections.emptySet(),
Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(),
warmupTasks))
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
+ .thenReturn(Collections.emptySet());
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
+
+ final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new
TopicPartition(TOPIC_0, PARTITION_0));
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ }
+
@Test
public void testReconcilingEmptyToSingleStandbyTask() {
final Set<StreamsAssignmentInterface.TaskId> standbyTasks =
@@ -341,7 +433,7 @@ public class StreamsMembershipManagerTest {
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasks)
+ makeTaskAssignment(Collections.emptySet(), standbyTasks,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecuted);
joining();
@@ -368,12 +460,12 @@ public class StreamsMembershipManagerTest {
);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecutedSetup);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasks)
+ makeTaskAssignment(Collections.emptySet(), standbyTasks,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecuted);
joining();
@@ -404,12 +496,12 @@ public class StreamsMembershipManagerTest {
);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecutedSetup);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasks)
+ makeTaskAssignment(Collections.emptySet(), standbyTasks,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecuted);
joining();
@@ -440,12 +532,12 @@ public class StreamsMembershipManagerTest {
);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecutedSetup);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasks)
+ makeTaskAssignment(Collections.emptySet(), standbyTasks,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecuted);
joining();
@@ -464,83 +556,273 @@ public class StreamsMembershipManagerTest {
}
@Test
- public void testReconcilingActiveTaskToStandbyTask() {
+ public void testReconcilingStandbyTaskToActiveTask() {
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
TOPIC_0);
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
- final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new
CompletableFuture<>();
- final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
+ final Set<StreamsAssignmentInterface.TaskId> standbyTasksSetup =
Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
- final Set<StreamsAssignmentInterface.TaskId> standbyTasks = Set.of(
- new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(activeTasksSetup, Collections.emptySet())
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup,
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecutedSetup);
-
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
- .thenReturn(onTasksRevokedCallbackExecuted);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasks))
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(),
Collections.emptySet())
+ )
).thenReturn(onTasksAssignedCallbackExecuted);
when(subscriptionState.assignedPartitions())
.thenReturn(Collections.emptySet())
- .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
- .thenReturn(Collections.emptySet());
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
joining();
- reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
acknowledging(onTasksAssignedCallbackExecutedSetup);
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
+
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingStandbyTaskToWarmupTask() {
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> standbyTasksSetup =
Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ );
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup,
Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(),
warmupTasks))
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
- final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new
TopicPartition(TOPIC_0, PARTITION_0));
final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
- verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
- expectedPartitionsToRevoke,
- expectedFullPartitionsToAssign,
- expectedNewPartitionsToAssign
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingEmptyToSingleWarmupTask() {
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasks =
+ Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingWarmupTaskToDifferentWarmupTask() {
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
- onTasksRevokedCallbackExecuted.complete(null);
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
+
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
onTasksAssignedCallbackExecuted.complete(null);
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
}
@Test
- public void testReconcilingStandbyTaskToActiveTask() {
+ public void testReconcilingSingleWarmupTaskToAdditionalWarmupTask() {
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ );
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0),
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0, PARTITION_1)));
+
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingMultipleWarmupTaskToSingleWarmupTask() {
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0),
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0, PARTITION_1)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
+
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingWarmupTaskToActiveTask() {
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
TOPIC_0);
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
- final Set<StreamsAssignmentInterface.TaskId> standbyTasksSetup =
Set.of(
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
- new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasksSetup)
)
).thenReturn(onTasksAssignedCallbackExecutedSetup);
when(
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
- makeTaskAssignment(activeTasks, Collections.emptySet())
+ makeTaskAssignment(activeTasks, Collections.emptySet(),
Collections.emptySet())
)
).thenReturn(onTasksAssignedCallbackExecuted);
when(subscriptionState.assignedPartitions())
.thenReturn(Collections.emptySet())
.thenReturn(Collections.emptySet())
- .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_1)));
joining();
- reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
acknowledging(onTasksAssignedCallbackExecutedSetup);
- reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
- final Collection<TopicPartition> expectedFullPartitionsToAssign =
Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
+
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingWarmupTaskToStandbyTask() {
+ final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
+ final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
+ final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
+ );
+ final Set<StreamsAssignmentInterface.TaskId> standbyTasks = Set.of(
+ new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_1)
+ );
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(),
Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks,
Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_1)));
+
+ final Collection<TopicPartition> expectedFullPartitionsToAssign =
Collections.emptySet();
final Collection<TopicPartition> expectedNewPartitionsToAssign =
expectedFullPartitionsToAssign;
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
expectedNewPartitionsToAssign);
onTasksAssignedCallbackExecuted.complete(null);
@@ -554,7 +836,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new
CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecuted);
joining();
@@ -624,7 +906,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new
CompletableFuture<>();
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasks))
@@ -661,7 +943,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -692,7 +974,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
final CompletableFuture<Void> onAllTasksRevokedCallbackExecuted = new
CompletableFuture<>();
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
@@ -716,7 +998,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -734,7 +1016,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -751,7 +1033,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -802,7 +1084,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -816,7 +1098,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -831,7 +1113,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasks =
Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
new CompletableFuture<>();
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0,
List.of(PARTITION_0)));
@@ -932,7 +1214,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new
CompletableFuture<>();
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
@@ -950,7 +1232,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new
CompletableFuture<>();
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
@@ -969,7 +1251,7 @@ public class StreamsMembershipManagerTest {
final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0,
PARTITION_0)
);
-
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet())))
+
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
Collections.emptySet(), Collections.emptySet())))
.thenReturn(onTasksAssignedCallbackExecutedSetup);
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new
CompletableFuture<>();
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
@@ -1205,10 +1487,23 @@ public class StreamsMembershipManagerTest {
);
}
- private StreamsGroupHeartbeatResponse makeHeartbeatResponse(final String
subtopologyId0,
- final
List<Integer> partitions0,
- final String
subtopologyId1,
- final
List<Integer> partitions1) {
+ private StreamsGroupHeartbeatResponse
makeHeartbeatResponseWithWarmupTasks(final String subtopologyId,
+
final List<Integer> partitions) {
+ return makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopologyId)
+ .setPartitions(partitions)
+ )
+ );
+ }
+
+ private StreamsGroupHeartbeatResponse
makeHeartbeatResponseWithActiveTasks(final String subtopologyId0,
+
final List<Integer> partitions0,
+
final String subtopologyId1,
+
final List<Integer> partitions1) {
return makeHeartbeatResponseWithActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
.setSubtopologyId(subtopologyId0)
@@ -1237,11 +1532,12 @@ public class StreamsMembershipManagerTest {
}
private StreamsAssignmentInterface.Assignment makeTaskAssignment(final
Set<StreamsAssignmentInterface.TaskId> activeTasks,
- final
Set<StreamsAssignmentInterface.TaskId> standbyTasks) {
+ final
Set<StreamsAssignmentInterface.TaskId> standbyTasks,
+ final
Set<StreamsAssignmentInterface.TaskId> warmupTasks) {
return new StreamsAssignmentInterface.Assignment(
activeTasks,
standbyTasks,
- Collections.emptySet()
+ warmupTasks
);
}