This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new 6ec17cfc6d1 Add warm-up tasks to Streams membership manager (#18214)
6ec17cfc6d1 is described below

commit 6ec17cfc6d121d48f2d4f139da46dcbff62228fb
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Dec 17 13:07:14 2024 +0100

    Add warm-up tasks to Streams membership manager (#18214)
    
    This commit adds warm-up task assignment to the
    Streams membership manager.
---
 .../internals/StreamsMembershipManager.java        |  27 +-
 .../internals/StreamsMembershipManagerTest.java    | 422 ++++++++++++++++++---
 2 files changed, 379 insertions(+), 70 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index e14b02ee406..0c213aefc29 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -747,13 +747,14 @@ public class StreamsMembershipManager implements 
RequestManager {
 
         markReconciliationInProgress();
 
-        // ToDo: add standby and warmup tasks
         SortedSet<StreamsAssignmentInterface.TaskId> assignedActiveTasks = 
toTaskIdSet(targetAssignment.activeTasks);
         SortedSet<StreamsAssignmentInterface.TaskId> ownedActiveTasks = 
toTaskIdSet(currentAssignment.activeTasks);
         SortedSet<StreamsAssignmentInterface.TaskId> activeTasksToRevoke = new 
TreeSet<>(ownedActiveTasks);
         activeTasksToRevoke.removeAll(assignedActiveTasks);
         SortedSet<StreamsAssignmentInterface.TaskId> assignedStandbyTasks = 
toTaskIdSet(targetAssignment.standbyTasks);
         SortedSet<StreamsAssignmentInterface.TaskId> ownedStandbyTasks = 
toTaskIdSet(currentAssignment.standbyTasks);
+        SortedSet<StreamsAssignmentInterface.TaskId> assignedWarmupTasks = 
toTaskIdSet(targetAssignment.warmupTasks);
+        SortedSet<StreamsAssignmentInterface.TaskId> ownedWarmupTasks = 
toTaskIdSet(currentAssignment.warmupTasks);
 
         log.info("Assigned tasks with local epoch {}\n" +
                 "\tMember:                        {}\n" +
@@ -761,14 +762,18 @@ public class StreamsMembershipManager implements 
RequestManager {
                 "\tOwned active tasks:            {}\n" +
                 "\tActive tasks to revoke:        {}\n" +
                 "\tAssigned standby tasks:        {}\n" +
-                "\tOwned standby tasks:           {}\n",
+                "\tOwned standby tasks:           {}\n" +
+                "\tAssigned warm-up tasks:        {}\n" +
+                "\tOwned warm-up tasks:           {}\n",
             targetAssignment.localEpoch,
             memberId,
             assignedActiveTasks,
             ownedActiveTasks,
             activeTasksToRevoke,
             assignedStandbyTasks,
-            ownedStandbyTasks
+            ownedStandbyTasks,
+            assignedWarmupTasks,
+            ownedWarmupTasks
         );
 
         SortedSet<TopicPartition> ownedTopicPartitionsFromSubscriptionState = 
new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
@@ -791,7 +796,7 @@ public class StreamsMembershipManager implements 
RequestManager {
 
         final CompletableFuture<Void> 
onTasksRevokedAndAssignedCallbacksExecuted = 
onTasksRevokedCallbackExecuted.thenCompose(__ -> {
             if (!maybeAbortReconciliation()) {
-                return assignTasks(assignedActiveTasks, ownedActiveTasks, 
assignedStandbyTasks);
+                return assignTasks(assignedActiveTasks, ownedActiveTasks, 
assignedStandbyTasks, assignedWarmupTasks);
             }
             return CompletableFuture.completedFuture(null);
         });
@@ -833,12 +838,20 @@ public class StreamsMembershipManager implements 
RequestManager {
 
     private CompletableFuture<Void> assignTasks(final 
SortedSet<StreamsAssignmentInterface.TaskId> activeTasksToAssign,
                                                 final 
SortedSet<StreamsAssignmentInterface.TaskId> ownedActiveTasks,
-                                                final 
SortedSet<StreamsAssignmentInterface.TaskId> standbyTasksToAssign) {
-        log.info("Assigning active tasks {} and standby tasks {}",
+                                                final 
SortedSet<StreamsAssignmentInterface.TaskId> standbyTasksToAssign,
+                                                final 
SortedSet<StreamsAssignmentInterface.TaskId> warmupTasksToAssign) {
+        log.info("Assigning " +
+                (activeTasksToAssign.isEmpty() ? "no active tasks, " : "active 
tasks {}, ") +
+                (standbyTasksToAssign.isEmpty() ? "no standby tasks, " : 
"standby tasks {}, and ") +
+                (warmupTasksToAssign.isEmpty() ? "no warm-up tasks. " : 
"warm-up tasks {}.") +
+                "to the member.",
             activeTasksToAssign.stream()
                 .map(StreamsAssignmentInterface.TaskId::toString)
                 .collect(Collectors.joining(", ")),
             standbyTasksToAssign.stream()
+                .map(StreamsAssignmentInterface.TaskId::toString)
+                .collect(Collectors.joining(", ")),
+            warmupTasksToAssign.stream()
                 .map(StreamsAssignmentInterface.TaskId::toString)
                 .collect(Collectors.joining(", "))
         );
@@ -856,7 +869,7 @@ public class StreamsMembershipManager implements 
RequestManager {
             new StreamsAssignmentInterface.Assignment(
                 activeTasksToAssign,
                 standbyTasksToAssign,
-                Collections.emptySet()
+                warmupTasksToAssign
             )
         );
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 8d799c279d7..c9d87ef5197 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -164,7 +164,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecuted);
         joining();
 
@@ -190,11 +190,11 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
             .thenReturn(onTasksRevokedCallbackExecuted);
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecuted);
         when(subscriptionState.assignedPartitions())
             .thenReturn(Collections.emptySet())
@@ -231,9 +231,9 @@ public class StreamsMembershipManagerTest {
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0),
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecuted);
         when(subscriptionState.assignedPartitions())
             .thenReturn(Collections.emptySet())
@@ -271,11 +271,11 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke))
             .thenReturn(onTasksRevokedCallbackExecuted);
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecuted);
         when(subscriptionState.assignedPartitions())
             .thenReturn(Collections.emptySet())
@@ -313,12 +313,12 @@ public class StreamsMembershipManagerTest {
         final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(activeTasks, Collections.emptySet())
+                makeTaskAssignment(activeTasks, Collections.emptySet(), 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         joining();
 
-        reconcile(makeHeartbeatResponse(
+        reconcile(makeHeartbeatResponseWithActiveTasks(
             SUB_TOPOLOGY_ID_0, List.of(PARTITION_0),
             SUB_TOPOLOGY_ID_1, List.of(PARTITION_0))
         );
@@ -334,6 +334,98 @@ public class StreamsMembershipManagerTest {
         verifyThatNoTasksHaveBeenRevoked();
     }
 
+    @Test
+    public void testReconcilingActiveTaskToStandbyTask() {
+        
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
 TOPIC_0);
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+        );
+        final Set<StreamsAssignmentInterface.TaskId> standbyTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(activeTasksSetup, Collections.emptySet(), 
Collections.emptySet())
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+            .thenReturn(onTasksRevokedCallbackExecuted);
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+            makeTaskAssignment(Collections.emptySet(), standbyTasks, 
Collections.emptySet()))
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        when(subscriptionState.assignedPartitions())
+            .thenReturn(Collections.emptySet())
+            .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
+            .thenReturn(Collections.emptySet());
+        joining();
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+        reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
+
+        final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_0));
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+            expectedPartitionsToRevoke,
+            expectedFullPartitionsToAssign,
+            expectedNewPartitionsToAssign
+        );
+        onTasksRevokedCallbackExecuted.complete(null);
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+    }
+
+    @Test
+    public void testReconcilingActiveTaskToWarmupTask() {
+        
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
 TOPIC_0);
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+        );
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(activeTasksSetup, Collections.emptySet(), 
Collections.emptySet())
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+            .thenReturn(onTasksRevokedCallbackExecuted);
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+            makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), 
warmupTasks))
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        when(subscriptionState.assignedPartitions())
+            .thenReturn(Collections.emptySet())
+            .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
+            .thenReturn(Collections.emptySet());
+        joining();
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
+
+        final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_0));
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+            expectedPartitionsToRevoke,
+            expectedFullPartitionsToAssign,
+            expectedNewPartitionsToAssign
+        );
+        onTasksRevokedCallbackExecuted.complete(null);
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+    }
+
     @Test
     public void testReconcilingEmptyToSingleStandbyTask() {
         final Set<StreamsAssignmentInterface.TaskId> standbyTasks =
@@ -341,7 +433,7 @@ public class StreamsMembershipManagerTest {
         final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasks)
+                makeTaskAssignment(Collections.emptySet(), standbyTasks, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         joining();
@@ -368,12 +460,12 @@ public class StreamsMembershipManagerTest {
         );
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecutedSetup);
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasks)
+                makeTaskAssignment(Collections.emptySet(), standbyTasks, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         joining();
@@ -404,12 +496,12 @@ public class StreamsMembershipManagerTest {
         );
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecutedSetup);
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasks)
+                makeTaskAssignment(Collections.emptySet(), standbyTasks, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         joining();
@@ -440,12 +532,12 @@ public class StreamsMembershipManagerTest {
         );
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecutedSetup);
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasks)
+                makeTaskAssignment(Collections.emptySet(), standbyTasks, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         joining();
@@ -464,83 +556,273 @@ public class StreamsMembershipManagerTest {
     }
 
     @Test
-    public void testReconcilingActiveTaskToStandbyTask() {
+    public void testReconcilingStandbyTaskToActiveTask() {
         
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
 TOPIC_0);
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
         final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
-        final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new 
CompletableFuture<>();
-        final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
+        final Set<StreamsAssignmentInterface.TaskId> standbyTasksSetup = 
Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
-        final Set<StreamsAssignmentInterface.TaskId> standbyTasks = Set.of(
-            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+        final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
         );
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(activeTasksSetup, Collections.emptySet())
+                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecutedSetup);
-        
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
-            .thenReturn(onTasksRevokedCallbackExecuted);
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-            makeTaskAssignment(Collections.emptySet(), standbyTasks))
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(activeTasks, Collections.emptySet(), 
Collections.emptySet())
+            )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         when(subscriptionState.assignedPartitions())
             .thenReturn(Collections.emptySet())
-            .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
-            .thenReturn(Collections.emptySet());
+            .thenReturn(Collections.emptySet())
+            .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
-        reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
+
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
+    }
+
+    @Test
+    public void testReconcilingStandbyTaskToWarmupTask() {
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> standbyTasksSetup = 
Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+        );
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, 
Collections.emptySet())
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+            makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), 
warmupTasks))
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        joining();
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+        Mockito.reset(subscriptionState);
+
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_0));
         final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
         final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
-        verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
-            expectedPartitionsToRevoke,
-            expectedFullPartitionsToAssign,
-            expectedNewPartitionsToAssign
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
+    }
+
+    @Test
+    public void testReconcilingEmptyToSingleWarmupTask() {
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasks =
+            Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasks)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        joining();
+
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
+    }
+
+    @Test
+    public void testReconcilingWarmupTaskToDifferentWarmupTask() {
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
-        onTasksRevokedCallbackExecuted.complete(null);
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasksSetup)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasks)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        joining();
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+        Mockito.reset(subscriptionState);
+
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
+
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
         onTasksAssignedCallbackExecuted.complete(null);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
     }
 
     @Test
-    public void testReconcilingStandbyTaskToActiveTask() {
+    public void testReconcilingSingleWarmupTaskToAdditionalWarmupTask() {
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+        );
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0),
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasksSetup)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasks)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        joining();
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+        Mockito.reset(subscriptionState);
+
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
+
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
+    }
+
+    @Test
+    public void testReconcilingMultipleWarmupTaskToSingleWarmupTask() {
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0),
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasksSetup)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasks)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        joining();
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+        Mockito.reset(subscriptionState);
+
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
+
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
+    }
+
+    @Test
+    public void testReconcilingWarmupTaskToActiveTask() {
         
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0,
 TOPIC_0);
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
         final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
-        final Set<StreamsAssignmentInterface.TaskId> standbyTasksSetup = 
Set.of(
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
         final Set<StreamsAssignmentInterface.TaskId> activeTasks = Set.of(
-            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
         );
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(Collections.emptySet(), standbyTasksSetup)
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasksSetup)
             )
         ).thenReturn(onTasksAssignedCallbackExecutedSetup);
         when(
             
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
-                makeTaskAssignment(activeTasks, Collections.emptySet())
+                makeTaskAssignment(activeTasks, Collections.emptySet(), 
Collections.emptySet())
             )
         ).thenReturn(onTasksAssignedCallbackExecuted);
         when(subscriptionState.assignedPartitions())
             .thenReturn(Collections.emptySet())
             .thenReturn(Collections.emptySet())
-            .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+            .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_1)));
         joining();
-        reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
-        reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+        final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
+        
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
+        onTasksAssignedCallbackExecuted.complete(null);
+        
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
+        verifyThatNoTasksHaveBeenRevoked();
+    }
+
+    @Test
+    public void testReconcilingWarmupTaskToStandbyTask() {
+        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
+        final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
+        final Set<StreamsAssignmentInterface.TaskId> warmupTasksSetup = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
+        );
+        final Set<StreamsAssignmentInterface.TaskId> standbyTasks = Set.of(
+            new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_1)
+        );
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), 
Collections.emptySet(), warmupTasksSetup)
+            )
+        ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+        when(
+            
streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(
+                makeTaskAssignment(Collections.emptySet(), standbyTasks, 
Collections.emptySet())
+            )
+        ).thenReturn(onTasksAssignedCallbackExecuted);
+        joining();
+        reconcile(makeHeartbeatResponseWithWarmupTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        acknowledging(onTasksAssignedCallbackExecutedSetup);
+        Mockito.reset(subscriptionState);
+
+        reconcile(makeHeartbeatResponseWithStandbyTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_1)));
+
+        final Collection<TopicPartition> expectedFullPartitionsToAssign = 
Collections.emptySet();
         final Collection<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
         onTasksAssignedCallbackExecuted.complete(null);
@@ -554,7 +836,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new 
CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecuted);
         joining();
 
@@ -624,7 +906,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new 
CompletableFuture<>();
         
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasks))
@@ -661,7 +943,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -692,7 +974,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         final CompletableFuture<Void> onAllTasksRevokedCallbackExecuted = new 
CompletableFuture<>();
         
when(streamsAssignmentInterface.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
@@ -716,7 +998,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -734,7 +1016,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -751,7 +1033,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -802,7 +1084,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -816,7 +1098,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -831,7 +1113,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasks =
             Set.of(new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0));
         final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = 
new CompletableFuture<>();
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, 
List.of(PARTITION_0)));
@@ -932,7 +1214,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new 
CompletableFuture<>();
         
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
@@ -950,7 +1232,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new 
CompletableFuture<>();
         
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
@@ -969,7 +1251,7 @@ public class StreamsMembershipManagerTest {
         final Set<StreamsAssignmentInterface.TaskId> activeTasksSetup = Set.of(
             new StreamsAssignmentInterface.TaskId(SUB_TOPOLOGY_ID_0, 
PARTITION_0)
         );
-        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet())))
+        
when(streamsAssignmentInterface.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup,
 Collections.emptySet(), Collections.emptySet())))
             .thenReturn(onTasksAssignedCallbackExecutedSetup);
         final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new 
CompletableFuture<>();
         
when(streamsAssignmentInterface.requestOnAllTasksLostCallbackInvocation())
@@ -1205,10 +1487,23 @@ public class StreamsMembershipManagerTest {
         );
     }
 
-    private StreamsGroupHeartbeatResponse makeHeartbeatResponse(final String 
subtopologyId0,
-                                                                final 
List<Integer> partitions0,
-                                                                final String 
subtopologyId1,
-                                                                final 
List<Integer> partitions1) {
+    private StreamsGroupHeartbeatResponse 
makeHeartbeatResponseWithWarmupTasks(final String subtopologyId,
+                                                                               
final List<Integer> partitions) {
+        return makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            List.of(
+                new StreamsGroupHeartbeatResponseData.TaskIds()
+                    .setSubtopologyId(subtopologyId)
+                    .setPartitions(partitions)
+            )
+        );
+    }
+
+    private StreamsGroupHeartbeatResponse 
makeHeartbeatResponseWithActiveTasks(final String subtopologyId0,
+                                                                               
final List<Integer> partitions0,
+                                                                               
final String subtopologyId1,
+                                                                               
final List<Integer> partitions1) {
         return makeHeartbeatResponseWithActiveTasks(List.of(
             new StreamsGroupHeartbeatResponseData.TaskIds()
                 .setSubtopologyId(subtopologyId0)
@@ -1237,11 +1532,12 @@ public class StreamsMembershipManagerTest {
     }
 
     private StreamsAssignmentInterface.Assignment makeTaskAssignment(final 
Set<StreamsAssignmentInterface.TaskId> activeTasks,
-                                                                     final 
Set<StreamsAssignmentInterface.TaskId> standbyTasks) {
+                                                                     final 
Set<StreamsAssignmentInterface.TaskId> standbyTasks,
+                                                                     final 
Set<StreamsAssignmentInterface.TaskId> warmupTasks) {
         return new StreamsAssignmentInterface.Assignment(
             activeTasks,
             standbyTasks,
-            Collections.emptySet()
+            warmupTasks
         );
     }
 

Reply via email to