This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new d3bf3cd KAFKA-8649: send latest commonly supported version in
assignment (#7426)
d3bf3cd is described below
commit d3bf3cd3d16945c17d45cf3b2bc4ef0d699cf418
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Oct 2 16:01:54 2019 -0700
KAFKA-8649: send latest commonly supported version in assignment (#7426)
PR 7423 but targeted at 2.2.
Reviewers: Guozhang Wang <[email protected]>
---
.gitignore | 1 +
.../streams/processor/internals/StreamThread.java | 7 +-
.../internals/StreamsPartitionAssignor.java | 275 +++++++++++++--------
.../streams/processor/internals/TaskManager.java | 11 +-
.../internals/assignment/AssignmentInfo.java | 70 +++---
.../internals/assignment/ClientState.java | 36 ++-
.../internals/StreamsPartitionAssignorTest.java | 4 +-
.../processor/internals/TaskManagerTest.java | 1 +
.../internals/assignment/ClientStateTest.java | 4 +-
.../assignment/StickyTaskAssignorTest.java | 34 +--
.../kafka/streams/tests/StreamsUpgradeTest.java | 6 +
.../tests/streams/streams_upgrade_test.py | 28 +--
12 files changed, 285 insertions(+), 192 deletions(-)
diff --git a/.gitignore b/.gitignore
index a31643f..0c6854d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,4 @@ kafkatest.egg-info/
systest/
*.swp
clients/src/generated
+clients/src/generated-test
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3083ec7..4c995e9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -286,9 +286,10 @@ public class StreamThread extends Thread {
);
} else if (streamThread.assignmentErrorCode.get() !=
StreamsPartitionAssignor.Error.NONE.code()) {
log.debug(
- "Encountered assignment error during partition
assignment: {}. Skipping task initialization",
- streamThread.assignmentErrorCode
- );
+ "Encountered assignment error during partition
assignment: {}. Skipping task initialization and "
+ + "pausing any partitions we may have been
assigned.",
+ streamThread.assignmentErrorCode);
+ taskManager.pausePartitions();
} else {
log.debug("Creating tasks based on assignment.");
taskManager.createTasks(assignment);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 89e95b6..3dd0836 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
void addConsumer(final String consumerMemberId,
final SubscriptionInfo info) {
consumers.add(consumerMemberId);
- state.addPreviousActiveTasks(info.prevTasks());
- state.addPreviousStandbyTasks(info.standbyTasks());
+ state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
+ state.addPreviousStandbyTasks(consumerMemberId,
info.standbyTasks());
state.incrementCapacity();
}
@@ -397,6 +397,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
final Set<String> futureConsumers = new HashSet<>();
int minReceivedMetadataVersion =
SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+ int minSupportedMetadataVersion =
SubscriptionInfo.LATEST_SUPPORTED_VERSION;
supportedVersions.clear();
int futureMetadataVersion = UNKNOWN;
@@ -416,6 +417,11 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
minReceivedMetadataVersion = usedVersion;
}
+ final int latestSupportedVersion = info.latestSupportedVersion();
+ if (latestSupportedVersion < minSupportedMetadataVersion) {
+ minSupportedMetadataVersion = latestSupportedVersion;
+ }
+
// create the new client metadata if necessary
ClientMetadata clientMetadata =
clientsMetadata.get(info.processId());
@@ -659,18 +665,32 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
final Map<String, Assignment> assignment;
if (versionProbing) {
- assignment = versionProbingAssignment(clientsMetadata,
partitionsForTask, partitionsByHostState, futureConsumers,
minReceivedMetadataVersion);
+ assignment = versionProbingAssignment(
+ clientsMetadata,
+ partitionsForTask,
+ partitionsByHostState,
+ futureConsumers,
+ minReceivedMetadataVersion,
+ minSupportedMetadataVersion
+ );
} else {
- assignment = computeNewAssignment(clientsMetadata,
partitionsForTask, partitionsByHostState, minReceivedMetadataVersion);
+ assignment = computeNewAssignment(
+ clientsMetadata,
+ partitionsForTask,
+ partitionsByHostState,
+ minReceivedMetadataVersion,
+ minSupportedMetadataVersion
+ );
}
return assignment;
}
private Map<String, Assignment> computeNewAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
- final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
- final Map<HostInfo,
Set<TopicPartition>> partitionsByHostState,
- final int
minUserMetadataVersion) {
+ final
Map<TaskId, Set<TopicPartition>> partitionsForTask,
+ final
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+ final int
minUserMetadataVersion,
+ final int
minSupportedMetadataVersion) {
final Map<String, Assignment> assignment = new HashMap<>();
// within the client, distribute tasks to its owned consumers
@@ -684,17 +704,15 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
int consumerTaskIndex = 0;
for (final String consumer : consumers) {
- final Map<TaskId, Set<TopicPartition>> standby = new
HashMap<>();
- final ArrayList<AssignedPartition> assignedPartitions = new
ArrayList<>();
+ final List<TaskId> activeTasks =
interleavedActive.get(consumerTaskIndex);
- final List<TaskId> assignedActiveList =
interleavedActive.get(consumerTaskIndex);
+ // These will be filled in by
buildAssignedActiveTaskAndPartitionsList below
+ final List<TopicPartition> activePartitionsList = new
ArrayList<>();
+ final List<TaskId> assignedActiveList = new ArrayList<>();
- for (final TaskId taskId : assignedActiveList) {
- for (final TopicPartition partition :
partitionsForTask.get(taskId)) {
- assignedPartitions.add(new AssignedPartition(taskId,
partition));
- }
- }
+ buildAssignedActiveTaskAndPartitionsList(activeTasks,
activePartitionsList, assignedActiveList, partitionsForTask);
+ final Map<TaskId, Set<TopicPartition>> standby = new
HashMap<>();
if (!state.standbyTasks().isEmpty()) {
final List<TaskId> assignedStandbyList =
interleavedStandby.get(consumerTaskIndex);
for (final TaskId taskId : assignedStandbyList) {
@@ -704,29 +722,33 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
consumerTaskIndex++;
- Collections.sort(assignedPartitions);
- final List<TaskId> active = new ArrayList<>();
- final List<TopicPartition> activePartitions = new
ArrayList<>();
- for (final AssignedPartition partition : assignedPartitions) {
- active.add(partition.taskId);
- activePartitions.add(partition.partition);
- }
-
// finally, encode the assignment before sending back to
coordinator
- assignment.put(consumer, new Assignment(
- activePartitions,
- new AssignmentInfo(minUserMetadataVersion, active,
standby, partitionsByHostState, 0).encode()));
+ assignment.put(
+ consumer,
+ new Assignment(
+ activePartitionsList,
+ new AssignmentInfo(
+ minUserMetadataVersion,
+ minSupportedMetadataVersion,
+ assignedActiveList,
+ standby,
+ partitionsByHostState,
+ 0
+ ).encode()
+ )
+ );
}
}
return assignment;
}
- private Map<String, Assignment> versionProbingAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
- final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
- final
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
- final Set<String>
futureConsumers,
- final int
minUserMetadataVersion) {
+ private static Map<String, Assignment> versionProbingAssignment(final
Map<UUID, ClientMetadata> clientsMetadata,
+ final
Map<TaskId, Set<TopicPartition>> partitionsForTask,
+ final
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+ final
Set<String> futureConsumers,
+ final int
minUserMetadataVersion,
+ final int
minSupportedMetadataVersion) {
final Map<String, Assignment> assignment = new HashMap<>();
// assign previously assigned tasks to "old consumers"
@@ -737,23 +759,27 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
continue;
}
- final List<TaskId> activeTasks = new
ArrayList<>(clientMetadata.state.prevActiveTasks());
+ // Return the same active tasks that were claimed in the
subscription
+ final List<TaskId> activeTasks = new
ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId));
- final List<TopicPartition> assignedPartitions = new
ArrayList<>();
- for (final TaskId taskId : activeTasks) {
- assignedPartitions.addAll(partitionsForTask.get(taskId));
- }
+ // These will be filled in by
buildAssignedActiveTaskAndPartitionsList below
+ final List<TopicPartition> activePartitionsList = new
ArrayList<>();
+ final List<TaskId> assignedActiveList = new ArrayList<>();
+
+ buildAssignedActiveTaskAndPartitionsList(activeTasks,
activePartitionsList, assignedActiveList, partitionsForTask);
+ // Return the same standby tasks that were claimed in the
subscription
final Map<TaskId, Set<TopicPartition>> standbyTasks = new
HashMap<>();
- for (final TaskId taskId :
clientMetadata.state.prevStandbyTasks()) {
+ for (final TaskId taskId :
clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) {
standbyTasks.put(taskId, partitionsForTask.get(taskId));
}
assignment.put(consumerId, new Assignment(
- assignedPartitions,
+ activePartitionsList,
new AssignmentInfo(
minUserMetadataVersion,
- activeTasks,
+ minSupportedMetadataVersion,
+ assignedActiveList,
standbyTasks,
partitionsByHostState,
0)
@@ -766,13 +792,34 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
for (final String consumerId : futureConsumers) {
assignment.put(consumerId, new Assignment(
Collections.emptyList(),
- new AssignmentInfo().encode()
+ new AssignmentInfo(minUserMetadataVersion,
minSupportedMetadataVersion).encode()
));
}
return assignment;
}
+ private static void buildAssignedActiveTaskAndPartitionsList(final
List<TaskId> activeTasks,
+ final
List<TopicPartition> activePartitionsList,
+ final
List<TaskId> assignedActiveList,
+ final
Map<TaskId, Set<TopicPartition>> partitionsForTask) {
+ final List<AssignedPartition> assignedPartitions = new ArrayList<>();
+
+ // Build up list of all assigned partition-task pairs
+ for (final TaskId taskId : activeTasks) {
+ for (final TopicPartition partition :
partitionsForTask.get(taskId)) {
+ assignedPartitions.add(new AssignedPartition(taskId,
partition));
+ }
+ }
+
+ // Add one copy of a task for each corresponding partition, so the
receiver can determine the task <-> tp mapping
+ Collections.sort(assignedPartitions);
+ for (final AssignedPartition partition : assignedPartitions) {
+ assignedActiveList.add(partition.taskId);
+ activePartitionsList.add(partition.partition);
+ }
+ }
+
// visible for testing
List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId>
taskIds, final int numberThreads) {
final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
@@ -793,6 +840,70 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
return taskIdsForConsumerAssignment;
}
+ private void validateMetadataVersions(final int
receivedAssignmentMetadataVersion,
+ final int
latestCommonlySupportedVersion) {
+
+ if (receivedAssignmentMetadataVersion >
usedSubscriptionMetadataVersion) {
+ log.error("Leader sent back an assignment with version {} which
was greater than our used version {}",
+ receivedAssignmentMetadataVersion,
usedSubscriptionMetadataVersion);
+ throw new TaskAssignmentException(
+ "Sent a version " + usedSubscriptionMetadataVersion
+ + " subscription but got an assignment with higher version
"
+ + receivedAssignmentMetadataVersion + "."
+ );
+ }
+
+ if (latestCommonlySupportedVersion >
SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+ log.error("Leader sent back assignment with commonly supported
version {} that is greater than our "
+ + "actual latest supported version {}",
latestCommonlySupportedVersion,
+ SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+ throw new TaskAssignmentException("Can't upgrade to metadata
version greater than we support");
+ }
+ }
+
+ // Returns true if subscription version was changed, indicating version
probing and need to rebalance again
+ protected boolean maybeUpdateSubscriptionVersion(final int
receivedAssignmentMetadataVersion,
+ final int
latestCommonlySupportedVersion) {
+ if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
+
+ // If the latest commonly supported version is now greater than
our used version, this indicates we have just
+ // completed the rolling upgrade and can now update our
subscription version for the final rebalance
+ if (latestCommonlySupportedVersion >
usedSubscriptionMetadataVersion) {
+ log.info(
+ "Sent a version {} subscription and group's latest
commonly supported version is {} (successful "
+ +
+ "version probing and end of rolling upgrade).
Upgrading subscription metadata version to " +
+ "{} for next rebalance.",
+ usedSubscriptionMetadataVersion,
+ latestCommonlySupportedVersion,
+ latestCommonlySupportedVersion
+ );
+ usedSubscriptionMetadataVersion =
latestCommonlySupportedVersion;
+ return true;
+ }
+
+ // If we received a lower version than we sent, someone else in
the group still hasn't upgraded. We
+ // should downgrade our subscription until everyone is on the
latest version
+ if (receivedAssignmentMetadataVersion <
usedSubscriptionMetadataVersion) {
+ log.info(
+ "Sent a version {} subscription and got version {}
assignment back (successful version probing). "
+ +
+ "Downgrade subscription metadata to commonly supported
version and trigger new rebalance.",
+ usedSubscriptionMetadataVersion,
+ receivedAssignmentMetadataVersion
+ );
+ usedSubscriptionMetadataVersion =
latestCommonlySupportedVersion;
+ return true;
+ }
+ } else {
+ log.debug("Received an assignment version {} that is less than the
earliest version that allows version " +
+ "probing {}. If this is not during a rolling upgrade from
version 2.0 or below, this is an error.",
+ receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION);
+ }
+
+ return false;
+ }
+
/**
* @throws TaskAssignmentException if there is no task id for one of the
partitions specified
*/
@@ -804,36 +915,27 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
final AssignmentInfo info =
AssignmentInfo.decode(assignment.userData());
if (info.errCode() != Error.NONE.code) {
// set flag to shutdown streams app
- assignmentErrorCode.set(info.errCode());
+ setAssignmentErrorCode(info.errCode());
return;
}
+ /*
+ * latestCommonlySupportedVersion belongs to
[usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION]
+ * receivedAssignmentMetadataVersion belongs to
[EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion]
+ *
+ * usedSubscriptionMetadataVersion will be downgraded to
receivedAssignmentMetadataVersion during a rolling
+ * bounce upgrade with version probing.
+ *
+ * usedSubscriptionMetadataVersion will be upgraded to
latestCommonlySupportedVersion when all members have
+ * been bounced and it is safe to use the latest version.
+ */
final int receivedAssignmentMetadataVersion = info.version();
- final int leaderSupportedVersion = info.latestSupportedVersion();
-
- if (receivedAssignmentMetadataVersion >
usedSubscriptionMetadataVersion) {
- throw new IllegalStateException("Sent a version " +
usedSubscriptionMetadataVersion
- + " subscription but got an assignment with higher version " +
receivedAssignmentMetadataVersion + ".");
- }
+ final int latestCommonlySupportedVersion =
info.commonlySupportedVersion();
- if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion
- && receivedAssignmentMetadataVersion >=
EARLIEST_PROBEABLE_VERSION) {
-
- if (receivedAssignmentMetadataVersion == leaderSupportedVersion) {
- log.info("Sent a version {} subscription and got version {}
assignment back (successful version probing). " +
- "Downgrading subscription metadata to received version
and trigger new rebalance.",
- usedSubscriptionMetadataVersion,
- receivedAssignmentMetadataVersion);
- usedSubscriptionMetadataVersion =
receivedAssignmentMetadataVersion;
- } else {
- log.info("Sent a version {} subscription and got version {}
assignment back (successful version probing). " +
- "Setting subscription metadata to leaders supported
version {} and trigger new rebalance.",
- usedSubscriptionMetadataVersion,
- receivedAssignmentMetadataVersion,
- leaderSupportedVersion);
- usedSubscriptionMetadataVersion = leaderSupportedVersion;
- }
+ validateMetadataVersions(receivedAssignmentMetadataVersion,
latestCommonlySupportedVersion);
- assignmentErrorCode.set(Error.VERSION_PROBING.code);
+ // Check if this was a version probing rebalance and check the error
code to trigger another rebalance if so
+ if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion,
latestCommonlySupportedVersion)) {
+ setAssignmentErrorCode(Error.VERSION_PROBING.code());
return;
}
@@ -849,31 +951,9 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
partitionsByHost = Collections.emptyMap();
break;
case VERSION_TWO:
- processVersionTwoAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
- partitionsByHost = info.partitionsByHost();
- break;
case VERSION_THREE:
- if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
- log.info("Sent a version {} subscription and group
leader's latest supported version is {}. " +
- "Upgrading subscription metadata version to {} for
next rebalance.",
- usedSubscriptionMetadataVersion,
- leaderSupportedVersion,
- leaderSupportedVersion);
- usedSubscriptionMetadataVersion = leaderSupportedVersion;
- }
- processVersionThreeAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
- partitionsByHost = info.partitionsByHost();
- break;
case VERSION_FOUR:
- if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
- log.info("Sent a version {} subscription and group
leader's latest supported version is {}. " +
- "Upgrading subscription metadata version to {} for
next rebalance.",
- usedSubscriptionMetadataVersion,
- leaderSupportedVersion,
- leaderSupportedVersion);
- usedSubscriptionMetadataVersion = leaderSupportedVersion;
- }
- processVersionFourAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
+ processVersionTwoAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
default:
@@ -922,26 +1002,12 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
}
- private void processVersionThreeAssignment(final AssignmentInfo info,
- final List<TopicPartition>
partitions,
- final Map<TaskId,
Set<TopicPartition>> activeTasks,
- final Map<TopicPartition,
PartitionInfo> topicToPartitionInfo) {
- processVersionTwoAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
- }
-
- private void processVersionFourAssignment(final AssignmentInfo info,
- final List<TopicPartition>
partitions,
- final Map<TaskId,
Set<TopicPartition>> activeTasks,
- final Map<TopicPartition,
PartitionInfo> topicToPartitionInfo) {
- processVersionThreeAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
- }
-
// for testing
protected void processLatestVersionAssignment(final AssignmentInfo info,
final List<TopicPartition>
partitions,
final Map<TaskId,
Set<TopicPartition>> activeTasks,
final Map<TopicPartition,
PartitionInfo> topicToPartitionInfo) {
- processVersionThreeAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
+ processVersionTwoAssignment(info, partitions, activeTasks,
topicToPartitionInfo);
}
/**
@@ -1037,6 +1103,9 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
}
+ protected void setAssignmentErrorCode(final Integer errorCode) {
+ assignmentErrorCode.set(errorCode);
+ }
// following functions are for test only
void setInternalTopicManager(final InternalTopicManager
internalTopicManager) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c136fdb..c0123ab 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -106,9 +106,9 @@ public class TaskManager {
addStreamTasks(assignment);
addStandbyTasks();
- // Pause all the partitions until the underlying state store is ready
for all the active tasks.
- log.trace("Pausing partitions: {}", assignment);
- consumer.pause(assignment);
+
+ // Pause all the new partitions until the underlying state store is
ready for all the active tasks.
+ pausePartitions();
}
private void addStreamTasks(final Collection<TopicPartition> assignment) {
@@ -317,6 +317,11 @@ public class TaskManager {
this.consumer = consumer;
}
+ void pausePartitions() {
+ log.trace("Pausing partitions: {}", consumer.assignment());
+ consumer.pause(consumer.assignment());
+ }
+
/**
* @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not contain the
partition
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 8ad4036..5c7b037 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -45,59 +45,55 @@ public class AssignmentInfo {
static final int UNKNOWN = -1;
private final int usedVersion;
- private final int latestSupportedVersion;
+ private final int commonlySupportedVersion;
private int errCode;
private List<TaskId> activeTasks;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
- // used for decoding; don't apply version checks
- private AssignmentInfo(final int version,
- final int latestSupportedVersion) {
- this.usedVersion = version;
- this.latestSupportedVersion = latestSupportedVersion;
- this.errCode = 0;
- }
-
- public AssignmentInfo(final List<TaskId> activeTasks,
- final Map<TaskId, Set<TopicPartition>> standbyTasks,
- final Map<HostInfo, Set<TopicPartition>> hostState) {
- this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState,
0);
- }
-
- public AssignmentInfo() {
- this(LATEST_SUPPORTED_VERSION,
+ // used for decoding and "future consumer" assignments during version
probing
+ public AssignmentInfo(final int version,
+ final int commonlySupportedVersion) {
+ this(version,
+ commonlySupportedVersion,
Collections.emptyList(),
Collections.emptyMap(),
Collections.emptyMap(),
0);
}
+ public AssignmentInfo(final List<TaskId> activeTasks,
+ final Map<TaskId, Set<TopicPartition>> standbyTasks,
+ final Map<HostInfo, Set<TopicPartition>>
partitionsByHost) {
+ this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks,
partitionsByHost, 0);
+ }
+
public AssignmentInfo(final int version,
final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
- final Map<HostInfo, Set<TopicPartition>> hostState,
+ final Map<HostInfo, Set<TopicPartition>>
partitionsByHost,
final int errCode) {
- this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks,
hostState, errCode);
- if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
- throw new IllegalArgumentException("version must be between 1 and
" + LATEST_SUPPORTED_VERSION
- + "; was: " + version);
- }
+ this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks,
partitionsByHost, errCode);
}
- // for testing only; don't apply version checks
- AssignmentInfo(final int version,
- final int latestSupportedVersion,
- final List<TaskId> activeTasks,
- final Map<TaskId, Set<TopicPartition>> standbyTasks,
- final Map<HostInfo, Set<TopicPartition>> hostState,
- final int errCode) {
+
+ public AssignmentInfo(final int version,
+ final int commonlySupportedVersion,
+ final List<TaskId> activeTasks,
+ final Map<TaskId, Set<TopicPartition>> standbyTasks,
+ final Map<HostInfo, Set<TopicPartition>>
partitionsByHost,
+ final int errCode) {
this.usedVersion = version;
- this.latestSupportedVersion = latestSupportedVersion;
+ this.commonlySupportedVersion = commonlySupportedVersion;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
- this.partitionsByHost = hostState;
+ this.partitionsByHost = partitionsByHost;
this.errCode = errCode;
+
+ if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+ throw new IllegalArgumentException("version must be between 1 and
" + LATEST_SUPPORTED_VERSION
+ + "; was: " + version);
+ }
}
public int version() {
@@ -108,8 +104,8 @@ public class AssignmentInfo {
return errCode;
}
- public int latestSupportedVersion() {
- return latestSupportedVersion;
+ public int commonlySupportedVersion() {
+ return commonlySupportedVersion;
}
public List<TaskId> activeTasks() {
@@ -334,7 +330,7 @@ public class AssignmentInfo {
@Override
public int hashCode() {
- return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^
standbyTasks.hashCode()
+ return usedVersion ^ commonlySupportedVersion ^ activeTasks.hashCode()
^ standbyTasks.hashCode()
^ partitionsByHost.hashCode() ^ errCode;
}
@@ -343,7 +339,7 @@ public class AssignmentInfo {
if (o instanceof AssignmentInfo) {
final AssignmentInfo other = (AssignmentInfo) o;
return usedVersion == other.usedVersion &&
- latestSupportedVersion == other.latestSupportedVersion &&
+ commonlySupportedVersion == other.commonlySupportedVersion
&&
errCode == other.errCode &&
activeTasks.equals(other.activeTasks) &&
standbyTasks.equals(other.standbyTasks) &&
@@ -356,7 +352,7 @@ public class AssignmentInfo {
@Override
public String toString() {
return "[version=" + usedVersion
- + ", supported version=" + latestSupportedVersion
+ + ", supported version=" + commonlySupportedVersion
+ ", active tasks=" + activeTasks
+ ", standby tasks=" + standbyTasks
+ ", global assignment=" + partitionsByHost + "]";
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 6eff7bd..ab213d5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.kafka.streams.processor.TaskId;
import java.util.HashSet;
@@ -29,15 +31,25 @@ public class ClientState {
private final Set<TaskId> prevStandbyTasks;
private final Set<TaskId> prevAssignedTasks;
- private int capacity;
+ private final Map<String, Set<TaskId>> prevActiveTasksByConsumer;
+ private final Map<String, Set<TaskId>> prevStandbyTasksByConsumer;
+ private int capacity;
public ClientState() {
this(0);
}
ClientState(final int capacity) {
- this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new
HashSet<>(), new HashSet<>(), new HashSet<>(), capacity);
+ this(new HashSet<>(),
+ new HashSet<>(),
+ new HashSet<>(),
+ new HashSet<>(),
+ new HashSet<>(),
+ new HashSet<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ capacity);
}
private ClientState(final Set<TaskId> activeTasks,
@@ -46,6 +58,8 @@ public class ClientState {
final Set<TaskId> prevActiveTasks,
final Set<TaskId> prevStandbyTasks,
final Set<TaskId> prevAssignedTasks,
+ final Map<String, Set<TaskId>>
prevActiveTasksByConsumer,
+ final Map<String, Set<TaskId>>
prevStandbyTasksByConsumer,
final int capacity) {
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
@@ -53,6 +67,8 @@ public class ClientState {
this.prevActiveTasks = prevActiveTasks;
this.prevStandbyTasks = prevStandbyTasks;
this.prevAssignedTasks = prevAssignedTasks;
+ this.prevActiveTasksByConsumer = prevActiveTasksByConsumer;
+ this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer;
this.capacity = capacity;
}
@@ -64,6 +80,8 @@ public class ClientState {
new HashSet<>(prevActiveTasks),
new HashSet<>(prevStandbyTasks),
new HashSet<>(prevAssignedTasks),
+ new HashMap<>(prevActiveTasksByConsumer),
+ new HashMap<>(prevStandbyTasksByConsumer),
capacity);
}
@@ -107,14 +125,24 @@ public class ClientState {
return activeTasks.size();
}
- public void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
+ public void addPreviousActiveTasks(final String consumer, final
Set<TaskId> prevTasks) {
prevActiveTasks.addAll(prevTasks);
prevAssignedTasks.addAll(prevTasks);
+ prevActiveTasksByConsumer.put(consumer, prevTasks);
}
- public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
+ public void addPreviousStandbyTasks(final String consumer, final
Set<TaskId> standbyTasks) {
prevStandbyTasks.addAll(standbyTasks);
prevAssignedTasks.addAll(standbyTasks);
+ prevStandbyTasksByConsumer.put(consumer, standbyTasks);
+ }
+
+ public Set<TaskId> prevActiveTasksForConsumer(final String consumer) {
+ return prevActiveTasksByConsumer.get(consumer);
+ }
+
+ public Set<TaskId> prevStandbyTasksForConsumer(final String consumer) {
+ return prevStandbyTasksByConsumer.get(consumer);
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 140c219..2d960f3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
+import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo.LATEST_SUPPORTED_VERSION;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
@@ -1240,7 +1241,8 @@ public class StreamsPartitionAssignorTest {
)));
assertThat(assignment.get("consumer1").partitions(),
equalTo(asList(t1p0, t1p1)));
-
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()),
equalTo(new AssignmentInfo()));
+
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()),
+ equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION,
LATEST_SUPPORTED_VERSION)));
assertThat(assignment.get("future-consumer").partitions().size(),
equalTo(0));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7d7d4e6..9e5f8bd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -315,6 +315,7 @@ public class TaskManagerTest {
@Test
public void shouldPauseActivePartitions() {
mockSingleActiveTask();
+ expect(consumer.assignment()).andReturn(taskId0Partitions).times(2);
consumer.pause(taskId0Partitions);
expectLastCall();
replay();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index 034ae7b..dc54c86 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -70,7 +70,7 @@ public class ClientStateTest {
final TaskId tid1 = new TaskId(0, 1);
final TaskId tid2 = new TaskId(0, 2);
- client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2));
+ client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2));
assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1,
tid2)));
assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1,
tid2)));
}
@@ -80,7 +80,7 @@ public class ClientStateTest {
final TaskId tid1 = new TaskId(0, 1);
final TaskId tid2 = new TaskId(0, 2);
- client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2));
+ client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2));
assertThat(client.previousActiveTasks().size(), equalTo(0));
assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1,
tid2)));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 17d403f..19d7730 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -207,11 +207,11 @@ public class StickyTaskAssignorTest {
@Test
public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
final ClientState client1 = createClient(p1, 1);
- client1.addPreviousStandbyTasks(Utils.mkSet(task02));
+ client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02));
final ClientState client2 = createClient(p2, 1);
- client2.addPreviousStandbyTasks(Utils.mkSet(task01));
+ client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
final ClientState client3 = createClient(p3, 1);
- client3.addPreviousStandbyTasks(Utils.mkSet(task00));
+ client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00));
final StickyTaskAssignor taskAssignor = createTaskAssignor(task00,
task01, task02);
@@ -225,9 +225,9 @@ public class StickyTaskAssignorTest {
@Test
public void
shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task00);
- c1.addPreviousStandbyTasks(Utils.mkSet(task01));
+ c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2,
task02);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01));
+ c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
final StickyTaskAssignor taskAssignor = createTaskAssignor(task00,
task01, task02);
@@ -455,9 +455,9 @@ public class StickyTaskAssignorTest {
@Test
public void
shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task01, task02);
- c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
+ c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1,
task03, task00);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02));
+ c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02));
createClient(p3, 1);
createClient(p4, 1);
@@ -577,14 +577,14 @@ public class StickyTaskAssignorTest {
final TaskId task23 = new TaskId(2, 3);
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task01, task12, task13);
- c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21,
task23));
+ c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11,
task20, task21, task23));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1,
task00, task11, task22);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20,
task03, task12, task21, task13, task23));
+ c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10,
task02, task20, task03, task12, task21, task13, task23));
final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1,
task20, task21, task23);
- c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
+ c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12));
final ClientState newClient = createClient(p4, 1);
- newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01,
task02, task11, task20, task03, task12, task21, task13, task22, task23));
+ newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00,
task10, task01, task02, task11, task20, task03, task12, task21, task13, task22,
task23));
final StickyTaskAssignor<Integer> taskAssignor =
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03,
task12, task21, task13, task22, task23);
taskAssignor.assign(0);
@@ -607,15 +607,15 @@ public class StickyTaskAssignorTest {
final TaskId task23 = new TaskId(2, 3);
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task01, task12, task13);
- c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21,
task23));
+ c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11,
task20, task21, task23));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1,
task00, task11, task22);
- c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20,
task03, task12, task21, task13, task23));
+ c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10,
task02, task20, task03, task12, task21, task13, task23));
final ClientState bounce1 = createClient(p3, 1);
- bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
+ bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20,
task21, task23));
final ClientState bounce2 = createClient(p4, 1);
- bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
+ bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02,
task03, task10));
final StickyTaskAssignor<Integer> taskAssignor =
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03,
task12, task21, task13, task22, task23);
taskAssignor.assign(0);
@@ -658,7 +658,7 @@ public class StickyTaskAssignorTest {
final TaskId task06 = new TaskId(0, 6);
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task00, task01, task02, task06);
final ClientState c2 = createClient(p2, 1);
- c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
+ c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04,
task05));
final ClientState newClient = createClient(p3, 1);
final StickyTaskAssignor<Integer> taskAssignor =
createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
@@ -705,7 +705,7 @@ public class StickyTaskAssignorTest {
private ClientState createClientWithPreviousActiveTasks(final Integer
processId, final int capacity, final TaskId... taskIds) {
final ClientState clientState = new ClientState(capacity);
- clientState.addPreviousActiveTasks(Utils.mkSet(taskIds));
+ clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds));
clients.put(processId, clientState);
return clientState;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 27bee81..ac56371 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -163,6 +163,11 @@ public class StreamsUpgradeTest {
final AssignmentInfo info = AssignmentInfo.decode(
assignment.userData().putInt(0,
AssignmentInfo.LATEST_SUPPORTED_VERSION));
+ if (super.maybeUpdateSubscriptionVersion(usedVersion,
info.commonlySupportedVersion())) {
+ setAssignmentErrorCode(Error.VERSION_PROBING.code());
+ return;
+ }
+
final List<TopicPartition> partitions = new
ArrayList<>(assignment.partitions());
partitions.sort(PARTITION_COMPARATOR);
@@ -292,6 +297,7 @@ public class StreamsUpgradeTest {
private FutureAssignmentInfo(final boolean bumpUsedVersion,
final boolean bumpSupportedVersion,
final ByteBuffer bytes) {
+ super(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION);
this.bumpUsedVersion = bumpUsedVersion;
this.bumpSupportedVersion = bumpSupportedVersion;
originalUserMetadata = bytes;
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 4f8dc47..7b9a310 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -349,8 +349,6 @@ class StreamsUpgradeTest(Test):
for p in self.processors:
found = list(p.node.account.ssh_capture("grep \"Finished
assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
if len(found) >= self.leader_counter[p] + 1:
- if self.leader is not None:
- raise Exception("Could not uniquely identify leader")
self.leader = p
self.leader_counter[p] = self.leader_counter[p] + 1
@@ -547,39 +545,25 @@ class StreamsUpgradeTest(Test):
else:
self.leader_counter[self.leader] =
self.leader_counter[self.leader] + 1
- if processor == self.leader:
- leader_monitor = log_monitor
- elif first_other_processor == self.leader:
- leader_monitor = first_other_monitor
- elif second_other_processor == self.leader:
- leader_monitor = second_other_monitor
- else:
- raise Exception("Could not identify leader.")
-
monitors = {}
monitors[processor] = log_monitor
monitors[first_other_processor] = first_other_monitor
monitors[second_other_processor] = second_other_monitor
- leader_monitor.wait_until("Received a future (version
probing) subscription (version: 5). Sending empty assignment back (with
supported version 4).",
- timeout_sec=60,
- err_msg="Could not detect
'version probing' attempt at leader " + str(self.leader.node.account))
-
if len(self.old_processors) > 0:
- log_monitor.wait_until("Sent a version 5 subscription
and got version 4 assignment back (successful version probing). Downgrading
subscription metadata to received version and trigger new rebalance.",
+ log_monitor.wait_until("Sent a version 5 subscription
and got version 4 assignment back (successful version probing). Downgrade
subscription metadata to commonly supported version and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect
'successful version probing' at upgrading node " + str(node.account))
else:
- log_monitor.wait_until("Sent a version 5 subscription
and got version 4 assignment back (successful version probing). Setting
subscription metadata to leaders supported version 5 and trigger new
rebalance.",
+ log_monitor.wait_until("Sent a version 5 subscription
and got version 4 assignment back (successful version probing). Downgrade
subscription metadata to commonly supported version and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect
'successful version probing with upgraded leader' at upgrading node " +
str(node.account))
- first_other_monitor.wait_until("Sent a version 4
subscription and group leader.s latest supported version is 5. Upgrading
subscription metadata version to 5 for next rebalance.",
+ first_other_monitor.wait_until("Sent a version 4
subscription and group.s latest commonly supported version is 5 (successful
version probing and end of rolling upgrade). Upgrading subscription metadata
version to 5 for next rebalance.",
timeout_sec=60,
- err_msg="Never saw
output 'Upgrade metadata to version 4' on" + str(first_other_node.account))
- second_other_monitor.wait_until("Sent a version 4
subscription and group leader.s latest supported version is 5. Upgrading
subscription metadata version to 5 for next rebalance.",
+ err_msg="Never saw
output 'Upgrade metadata to version 5' on" + str(first_other_node.account))
+ second_other_monitor.wait_until("Sent a version 4
subscription and group.s latest commonly supported version is 5 (successful
version probing and end of rolling upgrade). Upgrading subscription metadata
version to 5 for next rebalance.",
timeout_sec=60,
- err_msg="Never saw
output 'Upgrade metadata to version 4' on" + str(second_other_node.account))
-
+ err_msg="Never saw
output 'Upgrade metadata to version 5' on" + str(second_other_node.account))
log_monitor.wait_until("Version probing detected.
Triggering new rebalance.",
timeout_sec=60,
err_msg="Could not detect
'Triggering new rebalance' at upgrading node " + str(node.account))