This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new d3bf3cd KAFKA-8649: send latest commonly supported version in assignment (#7426) d3bf3cd is described below commit d3bf3cd3d16945c17d45cf3b2bc4ef0d699cf418 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Wed Oct 2 16:01:54 2019 -0700 KAFKA-8649: send latest commonly supported version in assignment (#7426) PR 7423 but targeted at 2.2. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .gitignore | 1 + .../streams/processor/internals/StreamThread.java | 7 +- .../internals/StreamsPartitionAssignor.java | 275 +++++++++++++-------- .../streams/processor/internals/TaskManager.java | 11 +- .../internals/assignment/AssignmentInfo.java | 70 +++--- .../internals/assignment/ClientState.java | 36 ++- .../internals/StreamsPartitionAssignorTest.java | 4 +- .../processor/internals/TaskManagerTest.java | 1 + .../internals/assignment/ClientStateTest.java | 4 +- .../assignment/StickyTaskAssignorTest.java | 34 +-- .../kafka/streams/tests/StreamsUpgradeTest.java | 6 + .../tests/streams/streams_upgrade_test.py | 28 +-- 12 files changed, 285 insertions(+), 192 deletions(-) diff --git a/.gitignore b/.gitignore index a31643f..0c6854d 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,4 @@ kafkatest.egg-info/ systest/ *.swp clients/src/generated +clients/src/generated-test \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3083ec7..4c995e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -286,9 +286,10 @@ public class StreamThread extends Thread { ); } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) { log.debug( - "Encountered assignment error during partition assignment: {}. Skipping task initialization", - streamThread.assignmentErrorCode - ); + "Encountered assignment error during partition assignment: {}. Skipping task initialization and " + + "pausing any partitions we may have been assigned.", + streamThread.assignmentErrorCode); + taskManager.pausePartitions(); } else { log.debug("Creating tasks based on assignment."); taskManager.createTasks(assignment); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 89e95b6..3dd0836 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { consumers.add(consumerMemberId); - state.addPreviousActiveTasks(info.prevTasks()); - state.addPreviousStandbyTasks(info.standbyTasks()); + state.addPreviousActiveTasks(consumerMemberId, info.prevTasks()); + state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks()); state.incrementCapacity(); } @@ -397,6 +397,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Set<String> futureConsumers = new HashSet<>(); int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + int minSupportedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; supportedVersions.clear(); int futureMetadataVersion = UNKNOWN; @@ -416,6 +417,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable minReceivedMetadataVersion = usedVersion; } + final int latestSupportedVersion = info.latestSupportedVersion(); + if (latestSupportedVersion < minSupportedMetadataVersion) { + minSupportedMetadataVersion = latestSupportedVersion; + } + // create the new client metadata if necessary ClientMetadata clientMetadata = clientsMetadata.get(info.processId()); @@ -659,18 +665,32 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Map<String, Assignment> assignment; if (versionProbing) { - assignment = versionProbingAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, futureConsumers, minReceivedMetadataVersion); + assignment = versionProbingAssignment( + clientsMetadata, + partitionsForTask, + partitionsByHostState, + futureConsumers, + minReceivedMetadataVersion, + minSupportedMetadataVersion + ); } else { - assignment = computeNewAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion); + assignment = computeNewAssignment( + clientsMetadata, + partitionsForTask, + partitionsByHostState, + minReceivedMetadataVersion, + minSupportedMetadataVersion + ); } return assignment; } private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata, - final Map<TaskId, Set<TopicPartition>> partitionsForTask, - final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, - final int minUserMetadataVersion) { + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { final Map<String, Assignment> assignment = new HashMap<>(); // within the client, distribute tasks to its owned consumers @@ -684,17 +704,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable int consumerTaskIndex = 0; for (final String consumer : consumers) { - final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>(); - final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>(); + final List<TaskId> activeTasks = interleavedActive.get(consumerTaskIndex); - final List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex); + // These will be filled in by buildAssignedActiveTaskAndPartitionsList below + final List<TopicPartition> activePartitionsList = new ArrayList<>(); + final List<TaskId> assignedActiveList = new ArrayList<>(); - for (final TaskId taskId : assignedActiveList) { - for (final TopicPartition partition : partitionsForTask.get(taskId)) { - assignedPartitions.add(new AssignedPartition(taskId, partition)); - } - } + buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask); + final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>(); if (!state.standbyTasks().isEmpty()) { final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex); for (final TaskId taskId : assignedStandbyList) { @@ -704,29 +722,33 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable consumerTaskIndex++; - Collections.sort(assignedPartitions); - final List<TaskId> active = new ArrayList<>(); - final List<TopicPartition> activePartitions = new ArrayList<>(); - for (final AssignedPartition partition : assignedPartitions) { - active.add(partition.taskId); - activePartitions.add(partition.partition); - } - // finally, encode the assignment before sending back to coordinator - assignment.put(consumer, new Assignment( - activePartitions, - new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState, 0).encode())); + assignment.put( + consumer, + new Assignment( + activePartitionsList, + new AssignmentInfo( + minUserMetadataVersion, + minSupportedMetadataVersion, + assignedActiveList, + standby, + partitionsByHostState, + 0 + ).encode() + ) + ); } } return assignment; } - private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata, - final Map<TaskId, Set<TopicPartition>> partitionsForTask, - final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, - final Set<String> futureConsumers, - final int minUserMetadataVersion) { + private static Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata, + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, + final Set<String> futureConsumers, + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { final Map<String, Assignment> assignment = new HashMap<>(); // assign previously assigned tasks to "old consumers" @@ -737,23 +759,27 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable continue; } - final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks()); + // Return the same active tasks that were claimed in the subscription + final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId)); - final List<TopicPartition> assignedPartitions = new ArrayList<>(); - for (final TaskId taskId : activeTasks) { - assignedPartitions.addAll(partitionsForTask.get(taskId)); - } + // These will be filled in by buildAssignedActiveTaskAndPartitionsList below + final List<TopicPartition> activePartitionsList = new ArrayList<>(); + final List<TaskId> assignedActiveList = new ArrayList<>(); + + buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask); + // Return the same standby tasks that were claimed in the subscription final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - for (final TaskId taskId : clientMetadata.state.prevStandbyTasks()) { + for (final TaskId taskId : clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) { standbyTasks.put(taskId, partitionsForTask.get(taskId)); } assignment.put(consumerId, new Assignment( - assignedPartitions, + activePartitionsList, new AssignmentInfo( minUserMetadataVersion, - activeTasks, + minSupportedMetadataVersion, + assignedActiveList, standbyTasks, partitionsByHostState, 0) @@ -766,13 +792,34 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable for (final String consumerId : futureConsumers) { assignment.put(consumerId, new Assignment( Collections.emptyList(), - new AssignmentInfo().encode() + new AssignmentInfo(minUserMetadataVersion, minSupportedMetadataVersion).encode() )); } return assignment; } + private static void buildAssignedActiveTaskAndPartitionsList(final List<TaskId> activeTasks, + final List<TopicPartition> activePartitionsList, + final List<TaskId> assignedActiveList, + final Map<TaskId, Set<TopicPartition>> partitionsForTask) { + final List<AssignedPartition> assignedPartitions = new ArrayList<>(); + + // Build up list of all assigned partition-task pairs + for (final TaskId taskId : activeTasks) { + for (final TopicPartition partition : partitionsForTask.get(taskId)) { + assignedPartitions.add(new AssignedPartition(taskId, partition)); + } + } + + // Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping + Collections.sort(assignedPartitions); + for (final AssignedPartition partition : assignedPartitions) { + assignedActiveList.add(partition.taskId); + activePartitionsList.add(partition.partition); + } + } + // visible for testing List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) { final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds); @@ -793,6 +840,70 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable return taskIdsForConsumerAssignment; } + private void validateMetadataVersions(final int receivedAssignmentMetadataVersion, + final int latestCommonlySupportedVersion) { + + if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) { + log.error("Leader sent back an assignment with version {} which was greater than our used version {}", + receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion); + throw new TaskAssignmentException( + "Sent a version " + usedSubscriptionMetadataVersion + + " subscription but got an assignment with higher version " + + receivedAssignmentMetadataVersion + "." + ); + } + + if (latestCommonlySupportedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + log.error("Leader sent back assignment with commonly supported version {} that is greater than our " + + "actual latest supported version {}", latestCommonlySupportedVersion, + SubscriptionInfo.LATEST_SUPPORTED_VERSION); + throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support"); + } + } + + // Returns true if subscription version was changed, indicating version probing and need to rebalance again + protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion, + final int latestCommonlySupportedVersion) { + if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { + + // If the latest commonly supported version is now greater than our used version, this indicates we have just + // completed the rolling upgrade and can now update our subscription version for the final rebalance + if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) { + log.info( + "Sent a version {} subscription and group's latest commonly supported version is {} (successful " + + + "version probing and end of rolling upgrade). Upgrading subscription metadata version to " + + "{} for next rebalance.", + usedSubscriptionMetadataVersion, + latestCommonlySupportedVersion, + latestCommonlySupportedVersion + ); + usedSubscriptionMetadataVersion = latestCommonlySupportedVersion; + return true; + } + + // If we received a lower version than we sent, someone else in the group still hasn't upgraded. We + // should downgrade our subscription until everyone is on the latest version + if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) { + log.info( + "Sent a version {} subscription and got version {} assignment back (successful version probing). " + + + "Downgrade subscription metadata to commonly supported version and trigger new rebalance.", + usedSubscriptionMetadataVersion, + receivedAssignmentMetadataVersion + ); + usedSubscriptionMetadataVersion = latestCommonlySupportedVersion; + return true; + } + } else { + log.debug("Received an assignment version {} that is less than the earliest version that allows version " + + "probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.", + receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION); + } + + return false; + } + /** * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @@ -804,36 +915,27 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); if (info.errCode() != Error.NONE.code) { // set flag to shutdown streams app - assignmentErrorCode.set(info.errCode()); + setAssignmentErrorCode(info.errCode()); return; } + /* + * latestCommonlySupportedVersion belongs to [usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION] + * receivedAssignmentMetadataVersion belongs to [EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion] + * + * usedSubscriptionMetadataVersion will be downgraded to receivedAssignmentMetadataVersion during a rolling + * bounce upgrade with version probing. + * + * usedSubscriptionMetadataVersion will be upgraded to latestCommonlySupportedVersion when all members have + * been bounced and it is safe to use the latest version. + */ final int receivedAssignmentMetadataVersion = info.version(); - final int leaderSupportedVersion = info.latestSupportedVersion(); - - if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) { - throw new IllegalStateException("Sent a version " + usedSubscriptionMetadataVersion - + " subscription but got an assignment with higher version " + receivedAssignmentMetadataVersion + "."); - } + final int latestCommonlySupportedVersion = info.commonlySupportedVersion(); - if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion - && receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { - - if (receivedAssignmentMetadataVersion == leaderSupportedVersion) { - log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " + - "Downgrading subscription metadata to received version and trigger new rebalance.", - usedSubscriptionMetadataVersion, - receivedAssignmentMetadataVersion); - usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion; - } else { - log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " + - "Setting subscription metadata to leaders supported version {} and trigger new rebalance.", - usedSubscriptionMetadataVersion, - receivedAssignmentMetadataVersion, - leaderSupportedVersion); - usedSubscriptionMetadataVersion = leaderSupportedVersion; - } + validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion); - assignmentErrorCode.set(Error.VERSION_PROBING.code); + // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so + if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) { + setAssignmentErrorCode(Error.VERSION_PROBING.code()); return; } @@ -849,31 +951,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable partitionsByHost = Collections.emptyMap(); break; case VERSION_TWO: - processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); - partitionsByHost = info.partitionsByHost(); - break; case VERSION_THREE: - if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { - log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + - "Upgrading subscription metadata version to {} for next rebalance.", - usedSubscriptionMetadataVersion, - leaderSupportedVersion, - leaderSupportedVersion); - usedSubscriptionMetadataVersion = leaderSupportedVersion; - } - processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); - partitionsByHost = info.partitionsByHost(); - break; case VERSION_FOUR: - if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { - log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + - "Upgrading subscription metadata version to {} for next rebalance.", - usedSubscriptionMetadataVersion, - leaderSupportedVersion, - leaderSupportedVersion); - usedSubscriptionMetadataVersion = leaderSupportedVersion; - } - processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo); + processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; default: @@ -922,26 +1002,12 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } } - private void processVersionThreeAssignment(final AssignmentInfo info, - final List<TopicPartition> partitions, - final Map<TaskId, Set<TopicPartition>> activeTasks, - final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) { - processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); - } - - private void processVersionFourAssignment(final AssignmentInfo info, - final List<TopicPartition> partitions, - final Map<TaskId, Set<TopicPartition>> activeTasks, - final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) { - processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); - } - // for testing protected void processLatestVersionAssignment(final AssignmentInfo info, final List<TopicPartition> partitions, final Map<TaskId, Set<TopicPartition>> activeTasks, final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) { - processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); + processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); } /** @@ -1037,6 +1103,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } } + protected void setAssignmentErrorCode(final Integer errorCode) { + assignmentErrorCode.set(errorCode); + } // following functions are for test only void setInternalTopicManager(final InternalTopicManager internalTopicManager) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c136fdb..c0123ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -106,9 +106,9 @@ public class TaskManager { addStreamTasks(assignment); addStandbyTasks(); - // Pause all the partitions until the underlying state store is ready for all the active tasks. - log.trace("Pausing partitions: {}", assignment); - consumer.pause(assignment); + + // Pause all the new partitions until the underlying state store is ready for all the active tasks. + pausePartitions(); } private void addStreamTasks(final Collection<TopicPartition> assignment) { @@ -317,6 +317,11 @@ public class TaskManager { this.consumer = consumer; } + void pausePartitions() { + log.trace("Pausing partitions: {}", consumer.assignment()); + consumer.pause(consumer.assignment()); + } + /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 8ad4036..5c7b037 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -45,59 +45,55 @@ public class AssignmentInfo { static final int UNKNOWN = -1; private final int usedVersion; - private final int latestSupportedVersion; + private final int commonlySupportedVersion; private int errCode; private List<TaskId> activeTasks; private Map<TaskId, Set<TopicPartition>> standbyTasks; private Map<HostInfo, Set<TopicPartition>> partitionsByHost; - // used for decoding; don't apply version checks - private AssignmentInfo(final int version, - final int latestSupportedVersion) { - this.usedVersion = version; - this.latestSupportedVersion = latestSupportedVersion; - this.errCode = 0; - } - - public AssignmentInfo(final List<TaskId> activeTasks, - final Map<TaskId, Set<TopicPartition>> standbyTasks, - final Map<HostInfo, Set<TopicPartition>> hostState) { - this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 0); - } - - public AssignmentInfo() { - this(LATEST_SUPPORTED_VERSION, + // used for decoding and "future consumer" assignments during version probing + public AssignmentInfo(final int version, + final int commonlySupportedVersion) { + this(version, + commonlySupportedVersion, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), 0); } + public AssignmentInfo(final List<TaskId> activeTasks, + final Map<TaskId, Set<TopicPartition>> standbyTasks, + final Map<HostInfo, Set<TopicPartition>> partitionsByHost) { + this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, 0); + } + public AssignmentInfo(final int version, final List<TaskId> activeTasks, final Map<TaskId, Set<TopicPartition>> standbyTasks, - final Map<HostInfo, Set<TopicPartition>> hostState, + final Map<HostInfo, Set<TopicPartition>> partitionsByHost, final int errCode) { - this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, errCode); - if (version < 1 || version > LATEST_SUPPORTED_VERSION) { - throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION - + "; was: " + version); - } + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, errCode); } - // for testing only; don't apply version checks - AssignmentInfo(final int version, - final int latestSupportedVersion, - final List<TaskId> activeTasks, - final Map<TaskId, Set<TopicPartition>> standbyTasks, - final Map<HostInfo, Set<TopicPartition>> hostState, - final int errCode) { + + public AssignmentInfo(final int version, + final int commonlySupportedVersion, + final List<TaskId> activeTasks, + final Map<TaskId, Set<TopicPartition>> standbyTasks, + final Map<HostInfo, Set<TopicPartition>> partitionsByHost, + final int errCode) { this.usedVersion = version; - this.latestSupportedVersion = latestSupportedVersion; + this.commonlySupportedVersion = commonlySupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; - this.partitionsByHost = hostState; + this.partitionsByHost = partitionsByHost; this.errCode = errCode; + + if (version < 1 || version > LATEST_SUPPORTED_VERSION) { + throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + + "; was: " + version); + } } public int version() { @@ -108,8 +104,8 @@ public class AssignmentInfo { return errCode; } - public int latestSupportedVersion() { - return latestSupportedVersion; + public int commonlySupportedVersion() { + return commonlySupportedVersion; } public List<TaskId> activeTasks() { @@ -334,7 +330,7 @@ public class AssignmentInfo { @Override public int hashCode() { - return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() + return usedVersion ^ commonlySupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode() ^ errCode; } @@ -343,7 +339,7 @@ public class AssignmentInfo { if (o instanceof AssignmentInfo) { final AssignmentInfo other = (AssignmentInfo) o; return usedVersion == other.usedVersion && - latestSupportedVersion == other.latestSupportedVersion && + commonlySupportedVersion == other.commonlySupportedVersion && errCode == other.errCode && activeTasks.equals(other.activeTasks) && standbyTasks.equals(other.standbyTasks) && @@ -356,7 +352,7 @@ public class AssignmentInfo { @Override public String toString() { return "[version=" + usedVersion - + ", supported version=" + latestSupportedVersion + + ", supported version=" + commonlySupportedVersion + ", active tasks=" + activeTasks + ", standby tasks=" + standbyTasks + ", global assignment=" + partitionsByHost + "]"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 6eff7bd..ab213d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.HashMap; +import java.util.Map; import org.apache.kafka.streams.processor.TaskId; import java.util.HashSet; @@ -29,15 +31,25 @@ public class ClientState { private final Set<TaskId> prevStandbyTasks; private final Set<TaskId> prevAssignedTasks; - private int capacity; + private final Map<String, Set<TaskId>> prevActiveTasksByConsumer; + private final Map<String, Set<TaskId>> prevStandbyTasksByConsumer; + private int capacity; public ClientState() { this(0); } ClientState(final int capacity) { - this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), capacity); + this(new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashMap<>(), + new HashMap<>(), + capacity); } private ClientState(final Set<TaskId> activeTasks, @@ -46,6 +58,8 @@ public class ClientState { final Set<TaskId> prevActiveTasks, final Set<TaskId> prevStandbyTasks, final Set<TaskId> prevAssignedTasks, + final Map<String, Set<TaskId>> prevActiveTasksByConsumer, + final Map<String, Set<TaskId>> prevStandbyTasksByConsumer, final int capacity) { this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; @@ -53,6 +67,8 @@ public class ClientState { this.prevActiveTasks = prevActiveTasks; this.prevStandbyTasks = prevStandbyTasks; this.prevAssignedTasks = prevAssignedTasks; + this.prevActiveTasksByConsumer = prevActiveTasksByConsumer; + this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer; this.capacity = capacity; } @@ -64,6 +80,8 @@ public class ClientState { new HashSet<>(prevActiveTasks), new HashSet<>(prevStandbyTasks), new HashSet<>(prevAssignedTasks), + new HashMap<>(prevActiveTasksByConsumer), + new HashMap<>(prevStandbyTasksByConsumer), capacity); } @@ -107,14 +125,24 @@ public class ClientState { return activeTasks.size(); } - public void addPreviousActiveTasks(final Set<TaskId> prevTasks) { + public void addPreviousActiveTasks(final String consumer, final Set<TaskId> prevTasks) { prevActiveTasks.addAll(prevTasks); prevAssignedTasks.addAll(prevTasks); + prevActiveTasksByConsumer.put(consumer, prevTasks); } - public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) { + public void addPreviousStandbyTasks(final String consumer, final Set<TaskId> standbyTasks) { prevStandbyTasks.addAll(standbyTasks); prevAssignedTasks.addAll(standbyTasks); + prevStandbyTasksByConsumer.put(consumer, standbyTasks); + } + + public Set<TaskId> prevActiveTasksForConsumer(final String consumer) { + return prevActiveTasksByConsumer.get(consumer); + } + + public Set<TaskId> prevStandbyTasksForConsumer(final String consumer) { + return prevStandbyTasksByConsumer.get(consumer); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 140c219..2d960f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo.LATEST_SUPPORTED_VERSION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertEquals; @@ -1240,7 +1241,8 @@ public class StreamsPartitionAssignorTest { ))); assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1))); - assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo())); + assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), + equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION))); assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 7d7d4e6..9e5f8bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -315,6 +315,7 @@ public class TaskManagerTest { @Test public void shouldPauseActivePartitions() { mockSingleActiveTask(); + expect(consumer.assignment()).andReturn(taskId0Partitions).times(2); consumer.pause(taskId0Partitions); expectLastCall(); replay(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 034ae7b..dc54c86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -70,7 +70,7 @@ public class ClientStateTest { final TaskId tid1 = new TaskId(0, 1); final TaskId tid2 = new TaskId(0, 2); - client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2)); + client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2)); assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1, tid2))); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); } @@ -80,7 +80,7 @@ public class ClientStateTest { final TaskId tid1 = new TaskId(0, 1); final TaskId tid2 = new TaskId(0, 2); - client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2)); + client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2)); assertThat(client.previousActiveTasks().size(), equalTo(0)); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 17d403f..19d7730 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -207,11 +207,11 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksToClientWithPreviousStandbyTasks() { final ClientState client1 = createClient(p1, 1); - client1.addPreviousStandbyTasks(Utils.mkSet(task02)); + client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02)); final ClientState client2 = createClient(p2, 1); - client2.addPreviousStandbyTasks(Utils.mkSet(task01)); + client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); final ClientState client3 = createClient(p3, 1); - client3.addPreviousStandbyTasks(Utils.mkSet(task00)); + client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); @@ -225,9 +225,9 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() { final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00); - c1.addPreviousStandbyTasks(Utils.mkSet(task01)); + c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02); - c2.addPreviousStandbyTasks(Utils.mkSet(task01)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); @@ -455,9 +455,9 @@ public class StickyTaskAssignorTest { @Test public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() { final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02); - c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00)); + c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00); - c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02)); createClient(p3, 1); createClient(p4, 1); @@ -577,14 +577,14 @@ public class StickyTaskAssignorTest { final TaskId task23 = new TaskId(2, 3); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); - c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); + c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11, task20, task21, task23)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); - c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23); - c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12)); + c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12)); final ClientState newClient = createClient(p4, 1); - newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); + newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); taskAssignor.assign(0); @@ -607,15 +607,15 @@ public class StickyTaskAssignorTest { final TaskId task23 = new TaskId(2, 3); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); - c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); + c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11, task20, task21, task23)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); - c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); final ClientState bounce1 = createClient(p3, 1); - bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23)); + bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20, task21, task23)); final ClientState bounce2 = createClient(p4, 1); - bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10)); + bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task03, task10)); final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); taskAssignor.assign(0); @@ -658,7 +658,7 @@ public class StickyTaskAssignorTest { final TaskId task06 = new TaskId(0, 6); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06); final ClientState c2 = createClient(p2, 1); - c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04, task05)); final ClientState newClient = createClient(p3, 1); final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06); @@ -705,7 +705,7 @@ public class StickyTaskAssignorTest { private ClientState createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) { final ClientState clientState = new ClientState(capacity); - clientState.addPreviousActiveTasks(Utils.mkSet(taskIds)); + clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds)); clients.put(processId, clientState); return clientState; } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 27bee81..ac56371 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -163,6 +163,11 @@ public class StreamsUpgradeTest { final AssignmentInfo info = AssignmentInfo.decode( assignment.userData().putInt(0, AssignmentInfo.LATEST_SUPPORTED_VERSION)); + if (super.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) { + setAssignmentErrorCode(Error.VERSION_PROBING.code()); + return; + } + final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions()); partitions.sort(PARTITION_COMPARATOR); @@ -292,6 +297,7 @@ public class StreamsUpgradeTest { private FutureAssignmentInfo(final boolean bumpUsedVersion, final boolean bumpSupportedVersion, final ByteBuffer bytes) { + super(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION); this.bumpUsedVersion = bumpUsedVersion; this.bumpSupportedVersion = bumpSupportedVersion; originalUserMetadata = bytes; diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 4f8dc47..7b9a310 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -349,8 +349,6 @@ class StreamsUpgradeTest(Test): for p in self.processors: found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)) if len(found) >= self.leader_counter[p] + 1: - if self.leader is not None: - raise Exception("Could not uniquely identify leader") self.leader = p self.leader_counter[p] = self.leader_counter[p] + 1 @@ -547,39 +545,25 @@ class StreamsUpgradeTest(Test): else: self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1 - if processor == self.leader: - leader_monitor = log_monitor - elif first_other_processor == self.leader: - leader_monitor = first_other_monitor - elif second_other_processor == self.leader: - leader_monitor = second_other_monitor - else: - raise Exception("Could not identify leader.") - monitors = {} monitors[processor] = log_monitor monitors[first_other_processor] = first_other_monitor monitors[second_other_processor] = second_other_monitor - leader_monitor.wait_until("Received a future (version probing) subscription (version: 5). Sending empty assignment back (with supported version 4).", - timeout_sec=60, - err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account)) - if len(self.old_processors) > 0: - log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) else: - log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Setting subscription metadata to leaders supported version 5 and trigger new rebalance.", + log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) - first_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.", + first_other_monitor.wait_until("Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.", timeout_sec=60, - err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(first_other_node.account)) - second_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.", + err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(first_other_node.account)) + second_other_monitor.wait_until("Sent a version 4 subscription and group.s latest commonly supported version is 5 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 5 for next rebalance.", timeout_sec=60, - err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(second_other_node.account)) - + err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(second_other_node.account)) log_monitor.wait_until("Version probing detected. Triggering new rebalance.", timeout_sec=60, err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))