This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new d3bf3cd  KAFKA-8649: send latest commonly supported version in 
assignment (#7426)
d3bf3cd is described below

commit d3bf3cd3d16945c17d45cf3b2bc4ef0d699cf418
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Wed Oct 2 16:01:54 2019 -0700

    KAFKA-8649: send latest commonly supported version in assignment (#7426)
    
    PR 7423 but targeted at 2.2.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .gitignore                                         |   1 +
 .../streams/processor/internals/StreamThread.java  |   7 +-
 .../internals/StreamsPartitionAssignor.java        | 275 +++++++++++++--------
 .../streams/processor/internals/TaskManager.java   |  11 +-
 .../internals/assignment/AssignmentInfo.java       |  70 +++---
 .../internals/assignment/ClientState.java          |  36 ++-
 .../internals/StreamsPartitionAssignorTest.java    |   4 +-
 .../processor/internals/TaskManagerTest.java       |   1 +
 .../internals/assignment/ClientStateTest.java      |   4 +-
 .../assignment/StickyTaskAssignorTest.java         |  34 +--
 .../kafka/streams/tests/StreamsUpgradeTest.java    |   6 +
 .../tests/streams/streams_upgrade_test.py          |  28 +--
 12 files changed, 285 insertions(+), 192 deletions(-)

diff --git a/.gitignore b/.gitignore
index a31643f..0c6854d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,4 @@ kafkatest.egg-info/
 systest/
 *.swp
 clients/src/generated
+clients/src/generated-test
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3083ec7..4c995e9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -286,9 +286,10 @@ public class StreamThread extends Thread {
                     );
                 } else if (streamThread.assignmentErrorCode.get() != 
StreamsPartitionAssignor.Error.NONE.code()) {
                     log.debug(
-                        "Encountered assignment error during partition 
assignment: {}. Skipping task initialization",
-                        streamThread.assignmentErrorCode
-                    );
+                        "Encountered assignment error during partition 
assignment: {}. Skipping task initialization and "
+                            + "pausing any partitions we may have been 
assigned.",
+                        streamThread.assignmentErrorCode);
+                    taskManager.pausePartitions();
                 } else {
                     log.debug("Creating tasks based on assignment.");
                     taskManager.createTasks(assignment);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 89e95b6..3dd0836 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         void addConsumer(final String consumerMemberId,
                          final SubscriptionInfo info) {
             consumers.add(consumerMemberId);
-            state.addPreviousActiveTasks(info.prevTasks());
-            state.addPreviousStandbyTasks(info.standbyTasks());
+            state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
+            state.addPreviousStandbyTasks(consumerMemberId, 
info.standbyTasks());
             state.incrementCapacity();
         }
 
@@ -397,6 +397,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         final Set<String> futureConsumers = new HashSet<>();
 
         int minReceivedMetadataVersion = 
SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        int minSupportedMetadataVersion = 
SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
         supportedVersions.clear();
         int futureMetadataVersion = UNKNOWN;
@@ -416,6 +417,11 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                 minReceivedMetadataVersion = usedVersion;
             }
 
+            final int latestSupportedVersion = info.latestSupportedVersion();
+            if (latestSupportedVersion < minSupportedMetadataVersion) {
+                minSupportedMetadataVersion = latestSupportedVersion;
+            }
+
             // create the new client metadata if necessary
             ClientMetadata clientMetadata = 
clientsMetadata.get(info.processId());
 
@@ -659,18 +665,32 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
 
         final Map<String, Assignment> assignment;
         if (versionProbing) {
-            assignment = versionProbingAssignment(clientsMetadata, 
partitionsForTask, partitionsByHostState, futureConsumers, 
minReceivedMetadataVersion);
+            assignment = versionProbingAssignment(
+                clientsMetadata,
+                partitionsForTask,
+                partitionsByHostState,
+                futureConsumers,
+                minReceivedMetadataVersion,
+                minSupportedMetadataVersion
+            );
         } else {
-            assignment = computeNewAssignment(clientsMetadata, 
partitionsForTask, partitionsByHostState, minReceivedMetadataVersion);
+            assignment = computeNewAssignment(
+                clientsMetadata,
+                partitionsForTask,
+                partitionsByHostState,
+                minReceivedMetadataVersion,
+                minSupportedMetadataVersion
+            );
         }
 
         return assignment;
     }
 
     private Map<String, Assignment> computeNewAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
-                                                         final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
-                                                         final Map<HostInfo, 
Set<TopicPartition>> partitionsByHostState,
-                                                         final int 
minUserMetadataVersion) {
+                                                                final 
Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                                                final 
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+                                                                final int 
minUserMetadataVersion,
+                                                                final int 
minSupportedMetadataVersion) {
         final Map<String, Assignment> assignment = new HashMap<>();
 
         // within the client, distribute tasks to its owned consumers
@@ -684,17 +704,15 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
             int consumerTaskIndex = 0;
 
             for (final String consumer : consumers) {
-                final Map<TaskId, Set<TopicPartition>> standby = new 
HashMap<>();
-                final ArrayList<AssignedPartition> assignedPartitions = new 
ArrayList<>();
+                final List<TaskId> activeTasks = 
interleavedActive.get(consumerTaskIndex);
 
-                final List<TaskId> assignedActiveList = 
interleavedActive.get(consumerTaskIndex);
+                // These will be filled in by 
buildAssignedActiveTaskAndPartitionsList below
+                final List<TopicPartition> activePartitionsList = new 
ArrayList<>();
+                final List<TaskId> assignedActiveList = new ArrayList<>();
 
-                for (final TaskId taskId : assignedActiveList) {
-                    for (final TopicPartition partition : 
partitionsForTask.get(taskId)) {
-                        assignedPartitions.add(new AssignedPartition(taskId, 
partition));
-                    }
-                }
+                buildAssignedActiveTaskAndPartitionsList(activeTasks, 
activePartitionsList, assignedActiveList, partitionsForTask);
 
+                final Map<TaskId, Set<TopicPartition>> standby = new 
HashMap<>();
                 if (!state.standbyTasks().isEmpty()) {
                     final List<TaskId> assignedStandbyList = 
interleavedStandby.get(consumerTaskIndex);
                     for (final TaskId taskId : assignedStandbyList) {
@@ -704,29 +722,33 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
 
                 consumerTaskIndex++;
 
-                Collections.sort(assignedPartitions);
-                final List<TaskId> active = new ArrayList<>();
-                final List<TopicPartition> activePartitions = new 
ArrayList<>();
-                for (final AssignedPartition partition : assignedPartitions) {
-                    active.add(partition.taskId);
-                    activePartitions.add(partition.partition);
-                }
-
                 // finally, encode the assignment before sending back to 
coordinator
-                assignment.put(consumer, new Assignment(
-                    activePartitions,
-                    new AssignmentInfo(minUserMetadataVersion, active, 
standby, partitionsByHostState, 0).encode()));
+                assignment.put(
+                    consumer,
+                    new Assignment(
+                        activePartitionsList,
+                        new AssignmentInfo(
+                            minUserMetadataVersion,
+                            minSupportedMetadataVersion,
+                            assignedActiveList,
+                            standby,
+                            partitionsByHostState,
+                            0
+                        ).encode()
+                    )
+                );
             }
         }
 
         return assignment;
     }
 
-    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
-                                                             final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
-                                                             final 
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
-                                                             final Set<String> 
futureConsumers,
-                                                             final int 
minUserMetadataVersion) {
+    private static Map<String, Assignment> versionProbingAssignment(final 
Map<UUID, ClientMetadata> clientsMetadata,
+                                                                    final 
Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                                                    final 
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+                                                                    final 
Set<String> futureConsumers,
+                                                                    final int 
minUserMetadataVersion,
+                                                                    final int 
minSupportedMetadataVersion) {
         final Map<String, Assignment> assignment = new HashMap<>();
 
         // assign previously assigned tasks to "old consumers"
@@ -737,23 +759,27 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                     continue;
                 }
 
-                final List<TaskId> activeTasks = new 
ArrayList<>(clientMetadata.state.prevActiveTasks());
+                // Return the same active tasks that were claimed in the 
subscription
+                final List<TaskId> activeTasks = new 
ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId));
 
-                final List<TopicPartition> assignedPartitions = new 
ArrayList<>();
-                for (final TaskId taskId : activeTasks) {
-                    assignedPartitions.addAll(partitionsForTask.get(taskId));
-                }
+                // These will be filled in by 
buildAssignedActiveTaskAndPartitionsList below
+                final List<TopicPartition> activePartitionsList = new 
ArrayList<>();
+                final List<TaskId> assignedActiveList = new ArrayList<>();
+
+                buildAssignedActiveTaskAndPartitionsList(activeTasks, 
activePartitionsList, assignedActiveList, partitionsForTask);
 
+                // Return the same standby tasks that were claimed in the 
subscription
                 final Map<TaskId, Set<TopicPartition>> standbyTasks = new 
HashMap<>();
-                for (final TaskId taskId : 
clientMetadata.state.prevStandbyTasks()) {
+                for (final TaskId taskId : 
clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) {
                     standbyTasks.put(taskId, partitionsForTask.get(taskId));
                 }
 
                 assignment.put(consumerId, new Assignment(
-                    assignedPartitions,
+                    activePartitionsList,
                     new AssignmentInfo(
                         minUserMetadataVersion,
-                        activeTasks,
+                        minSupportedMetadataVersion,
+                        assignedActiveList,
                         standbyTasks,
                         partitionsByHostState,
                         0)
@@ -766,13 +792,34 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         for (final String consumerId : futureConsumers) {
             assignment.put(consumerId, new Assignment(
                 Collections.emptyList(),
-                new AssignmentInfo().encode()
+                new AssignmentInfo(minUserMetadataVersion, 
minSupportedMetadataVersion).encode()
             ));
         }
 
         return assignment;
     }
 
+    private static void buildAssignedActiveTaskAndPartitionsList(final 
List<TaskId> activeTasks,
+                                                                 final 
List<TopicPartition> activePartitionsList,
+                                                                 final 
List<TaskId> assignedActiveList,
+                                                                 final 
Map<TaskId, Set<TopicPartition>> partitionsForTask) {
+        final List<AssignedPartition> assignedPartitions = new ArrayList<>();
+
+        // Build up list of all assigned partition-task pairs
+        for (final TaskId taskId : activeTasks) {
+            for (final TopicPartition partition : 
partitionsForTask.get(taskId)) {
+                assignedPartitions.add(new AssignedPartition(taskId, 
partition));
+            }
+        }
+
+        // Add one copy of a task for each corresponding partition, so the 
receiver can determine the task <-> tp mapping
+        Collections.sort(assignedPartitions);
+        for (final AssignedPartition partition : assignedPartitions) {
+            assignedActiveList.add(partition.taskId);
+            activePartitionsList.add(partition.partition);
+        }
+    }
+
     // visible for testing
     List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> 
taskIds, final int numberThreads) {
         final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
@@ -793,6 +840,70 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         return taskIdsForConsumerAssignment;
     }
 
+    private void validateMetadataVersions(final int 
receivedAssignmentMetadataVersion,
+                                          final int 
latestCommonlySupportedVersion) {
+
+        if (receivedAssignmentMetadataVersion > 
usedSubscriptionMetadataVersion) {
+            log.error("Leader sent back an assignment with version {} which 
was greater than our used version {}",
+                receivedAssignmentMetadataVersion, 
usedSubscriptionMetadataVersion);
+            throw new TaskAssignmentException(
+                "Sent a version " + usedSubscriptionMetadataVersion
+                    + " subscription but got an assignment with higher version 
"
+                    + receivedAssignmentMetadataVersion + "."
+            );
+        }
+
+        if (latestCommonlySupportedVersion > 
SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+            log.error("Leader sent back assignment with commonly supported 
version {} that is greater than our "
+                    + "actual latest supported version {}", 
latestCommonlySupportedVersion,
+                SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+            throw new TaskAssignmentException("Can't upgrade to metadata 
version greater than we support");
+        }
+    }
+
+    // Returns true if subscription version was changed, indicating version 
probing and need to rebalance again
+    protected boolean maybeUpdateSubscriptionVersion(final int 
receivedAssignmentMetadataVersion,
+                                                     final int 
latestCommonlySupportedVersion) {
+        if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
+
+            // If the latest commonly supported version is now greater than 
our used version, this indicates we have just
+            // completed the rolling upgrade and can now update our 
subscription version for the final rebalance
+            if (latestCommonlySupportedVersion > 
usedSubscriptionMetadataVersion) {
+                log.info(
+                    "Sent a version {} subscription and group's latest 
commonly supported version is {} (successful "
+                        +
+                        "version probing and end of rolling upgrade). 
Upgrading subscription metadata version to " +
+                        "{} for next rebalance.",
+                    usedSubscriptionMetadataVersion,
+                    latestCommonlySupportedVersion,
+                    latestCommonlySupportedVersion
+                );
+                usedSubscriptionMetadataVersion = 
latestCommonlySupportedVersion;
+                return true;
+            }
+
+            // If we received a lower version than we sent, someone else in 
the group still hasn't upgraded. We
+            // should downgrade our subscription until everyone is on the 
latest version
+            if (receivedAssignmentMetadataVersion < 
usedSubscriptionMetadataVersion) {
+                log.info(
+                    "Sent a version {} subscription and got version {} 
assignment back (successful version probing). "
+                        +
+                        "Downgrade subscription metadata to commonly supported 
version and trigger new rebalance.",
+                    usedSubscriptionMetadataVersion,
+                    receivedAssignmentMetadataVersion
+                );
+                usedSubscriptionMetadataVersion = 
latestCommonlySupportedVersion;
+                return true;
+            }
+        } else {
+            log.debug("Received an assignment version {} that is less than the 
earliest version that allows version " +
+                "probing {}. If this is not during a rolling upgrade from 
version 2.0 or below, this is an error.",
+                receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION);
+        }
+
+        return false;
+    }
+
     /**
      * @throws TaskAssignmentException if there is no task id for one of the 
partitions specified
      */
@@ -804,36 +915,27 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         final AssignmentInfo info = 
AssignmentInfo.decode(assignment.userData());
         if (info.errCode() != Error.NONE.code) {
             // set flag to shutdown streams app
-            assignmentErrorCode.set(info.errCode());
+            setAssignmentErrorCode(info.errCode());
             return;
         }
+        /*
+         * latestCommonlySupportedVersion belongs to 
[usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION]
+         * receivedAssignmentMetadataVersion belongs to 
[EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion]
+         *
+         * usedSubscriptionMetadataVersion will be downgraded to 
receivedAssignmentMetadataVersion during a rolling
+         * bounce upgrade with version probing.
+         *
+         * usedSubscriptionMetadataVersion will be upgraded to 
latestCommonlySupportedVersion when all members have
+         * been bounced and it is safe to use the latest version.
+         */
         final int receivedAssignmentMetadataVersion = info.version();
-        final int leaderSupportedVersion = info.latestSupportedVersion();
-
-        if (receivedAssignmentMetadataVersion > 
usedSubscriptionMetadataVersion) {
-            throw new IllegalStateException("Sent a version " + 
usedSubscriptionMetadataVersion
-                + " subscription but got an assignment with higher version " + 
receivedAssignmentMetadataVersion + ".");
-        }
+        final int latestCommonlySupportedVersion = 
info.commonlySupportedVersion();
 
-        if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion
-            && receivedAssignmentMetadataVersion >= 
EARLIEST_PROBEABLE_VERSION) {
-
-            if (receivedAssignmentMetadataVersion == leaderSupportedVersion) {
-                log.info("Sent a version {} subscription and got version {} 
assignment back (successful version probing). " +
-                        "Downgrading subscription metadata to received version 
and trigger new rebalance.",
-                    usedSubscriptionMetadataVersion,
-                    receivedAssignmentMetadataVersion);
-                usedSubscriptionMetadataVersion = 
receivedAssignmentMetadataVersion;
-            } else {
-                log.info("Sent a version {} subscription and got version {} 
assignment back (successful version probing). " +
-                    "Setting subscription metadata to leaders supported 
version {} and trigger new rebalance.",
-                    usedSubscriptionMetadataVersion,
-                    receivedAssignmentMetadataVersion,
-                    leaderSupportedVersion);
-                usedSubscriptionMetadataVersion = leaderSupportedVersion;
-            }
+        validateMetadataVersions(receivedAssignmentMetadataVersion, 
latestCommonlySupportedVersion);
 
-            assignmentErrorCode.set(Error.VERSION_PROBING.code);
+        // Check if this was a version probing rebalance and check the error 
code to trigger another rebalance if so
+        if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, 
latestCommonlySupportedVersion)) {
+            setAssignmentErrorCode(Error.VERSION_PROBING.code());
             return;
         }
 
@@ -849,31 +951,9 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                 partitionsByHost = Collections.emptyMap();
                 break;
             case VERSION_TWO:
-                processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
-                partitionsByHost = info.partitionsByHost();
-                break;
             case VERSION_THREE:
-                if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
-                    log.info("Sent a version {} subscription and group 
leader's latest supported version is {}. " +
-                        "Upgrading subscription metadata version to {} for 
next rebalance.",
-                        usedSubscriptionMetadataVersion,
-                        leaderSupportedVersion,
-                        leaderSupportedVersion);
-                    usedSubscriptionMetadataVersion = leaderSupportedVersion;
-                }
-                processVersionThreeAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
-                partitionsByHost = info.partitionsByHost();
-                break;
             case VERSION_FOUR:
-                if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
-                    log.info("Sent a version {} subscription and group 
leader's latest supported version is {}. " +
-                        "Upgrading subscription metadata version to {} for 
next rebalance.",
-                        usedSubscriptionMetadataVersion,
-                        leaderSupportedVersion,
-                        leaderSupportedVersion);
-                    usedSubscriptionMetadataVersion = leaderSupportedVersion;
-                }
-                processVersionFourAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+                processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
             default:
@@ -922,26 +1002,12 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         }
     }
 
-    private void processVersionThreeAssignment(final AssignmentInfo info,
-                                               final List<TopicPartition> 
partitions,
-                                               final Map<TaskId, 
Set<TopicPartition>> activeTasks,
-                                               final Map<TopicPartition, 
PartitionInfo> topicToPartitionInfo) {
-        processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
-    }
-
-    private void processVersionFourAssignment(final AssignmentInfo info,
-                                              final List<TopicPartition> 
partitions,
-                                              final Map<TaskId, 
Set<TopicPartition>> activeTasks,
-                                              final Map<TopicPartition, 
PartitionInfo> topicToPartitionInfo) {
-        processVersionThreeAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
-    }
-
     // for testing
     protected void processLatestVersionAssignment(final AssignmentInfo info,
                                                   final List<TopicPartition> 
partitions,
                                                   final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                                                   final Map<TopicPartition, 
PartitionInfo> topicToPartitionInfo) {
-        processVersionThreeAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+        processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
     }
 
     /**
@@ -1037,6 +1103,9 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
 
         }
     }
+    protected void setAssignmentErrorCode(final Integer errorCode) {
+        assignmentErrorCode.set(errorCode);
+    }
 
     // following functions are for test only
     void setInternalTopicManager(final InternalTopicManager 
internalTopicManager) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c136fdb..c0123ab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -106,9 +106,9 @@ public class TaskManager {
 
         addStreamTasks(assignment);
         addStandbyTasks();
-        // Pause all the partitions until the underlying state store is ready 
for all the active tasks.
-        log.trace("Pausing partitions: {}", assignment);
-        consumer.pause(assignment);
+
+        // Pause all the new partitions until the underlying state store is 
ready for all the active tasks.
+        pausePartitions();
     }
 
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
@@ -317,6 +317,11 @@ public class TaskManager {
         this.consumer = consumer;
     }
 
+    void pausePartitions() {
+        log.trace("Pausing partitions: {}", consumer.assignment());
+        consumer.pause(consumer.assignment());
+    }
+
     /**
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 8ad4036..5c7b037 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -45,59 +45,55 @@ public class AssignmentInfo {
     static final int UNKNOWN = -1;
 
     private final int usedVersion;
-    private final int latestSupportedVersion;
+    private final int commonlySupportedVersion;
     private int errCode;
     private List<TaskId> activeTasks;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-    // used for decoding; don't apply version checks
-    private AssignmentInfo(final int version,
-                           final int latestSupportedVersion) {
-        this.usedVersion = version;
-        this.latestSupportedVersion = latestSupportedVersion;
-        this.errCode = 0;
-    }
-
-    public AssignmentInfo(final List<TaskId> activeTasks,
-                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                          final Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 
0);
-    }
-
-    public AssignmentInfo() {
-        this(LATEST_SUPPORTED_VERSION,
+    // used for decoding and "future consumer" assignments during version 
probing
+    public AssignmentInfo(final int version,
+                          final int commonlySupportedVersion) {
+        this(version,
+            commonlySupportedVersion,
             Collections.emptyList(),
             Collections.emptyMap(),
             Collections.emptyMap(),
             0);
     }
 
+    public AssignmentInfo(final List<TaskId> activeTasks,
+                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          final Map<HostInfo, Set<TopicPartition>> 
partitionsByHost) {
+        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
partitionsByHost, 0);
+    }
+
     public AssignmentInfo(final int version,
                           final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                          final Map<HostInfo, Set<TopicPartition>> hostState,
+                          final Map<HostInfo, Set<TopicPartition>> 
partitionsByHost,
                           final int errCode) {
-        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
hostState, errCode);
-        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
-            throw new IllegalArgumentException("version must be between 1 and 
" + LATEST_SUPPORTED_VERSION
-                + "; was: " + version);
-        }
+        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
partitionsByHost, errCode);
     }
 
-    // for testing only; don't apply version checks
-    AssignmentInfo(final int version,
-                   final int latestSupportedVersion,
-                   final List<TaskId> activeTasks,
-                   final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                   final Map<HostInfo, Set<TopicPartition>> hostState,
-                   final int errCode) {
+
+    public AssignmentInfo(final int version,
+                          final int commonlySupportedVersion,
+                          final List<TaskId> activeTasks,
+                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          final Map<HostInfo, Set<TopicPartition>> 
partitionsByHost,
+                          final int errCode) {
         this.usedVersion = version;
-        this.latestSupportedVersion = latestSupportedVersion;
+        this.commonlySupportedVersion = commonlySupportedVersion;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
-        this.partitionsByHost = hostState;
+        this.partitionsByHost = partitionsByHost;
         this.errCode = errCode;
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and 
" + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
     }
 
     public int version() {
@@ -108,8 +104,8 @@ public class AssignmentInfo {
         return errCode;
     }
 
-    public int latestSupportedVersion() {
-        return latestSupportedVersion;
+    public int commonlySupportedVersion() {
+        return commonlySupportedVersion;
     }
 
     public List<TaskId> activeTasks() {
@@ -334,7 +330,7 @@ public class AssignmentInfo {
 
     @Override
     public int hashCode() {
-        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ 
standbyTasks.hashCode()
+        return usedVersion ^ commonlySupportedVersion ^ activeTasks.hashCode() 
^ standbyTasks.hashCode()
             ^ partitionsByHost.hashCode() ^ errCode;
     }
 
@@ -343,7 +339,7 @@ public class AssignmentInfo {
         if (o instanceof AssignmentInfo) {
             final AssignmentInfo other = (AssignmentInfo) o;
             return usedVersion == other.usedVersion &&
-                    latestSupportedVersion == other.latestSupportedVersion &&
+                    commonlySupportedVersion == other.commonlySupportedVersion 
&&
                     errCode == other.errCode &&
                     activeTasks.equals(other.activeTasks) &&
                     standbyTasks.equals(other.standbyTasks) &&
@@ -356,7 +352,7 @@ public class AssignmentInfo {
     @Override
     public String toString() {
         return "[version=" + usedVersion
-            + ", supported version=" + latestSupportedVersion
+            + ", supported version=" + commonlySupportedVersion
             + ", active tasks=" + activeTasks
             + ", standby tasks=" + standbyTasks
             + ", global assignment=" + partitionsByHost + "]";
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 6eff7bd..ab213d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.HashSet;
@@ -29,15 +31,25 @@ public class ClientState {
     private final Set<TaskId> prevStandbyTasks;
     private final Set<TaskId> prevAssignedTasks;
 
-    private int capacity;
+    private final Map<String, Set<TaskId>> prevActiveTasksByConsumer;
+    private final Map<String, Set<TaskId>> prevStandbyTasksByConsumer;
 
+    private int capacity;
 
     public ClientState() {
         this(0);
     }
 
     ClientState(final int capacity) {
-        this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new 
HashSet<>(), new HashSet<>(), new HashSet<>(), capacity);
+        this(new HashSet<>(),
+             new HashSet<>(),
+             new HashSet<>(),
+             new HashSet<>(),
+             new HashSet<>(),
+             new HashSet<>(),
+             new HashMap<>(),
+             new HashMap<>(),
+             capacity);
     }
 
     private ClientState(final Set<TaskId> activeTasks,
@@ -46,6 +58,8 @@ public class ClientState {
                         final Set<TaskId> prevActiveTasks,
                         final Set<TaskId> prevStandbyTasks,
                         final Set<TaskId> prevAssignedTasks,
+                        final Map<String, Set<TaskId>> 
prevActiveTasksByConsumer,
+                        final Map<String, Set<TaskId>> 
prevStandbyTasksByConsumer,
                         final int capacity) {
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
@@ -53,6 +67,8 @@ public class ClientState {
         this.prevActiveTasks = prevActiveTasks;
         this.prevStandbyTasks = prevStandbyTasks;
         this.prevAssignedTasks = prevAssignedTasks;
+        this.prevActiveTasksByConsumer = prevActiveTasksByConsumer;
+        this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer;
         this.capacity = capacity;
     }
 
@@ -64,6 +80,8 @@ public class ClientState {
             new HashSet<>(prevActiveTasks),
             new HashSet<>(prevStandbyTasks),
             new HashSet<>(prevAssignedTasks),
+            new HashMap<>(prevActiveTasksByConsumer),
+            new HashMap<>(prevStandbyTasksByConsumer),
             capacity);
     }
 
@@ -107,14 +125,24 @@ public class ClientState {
         return activeTasks.size();
     }
 
-    public void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
+    public void addPreviousActiveTasks(final String consumer, final 
Set<TaskId> prevTasks) {
         prevActiveTasks.addAll(prevTasks);
         prevAssignedTasks.addAll(prevTasks);
+        prevActiveTasksByConsumer.put(consumer, prevTasks);
     }
 
-    public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
+    public void addPreviousStandbyTasks(final String consumer, final 
Set<TaskId> standbyTasks) {
         prevStandbyTasks.addAll(standbyTasks);
         prevAssignedTasks.addAll(standbyTasks);
+        prevStandbyTasksByConsumer.put(consumer, standbyTasks);
+    }
+
+    public Set<TaskId> prevActiveTasksForConsumer(final String consumer) {
+        return prevActiveTasksByConsumer.get(consumer);
+    }
+
+    public Set<TaskId> prevStandbyTasksForConsumer(final String consumer) {
+        return prevStandbyTasksByConsumer.get(consumer);
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 140c219..2d960f3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo.LATEST_SUPPORTED_VERSION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
@@ -1240,7 +1241,8 @@ public class StreamsPartitionAssignorTest {
             )));
         assertThat(assignment.get("consumer1").partitions(), 
equalTo(asList(t1p0, t1p1)));
 
-        
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), 
equalTo(new AssignmentInfo()));
+        
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()),
+            equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, 
LATEST_SUPPORTED_VERSION)));
         assertThat(assignment.get("future-consumer").partitions().size(), 
equalTo(0));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7d7d4e6..9e5f8bd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -315,6 +315,7 @@ public class TaskManagerTest {
     @Test
     public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
+        expect(consumer.assignment()).andReturn(taskId0Partitions).times(2);
         consumer.pause(taskId0Partitions);
         expectLastCall();
         replay();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index 034ae7b..dc54c86 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -70,7 +70,7 @@ public class ClientStateTest {
         final TaskId tid1 = new TaskId(0, 1);
         final TaskId tid2 = new TaskId(0, 2);
 
-        client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2));
+        client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2));
         assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1, 
tid2)));
         assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, 
tid2)));
     }
@@ -80,7 +80,7 @@ public class ClientStateTest {
         final TaskId tid1 = new TaskId(0, 1);
         final TaskId tid2 = new TaskId(0, 2);
 
-        client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2));
+        client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2));
         assertThat(client.previousActiveTasks().size(), equalTo(0));
         assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, 
tid2)));
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 17d403f..19d7730 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -207,11 +207,11 @@ public class StickyTaskAssignorTest {
     @Test
     public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
         final ClientState client1 = createClient(p1, 1);
-        client1.addPreviousStandbyTasks(Utils.mkSet(task02));
+        client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02));
         final ClientState client2 = createClient(p2, 1);
-        client2.addPreviousStandbyTasks(Utils.mkSet(task01));
+        client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
         final ClientState client3 = createClient(p3, 1);
-        client3.addPreviousStandbyTasks(Utils.mkSet(task00));
+        client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00));
 
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, 
task01, task02);
 
@@ -225,9 +225,9 @@ public class StickyTaskAssignorTest {
     @Test
     public void 
shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, 
task00);
-        c1.addPreviousStandbyTasks(Utils.mkSet(task01));
+        c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, 
task02);
-        c2.addPreviousStandbyTasks(Utils.mkSet(task01));
+        c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
 
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, 
task01, task02);
 
@@ -455,9 +455,9 @@ public class StickyTaskAssignorTest {
     @Test
     public void 
shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, 
task01, task02);
-        c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
+        c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, 
task03, task00);
-        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02));
+        c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02));
 
         createClient(p3, 1);
         createClient(p4, 1);
@@ -577,14 +577,14 @@ public class StickyTaskAssignorTest {
         final TaskId task23 = new TaskId(2, 3);
 
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, 
task01, task12, task13);
-        c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, 
task23));
+        c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11, 
task20, task21, task23));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, 
task00, task11, task22);
-        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, 
task03, task12, task21, task13, task23));
+        c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, 
task02, task20, task03, task12, task21, task13, task23));
         final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, 
task20, task21, task23);
-        c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
+        c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12));
 
         final ClientState newClient = createClient(p4, 1);
-        newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, 
task02, task11, task20, task03, task12, task21, task13, task22, task23));
+        newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, 
task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, 
task23));
 
         final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, 
task12, task21, task13, task22, task23);
         taskAssignor.assign(0);
@@ -607,15 +607,15 @@ public class StickyTaskAssignorTest {
         final TaskId task23 = new TaskId(2, 3);
 
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, 
task01, task12, task13);
-        c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, 
task23));
+        c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11, 
task20, task21, task23));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, 
task00, task11, task22);
-        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, 
task03, task12, task21, task13, task23));
+        c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, 
task02, task20, task03, task12, task21, task13, task23));
 
         final ClientState bounce1 = createClient(p3, 1);
-        bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
+        bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20, 
task21, task23));
 
         final ClientState bounce2 = createClient(p4, 1);
-        bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
+        bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, 
task03, task10));
 
         final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, 
task12, task21, task13, task22, task23);
         taskAssignor.assign(0);
@@ -658,7 +658,7 @@ public class StickyTaskAssignorTest {
         final TaskId task06 = new TaskId(0, 6);
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, 
task00, task01, task02, task06);
         final ClientState c2 = createClient(p2, 1);
-        c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
+        c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04, 
task05));
         final ClientState newClient = createClient(p3, 1);
 
         final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
@@ -705,7 +705,7 @@ public class StickyTaskAssignorTest {
 
     private ClientState createClientWithPreviousActiveTasks(final Integer 
processId, final int capacity, final TaskId... taskIds) {
         final ClientState clientState = new ClientState(capacity);
-        clientState.addPreviousActiveTasks(Utils.mkSet(taskIds));
+        clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds));
         clients.put(processId, clientState);
         return clientState;
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 27bee81..ac56371 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -163,6 +163,11 @@ public class StreamsUpgradeTest {
             final AssignmentInfo info = AssignmentInfo.decode(
                 assignment.userData().putInt(0, 
AssignmentInfo.LATEST_SUPPORTED_VERSION));
 
+            if (super.maybeUpdateSubscriptionVersion(usedVersion, 
info.commonlySupportedVersion())) {
+                setAssignmentErrorCode(Error.VERSION_PROBING.code());
+                return;
+            }
+
             final List<TopicPartition> partitions = new 
ArrayList<>(assignment.partitions());
             partitions.sort(PARTITION_COMPARATOR);
 
@@ -292,6 +297,7 @@ public class StreamsUpgradeTest {
         private FutureAssignmentInfo(final boolean bumpUsedVersion,
                                      final boolean bumpSupportedVersion,
                                      final ByteBuffer bytes) {
+            super(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION);
             this.bumpUsedVersion = bumpUsedVersion;
             this.bumpSupportedVersion = bumpSupportedVersion;
             originalUserMetadata = bytes;
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 4f8dc47..7b9a310 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -349,8 +349,6 @@ class StreamsUpgradeTest(Test):
             for p in self.processors:
                 found = list(p.node.account.ssh_capture("grep \"Finished 
assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
                 if len(found) >= self.leader_counter[p] + 1:
-                    if self.leader is not None:
-                        raise Exception("Could not uniquely identify leader")
                     self.leader = p
                     self.leader_counter[p] = self.leader_counter[p] + 1
 
@@ -547,39 +545,25 @@ class StreamsUpgradeTest(Test):
                     else:
                         self.leader_counter[self.leader] = 
self.leader_counter[self.leader] + 1
 
-                    if processor == self.leader:
-                        leader_monitor = log_monitor
-                    elif first_other_processor == self.leader:
-                        leader_monitor = first_other_monitor
-                    elif second_other_processor == self.leader:
-                        leader_monitor = second_other_monitor
-                    else:
-                        raise Exception("Could not identify leader.")
-
                     monitors = {}
                     monitors[processor] = log_monitor
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    leader_monitor.wait_until("Received a future (version 
probing) subscription (version: 5). Sending empty assignment back (with 
supported version 4).",
-                                              timeout_sec=60,
-                                              err_msg="Could not detect 
'version probing' attempt at leader " + str(self.leader.node.account))
-
                     if len(self.old_processors) > 0:
-                        log_monitor.wait_until("Sent a version 5 subscription 
and got version 4 assignment back (successful version probing). Downgrading 
subscription metadata to received version and trigger new rebalance.",
+                        log_monitor.wait_until("Sent a version 5 subscription 
and got version 4 assignment back (successful version probing). Downgrade 
subscription metadata to commonly supported version and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 
'successful version probing' at upgrading node " + str(node.account))
                     else:
-                        log_monitor.wait_until("Sent a version 5 subscription 
and got version 4 assignment back (successful version probing). Setting 
subscription metadata to leaders supported version 5 and trigger new 
rebalance.",
+                        log_monitor.wait_until("Sent a version 5 subscription 
and got version 4 assignment back (successful version probing). Downgrade 
subscription metadata to commonly supported version and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 
'successful version probing with upgraded leader' at upgrading node " + 
str(node.account))
-                        first_other_monitor.wait_until("Sent a version 4 
subscription and group leader.s latest supported version is 5. Upgrading 
subscription metadata version to 5 for next rebalance.",
+                        first_other_monitor.wait_until("Sent a version 4 
subscription and group.s latest commonly supported version is 5 (successful 
version probing and end of rolling upgrade). Upgrading subscription metadata 
version to 5 for next rebalance.",
                                                        timeout_sec=60,
-                                                       err_msg="Never saw 
output 'Upgrade metadata to version 4' on" + str(first_other_node.account))
-                        second_other_monitor.wait_until("Sent a version 4 
subscription and group leader.s latest supported version is 5. Upgrading 
subscription metadata version to 5 for next rebalance.",
+                                                       err_msg="Never saw 
output 'Upgrade metadata to version 5' on" + str(first_other_node.account))
+                        second_other_monitor.wait_until("Sent a version 4 
subscription and group.s latest commonly supported version is 5 (successful 
version probing and end of rolling upgrade). Upgrading subscription metadata 
version to 5 for next rebalance.",
                                                         timeout_sec=60,
-                                                        err_msg="Never saw 
output 'Upgrade metadata to version 4' on" + str(second_other_node.account))
-
+                                                        err_msg="Never saw 
output 'Upgrade metadata to version 5' on" + str(second_other_node.account))
                     log_monitor.wait_until("Version probing detected. 
Triggering new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 
'Triggering new rebalance' at upgrading node " + str(node.account))

Reply via email to