This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new 133c33f KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386) 133c33f is described below commit 133c33fde1c4d3b79196b72522f83a75cb6b0e65 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Mon Oct 7 09:27:09 2019 -0700 KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386) Key improvements with this PR: * tasks will remain available for IQ during a rebalance (but not during restore) * continue restoring and processing standby tasks during a rebalance * continue processing active tasks during rebalance until the RecordQueue is empty* * only revoked tasks must suspended/closed * StreamsPartitionAssignor tries to return tasks to their previous consumers within a client * but do not try to commit, for now (pending KAFKA-7312) Reviewers: John Roesler <j...@confluent.io>, Boyang Chen <boy...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- checkstyle/suppressions.xml | 3 + .../consumer/internals/ConsumerCoordinator.java | 20 +- .../consumer/internals/SubscriptionState.java | 15 +- .../clients/consumer/internals/FetcherTest.java | 31 +- .../processor/internals/AssignedStreamsTasks.java | 15 +- .../streams/processor/internals/StandbyTask.java | 19 - .../processor/internals/StoreChangelogReader.java | 16 +- .../streams/processor/internals/StreamTask.java | 8 +- .../streams/processor/internals/StreamThread.java | 27 +- .../internals/StreamsPartitionAssignor.java | 612 +++++++++++++++------ .../kafka/streams/processor/internals/Task.java | 8 +- .../streams/processor/internals/TaskManager.java | 34 +- .../assignment/AssignorConfiguration.java | 12 +- .../internals/assignment/ClientState.java | 56 +- .../internals/assignment/StickyTaskAssignor.java | 8 +- .../processor/internals/AbstractTaskTest.java | 6 - .../processor/internals/StandbyTaskTest.java | 23 - .../internals/StreamsPartitionAssignorTest.java | 481 ++++++++++++++-- .../processor/internals/TaskManagerTest.java | 28 +- .../internals/assignment/ClientStateTest.java | 8 +- .../assignment/StickyTaskAssignorTest.java | 34 +- .../kafka/streams/tests/SmokeTestDriver.java | 2 +- .../kafka/streams/tests/StreamsUpgradeTest.java | 26 +- tests/kafkatest/tests/streams/streams_eos_test.py | 2 +- 24 files changed, 1058 insertions(+), 436 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 8927849..2f21309 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -197,6 +197,9 @@ <suppress checks="MethodLength" files="RocksDBWindowStoreTest.java"/> + <suppress checks="MemberName" + files="StreamsPartitionAssignorTest.java"/> + <suppress checks="ClassDataAbstractionCoupling" files=".*[/\\]streams[/\\].*test[/\\].*.java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b5b5ce2..6b39acb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -355,26 +355,29 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions); addedPartitions.removeAll(ownedPartitions); - // Invoke user's revocation callback before changing assignment or updating state if (protocol == RebalanceProtocol.COOPERATIVE) { Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions); revokedPartitions.removeAll(assignedPartitions); - log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " + - "newly added partitions: {}, revoking partitions: {}", + log.info("Updating assignment with\n" + + "now assigned partitions: {}\n" + + "compare with previously owned partitions: {}\n" + + "newly added partitions: {}\n" + + "revoked partitions: {}\n", Utils.join(assignedPartitions, ", "), Utils.join(ownedPartitions, ", "), Utils.join(addedPartitions, ", "), - Utils.join(revokedPartitions, ", ")); - + Utils.join(revokedPartitions, ", ") + ); if (!revokedPartitions.isEmpty()) { - // revoke partitions that was previously owned but no longer assigned; - // note that we should only change the assignment AFTER we've triggered - // the revoke callback + // revoke partitions that were previously owned but no longer assigned; + // note that we should only change the assignment (or update the assignor's state) + // AFTER we've triggered the revoke callback firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // if revoked any partitions, need to re-join the group afterwards + log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions); requestRejoin(); } } @@ -679,7 +682,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - isLeader = false; subscriptions.resetGroupSubscription(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4641e5c..953505f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -270,8 +270,14 @@ public class SubscriptionState { if (!this.partitionsAutoAssigned()) throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use"); + Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size()); + for (TopicPartition tp : assignments) { + TopicPartitionState state = this.assignment.stateValue(tp); + if (state == null) + state = new TopicPartitionState(); + assignedPartitionStates.put(tp, state); + } - Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments); assignmentId++; this.assignment.set(assignedPartitionStates); } @@ -674,13 +680,6 @@ public class SubscriptionState { return rebalanceListener; } - private static Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) { - Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size()); - for (TopicPartition tp : assignments) - map.put(tp, new TopicPartitionState()); - return map; - } - private static class TopicPartitionState { private FetchState fetchState; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index ff0afe9..9e281a7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -848,7 +848,7 @@ public class FetcherTest { } @Test - public void testFetchDuringRebalance() { + public void testFetchDuringEagerRebalance() { buildFetcher(); subscriptions.subscribe(singleton(topicName), listener); @@ -859,7 +859,9 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); - // Now the rebalance happens and fetch positions are cleared + // Now the eager rebalance happens and fetch positions are cleared + subscriptions.assignFromSubscribed(Collections.emptyList()); + subscriptions.assignFromSubscribed(singleton(tp0)); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); @@ -869,6 +871,31 @@ public class FetcherTest { } @Test + public void testFetchDuringCooperativeRebalance() { + buildFetcher(); + + subscriptions.subscribe(singleton(topicName), listener); + subscriptions.assignFromSubscribed(singleton(tp0)); + subscriptions.seek(tp0, 0); + + client.updateMetadata(initialUpdateResponse); + + assertEquals(1, fetcher.sendFetches()); + + // Now the cooperative rebalance happens and fetch positions are NOT cleared for unrevoked partitions + subscriptions.assignFromSubscribed(singleton(tp0)); + + client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords(); + + // The active fetch should NOT be ignored since the position for tp0 is still valid + assertEquals(1, fetchedRecords.size()); + assertEquals(3, fetchedRecords.get(tp0).size()); + } + + @Test public void testInFlightFetchOnPausedPartition() { buildFetcher(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index da0fc20..65c4c95 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -74,11 +74,15 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @Override void closeTask(final StreamTask task, final boolean clean) { if (suspended.containsKey(task.id())) { - task.closeSuspended(clean, false, null); + task.closeSuspended(clean, null); } else { task.close(clean, false); } } + + boolean hasRestoringTasks() { + return !restoring.isEmpty(); + } Set<TaskId> suspendedTaskIds() { return suspended.keySet(); @@ -107,7 +111,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin } else if (restoring.containsKey(task)) { revokedRestoringTasks.add(task); } else if (!suspended.containsKey(task)) { - log.warn("Task {} was revoked but cannot be found in the assignment", task); + log.warn("Task {} was revoked but cannot be found in the assignment, may have been closed due to error", task); } } @@ -131,7 +135,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin task.suspend(); suspended.put(id, task); } catch (final TaskMigratedException closeAsZombieAndSwallow) { - // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on + // swallow and move on since we are rebalancing log.info("Failed to suspend {} {} since it got migrated to another thread already. " + "Closing it as zombie and move on.", taskTypeName, id); firstException.compareAndSet(null, closeZombieTask(task)); @@ -248,7 +252,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin try { final boolean clean = !isZombie; - task.closeSuspended(clean, isZombie, null); + task.closeSuspended(clean, null); } catch (final RuntimeException e) { log.error("Failed to close suspended {} {} due to the following error:", taskTypeName, task.id(), e); return e; @@ -264,7 +268,6 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin for (final TaskId revokedTask : revokedTasks) { final StreamTask suspendedTask = suspended.get(revokedTask); - // task may not be in the suspended tasks if it was closed due to some error if (suspendedTask != null) { firstException.compareAndSet(null, closeSuspended(false, suspendedTask)); } else { @@ -335,7 +338,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin return true; } else { log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions()); - task.closeSuspended(true, false, null); + task.closeSuspended(true, null); } } return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index f10c25b..fbc116a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -120,18 +120,6 @@ public class StandbyTask extends AbstractTask { commitNeeded = false; } - /** - * <pre> - * - flush store - * - checkpoint store - * </pre> - */ - @Override - public void suspend() { - log.debug("Suspending"); - flushAndCheckpointState(); - } - private void flushAndCheckpointState() { stateMgr.flush(); stateMgr.checkpoint(Collections.emptyMap()); @@ -163,13 +151,6 @@ public class StandbyTask extends AbstractTask { taskClosed = true; } - @Override - public void closeSuspended(final boolean clean, - final boolean isZombie, - final RuntimeException e) { - close(clean, isZombie); - } - /** * Updates a state store using records from one change log partition * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 6c6a1c4..55a33c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -79,8 +79,7 @@ public class StoreChangelogReader implements ChangelogReader { initialize(active); } - if (needsRestoring.isEmpty() || restoreConsumer.assignment().isEmpty()) { - restoreConsumer.unsubscribe(); + if (checkForCompletedRestoration()) { return completedRestorers; } @@ -116,9 +115,7 @@ public class StoreChangelogReader implements ChangelogReader { needsRestoring.removeAll(completedRestorers); - if (needsRestoring.isEmpty()) { - restoreConsumer.unsubscribe(); - } + checkForCompletedRestoration(); return completedRestorers; } @@ -337,7 +334,14 @@ public class StoreChangelogReader implements ChangelogReader { return nextPosition; } - + private boolean checkForCompletedRestoration() { + if (needsRestoring.isEmpty()) { + log.info("Finished restoring all active tasks"); + restoreConsumer.unsubscribe(); + return true; + } + return false; + } private boolean hasPartition(final TopicPartition topicPartition) { final List<PartitionInfo> partitions = partitionInfo.get(topicPartition.topic()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 40466b3..780fe1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -573,7 +573,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ - @Override public void suspend() { log.debug("Suspending"); suspend(true, false); @@ -687,10 +686,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed - @Override - public void closeSuspended(final boolean clean, - final boolean isZombie, - RuntimeException firstException) { + void closeSuspended(final boolean clean, RuntimeException firstException) { try { closeStateManager(clean); } catch (final RuntimeException e) { @@ -742,7 +738,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator log.error("Could not close task due to the following error:", e); } - closeSuspended(clean, isZombie, firstException); + closeSuspended(clean, firstException); taskClosed = true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 29e1bc7..c71ff27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -132,13 +132,13 @@ public class StreamThread extends Thread { */ public enum State implements ThreadStateTransitionValidator { - CREATED(1, 5), // 0 - STARTING(2, 3, 5), // 1 - PARTITIONS_REVOKED(3, 5), // 2 - PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3 - RUNNING(2, 3, 5), // 4 - PENDING_SHUTDOWN(6), // 5 - DEAD; // 6 + CREATED(1, 5), // 0 + STARTING(2, 3, 5), // 1 + PARTITIONS_REVOKED(2, 3, 5), // 2 + PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3 + RUNNING(2, 3, 5), // 4 + PENDING_SHUTDOWN(6), // 5 + DEAD; // 6 private final Set<Integer> validTransitions = new HashSet<>(); @@ -734,9 +734,9 @@ public class StreamThread extends Thread { // to unblock the restoration as soon as possible records = pollRequests(Duration.ZERO); } else if (state == State.PARTITIONS_REVOKED) { - // try to fetch some records with normal poll time - // in order to wait long enough to get the join response - records = pollRequests(pollTime); + // try to fetch som records with zero poll millis to unblock + // other useful work while waiting for the join response + records = pollRequests(Duration.ZERO); } else if (state == State.RUNNING || state == State.STARTING) { // try to fetch some records with normal poll time // in order to get long polling @@ -970,7 +970,12 @@ public class StreamThread extends Thread { } } - lastCommitMs = now; + if (committed == -1) { + log.trace("Unable to commit as we are in the middle of a rebalance, will try again when it completes."); + } else { + lastCommitMs = now; + } + processStandbyRecords = true; } else { committed = taskManager.maybeCommitActiveTasksPerUserRequested(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2e6c9c0..8b2c95a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -47,15 +47,18 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.TreeMap; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static java.util.UUID.randomUUID; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION; @@ -63,20 +66,21 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAss import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_FIVE; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_FOUR; -import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_ONE; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_THREE; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_TWO; +import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_ONE; public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable { + private Logger log; private String logPrefix; private static class AssignedPartition implements Comparable<AssignedPartition> { + private final TaskId taskId; private final TopicPartition partition; - AssignedPartition(final TaskId taskId, - final TopicPartition partition) { + AssignedPartition(final TaskId taskId, final TopicPartition partition) { this.taskId = taskId; this.partition = partition; } @@ -103,6 +107,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } private static class ClientMetadata { + private final HostInfo hostInfo; private final Set<String> consumers; private final ClientState state; @@ -132,12 +137,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf state = new ClientState(); } - void addConsumer(final String consumerMemberId, - final SubscriptionInfo info) { + void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) { consumers.add(consumerMemberId); - state.addPreviousActiveTasks(consumerMemberId, info.prevTasks()); - state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks()); state.incrementCapacity(); + state.addOwnedPartitions(ownedPartitions, consumerMemberId); + } + + void addPreviousTasks(final SubscriptionInfo info) { + state.addPreviousActiveTasks(info.prevTasks()); + state.addPreviousStandbyTasks(info.standbyTasks()); } @Override @@ -177,9 +185,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } /** - * We need to have the PartitionAssignor and its StreamThread to be mutually accessible - * since the former needs later's cached metadata while sending subscriptions, - * and the latter needs former's returned assignment when adding tasks. + * We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs + * later's cached metadata while sending subscriptions, and the latter needs former's returned assignment when + * adding tasks. * * @throws KafkaException if the stream thread is not specified */ @@ -189,7 +197,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf logPrefix = assignorConfiguration.logPrefix(); log = new LogContext(logPrefix).logger(getClass()); - usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion); + usedSubscriptionMetadataVersion = assignorConfiguration + .configuredMetadataVersion(usedSubscriptionMetadataVersion); taskManager = assignorConfiguration.getTaskManager(); assignmentErrorCode = assignorConfiguration.getAssignmentErrorCode(configs); numStandbyReplicas = assignorConfiguration.getNumStandbyReplicas(); @@ -221,37 +230,64 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks // 3. Task ids of valid local states on the client's state directory. - - final Set<TaskId> previousActiveTasks = taskManager.previousRunningTaskIds(); final Set<TaskId> standbyTasks = taskManager.cachedTasksIds(); - standbyTasks.removeAll(previousActiveTasks); - final SubscriptionInfo data = new SubscriptionInfo( + final Set<TaskId> activeTasks = prepareForSubscription(taskManager, + topics, + standbyTasks, + rebalanceProtocol); + return new SubscriptionInfo( usedSubscriptionMetadataVersion, taskManager.processId(), - previousActiveTasks, + activeTasks, standbyTasks, - userEndPoint); + userEndPoint) + .encode(); + } + + protected static Set<TaskId> prepareForSubscription(final TaskManager taskManager, + final Set<String> topics, + final Set<TaskId> standbyTasks, + final RebalanceProtocol rebalanceProtocol) { + // Any tasks that are not yet running are counted as standby tasks for assignment purposes, + // along with any old tasks for which we still found state on disk + final Set<TaskId> activeTasks; + + switch (rebalanceProtocol) { + case EAGER: + // In eager, onPartitionsRevoked is called first and we must get the previously saved running task ids + activeTasks = taskManager.previousRunningTaskIds(); + standbyTasks.removeAll(activeTasks); + break; + case COOPERATIVE: + // In cooperative, we will use the encoded ownedPartitions to determine the running tasks + activeTasks = Collections.emptySet(); + standbyTasks.removeAll(taskManager.activeTaskIds()); + break; + default: + throw new IllegalStateException("Streams partition assignor's rebalance protocol is unknown"); + } taskManager.updateSubscriptionsFromMetadata(topics); + taskManager.setRebalanceInProgress(true); - return data.encode(); + return activeTasks; } private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata, final String topic, final int errorCode) { log.error("{} is unknown yet during rebalance," + - " please make sure they have been pre-created before starting the Streams application.", topic); + " please make sure they have been pre-created before starting the Streams application.", topic); final Map<String, Assignment> assignment = new HashMap<>(); for (final ClientMetadata clientMetadata : clientsMetadata.values()) { for (final String consumerId : clientMetadata.consumers) { assignment.put(consumerId, new Assignment( Collections.emptyList(), new AssignmentInfo(LATEST_SUPPORTED_VERSION, - Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap(), - errorCode).encode() + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + errorCode).encode() )); } } @@ -283,7 +319,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription(); // construct the client metadata from the decoded subscription info final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>(); - final Set<String> futureConsumers = new HashSet<>(); + final Set<TopicPartition> allOwnedPartitions = new HashSet<>(); + + // keep track of any future consumers in a "dummy" Client since we can't decipher their subscription + final UUID futureId = randomUUID(); + final ClientMetadata futureClient = new ClientMetadata(null); + clientMetadataMap.put(futureId, futureClient); int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION; int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION; @@ -292,58 +333,59 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) { final String consumerId = entry.getKey(); final Subscription subscription = entry.getValue(); - final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); final int usedVersion = info.version(); + + minReceivedMetadataVersion = updateMinReceivedVersion(usedVersion, minReceivedMetadataVersion); + minSupportedMetadataVersion = updateMinSupportedVersion(info.latestSupportedVersion(), minSupportedMetadataVersion); + + final UUID processId; if (usedVersion > LATEST_SUPPORTED_VERSION) { futureMetadataVersion = usedVersion; - futureConsumers.add(consumerId); - continue; - } - if (usedVersion < minReceivedMetadataVersion) { - minReceivedMetadataVersion = usedVersion; + processId = futureId; + } else { + processId = info.processId(); } - final int latestSupportedVersion = info.latestSupportedVersion(); - if (latestSupportedVersion < minSupportedMetadataVersion) { - minSupportedMetadataVersion = latestSupportedVersion; - } + ClientMetadata clientMetadata = clientMetadataMap.get(processId); // create the new client metadata if necessary - ClientMetadata clientMetadata = clientMetadataMap.get(info.processId()); - if (clientMetadata == null) { clientMetadata = new ClientMetadata(info.userEndPoint()); clientMetadataMap.put(info.processId(), clientMetadata); } - // add the consumer to the client - clientMetadata.addConsumer(consumerId, info); + // add the consumer and any info its its subscription to the client + clientMetadata.addConsumer(consumerId, subscription.ownedPartitions()); + allOwnedPartitions.addAll(subscription.ownedPartitions()); + if (info.prevTasks() != null && info.standbyTasks() != null) { + clientMetadata.addPreviousTasks(info); + } } final boolean versionProbing; if (futureMetadataVersion == UNKNOWN) { versionProbing = false; + clientMetadataMap.remove(futureId); + } else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { + versionProbing = true; + log.info("Received a future (version probing) subscription (version: {})." + + " Sending assignment back (with supported version {}).", + futureMetadataVersion, + minSupportedMetadataVersion); + } else { - if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { - log.info("Received a future (version probing) subscription (version: {})." - + " Sending empty assignment back (with supported version {}).", - futureMetadataVersion, - LATEST_SUPPORTED_VERSION); - versionProbing = true; - } else { - throw new IllegalStateException( - "Received a future (version probing) subscription (version: " + futureMetadataVersion - + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion - + ") at the same time." - ); - } + throw new IllegalStateException( + "Received a future (version probing) subscription (version: " + futureMetadataVersion + + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + + ") at the same time." + ); } if (minReceivedMetadataVersion < LATEST_SUPPORTED_VERSION) { log.info("Downgrading metadata to version {}. Latest supported version is {}.", - minReceivedMetadataVersion, - LATEST_SUPPORTED_VERSION); + minReceivedMetadataVersion, + LATEST_SUPPORTED_VERSION); } log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap); @@ -361,9 +403,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) && !metadata.topics().contains(topic)) { log.error("Missing source topic {} during assignment. Returning error {}.", - topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); + topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); return new GroupAssignment( - errorAssignment(clientMetadataMap, topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) + errorAssignment(clientMetadataMap, topic, + AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) ); } } @@ -378,7 +421,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { - final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName).numberOfPartitions(); + final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName) + .numberOfPartitions(); Integer numPartitions = null; if (!maybeNumPartitions.isPresent()) { @@ -395,7 +439,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // map().join().join(map()) if (repartitionTopicMetadata.containsKey(sourceTopicName)) { if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) { - numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get(); + numPartitionsCandidate = + repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get(); } } else { final Integer count = metadata.partitionCountForTopic(sourceTopicName); @@ -427,7 +472,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } } while (numPartitionsNeeded); - // ensure the co-partitioning topics within the group have the same number of partitions, // and enforce the number of partitions for those repartition topics to be the same if they // are co-partitioned as well. @@ -470,19 +514,23 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final Map<TaskId, Set<TopicPartition>> partitionsForTask = partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata); + final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>(); + // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks final Set<TopicPartition> allAssignedPartitions = new HashSet<>(); final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>(); for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) { + final TaskId id = entry.getKey(); final Set<TopicPartition> partitions = entry.getValue(); + for (final TopicPartition partition : partitions) { + taskForPartition.put(partition, id); if (allAssignedPartitions.contains(partition)) { log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask); } } allAssignedPartitions.addAll(partitions); - final TaskId id = entry.getKey(); tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id); } for (final String topic : allSourceTopics) { @@ -491,13 +539,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf log.warn("No partitions found for topic {}", topic); } else { for (final PartitionInfo partitionInfo : partitionInfoList) { - final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + final TopicPartition partition = new TopicPartition(partitionInfo.topic(), + partitionInfo.partition()); if (!allAssignedPartitions.contains(partition)) { log.warn("Partition {} is not assigned to any tasks: {}" - + " Possible causes of a partition not getting assigned" - + " is that another topic defined in the topology has not been" - + " created when starting your streams application," - + " resulting in no tasks created for this topology at all.", partition, partitionsForTask); + + " Possible causes of a partition not getting assigned" + + " is that another topic defined in the topology has not been" + + " created when starting your streams application," + + " resulting in no tasks created for this topology at all.", partition, + partitionsForTask); } } } @@ -533,15 +583,32 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // ---------------- Step Two ---------------- // - // assign tasks to clients final Map<UUID, ClientState> states = new HashMap<>(); for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) { - states.put(entry.getKey(), entry.getValue().state); + final ClientState state = entry.getValue().state; + states.put(entry.getKey(), state); + + // Either the active tasks (eager) OR the owned partitions (cooperative) were encoded in the subscription + // according to the rebalancing protocol, so convert any partitions in a client to tasks where necessary + if (!state.ownedPartitions().isEmpty()) { + final Set<TaskId> previousActiveTasks = new HashSet<>(); + for (final Map.Entry<TopicPartition, String> partitionEntry : state.ownedPartitions().entrySet()) { + final TopicPartition tp = partitionEntry.getKey(); + final TaskId task = taskForPartition.get(tp); + if (task != null) { + previousActiveTasks.add(task); + } else { + log.error("No task found for topic partition {}", tp); + } + } + state.addPreviousActiveTasks(previousActiveTasks); + } } log.debug("Assigning tasks {} to clients {} with number of replicas {}", - partitionsForTask.keySet(), states, numStandbyReplicas); + partitionsForTask.keySet(), states, numStandbyReplicas); + // assign tasks to clients final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet()); taskAssignor.assign(numStandbyReplicas); @@ -577,7 +644,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf clientMetadataMap, partitionsForTask, partitionsByHostState, - futureConsumers, + allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion ); @@ -586,6 +653,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf clientMetadataMap, partitionsForTask, partitionsByHostState, + allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion ); @@ -594,132 +662,165 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf return new GroupAssignment(assignment); } - private static Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata, - final Map<TaskId, Set<TopicPartition>> partitionsForTask, - final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, - final int minUserMetadataVersion, - final int minSupportedMetadataVersion) { + private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata, + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, + final Set<TopicPartition> allOwnedPartitions, + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { + // keep track of whether a 2nd rebalance is unavoidable so we can skip trying to get a completely sticky assignment + boolean rebalanceRequired = false; final Map<String, Assignment> assignment = new HashMap<>(); // within the client, distribute tasks to its owned consumers - for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { - final Set<String> consumers = entry.getValue().consumers; - final ClientState state = entry.getValue().state; - - final List<List<TaskId>> interleavedActive = - interleaveTasksByGroupId(state.activeTasks(), consumers.size()); - final List<List<TaskId>> interleavedStandby = - interleaveTasksByGroupId(state.standbyTasks(), consumers.size()); - - int consumerTaskIndex = 0; - - for (final String consumer : consumers) { - final List<TaskId> activeTasks = interleavedActive.get(consumerTaskIndex); - - // These will be filled in by buildAssignedActiveTaskAndPartitionsList below - final List<TopicPartition> activePartitionsList = new ArrayList<>(); - final List<TaskId> assignedActiveList = new ArrayList<>(); - - buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask); + for (final ClientMetadata clientMetadata : clientsMetadata.values()) { + final ClientState state = clientMetadata.state; + final Set<String> consumers = clientMetadata.consumers; + Map<String, List<TaskId>> activeTaskAssignments; + + // Try to avoid triggering another rebalance by giving active tasks back to their previous owners within a + // client, without violating load balance. If we already know another rebalance will be required, or the + // client had no owned partitions, try to balance the workload as evenly as possible by interleaving the + // tasks among consumers and hopefully spreading the heavier subtopologies evenly across threads. + if (rebalanceRequired || state.ownedPartitions().isEmpty()) { + activeTaskAssignments = interleaveConsumerTasksByGroupId(state.activeTasks(), consumers); + } else if ((activeTaskAssignments = tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, allOwnedPartitions)) + .equals(Collections.emptyMap())) { + rebalanceRequired = true; + activeTaskAssignments = interleaveConsumerTasksByGroupId(state.activeTasks(), consumers); + } - final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>(); - if (!state.standbyTasks().isEmpty()) { - final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex); - for (final TaskId taskId : assignedStandbyList) { - standby.computeIfAbsent(taskId, k -> new HashSet<>()).addAll(partitionsForTask.get(taskId)); - } - } + final Map<String, List<TaskId>> interleavedStandby = + interleaveConsumerTasksByGroupId(state.standbyTasks(), consumers); - consumerTaskIndex++; - - // finally, encode the assignment before sending back to coordinator - assignment.put( - consumer, - new Assignment( - activePartitionsList, - new AssignmentInfo( - minUserMetadataVersion, - minSupportedMetadataVersion, - assignedActiveList, - standby, - partitionsByHostState, - 0 - ).encode() - ) - ); - } + addClientAssignments( + assignment, + clientMetadata, + partitionsForTask, + partitionsByHostState, + allOwnedPartitions, + activeTaskAssignments, + interleavedStandby, + minUserMetadataVersion, + minSupportedMetadataVersion); } return assignment; } - private static Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata, - final Map<TaskId, Set<TopicPartition>> partitionsForTask, - final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, - final Set<String> futureConsumers, - final int minUserMetadataVersion, - final int minSupportedMetadataVersion) { + private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata, + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, + final Set<TopicPartition> allOwnedPartitions, + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { final Map<String, Assignment> assignment = new HashMap<>(); - // assign previously assigned tasks to "old consumers" + // Since we know another rebalance will be triggered anyway, just try and generate a balanced assignment + // (without violating cooperative protocol) now so that on the second rebalance we can just give tasks + // back to their previous owners + // within the client, distribute tasks to its owned consumers for (final ClientMetadata clientMetadata : clientsMetadata.values()) { - for (final String consumerId : clientMetadata.consumers) { + final ClientState state = clientMetadata.state; - if (futureConsumers.contains(consumerId)) { - continue; - } + final Map<String, List<TaskId>> interleavedActive = + interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers); + final Map<String, List<TaskId>> interleavedStandby = + interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers); - // Return the same active tasks that were claimed in the subscription - final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId)); - - // These will be filled in by buildAssignedActiveTaskAndPartitionsList below - final List<TopicPartition> activePartitionsList = new ArrayList<>(); - final List<TaskId> assignedActiveList = new ArrayList<>(); - - buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask); + addClientAssignments( + assignment, + clientMetadata, + partitionsForTask, + partitionsByHostState, + allOwnedPartitions, + interleavedActive, + interleavedStandby, + minUserMetadataVersion, + minSupportedMetadataVersion); + } - // Return the same standby tasks that were claimed in the subscription - final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - for (final TaskId taskId : clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) { - standbyTasks.put(taskId, partitionsForTask.get(taskId)); - } + return assignment; + } - assignment.put(consumerId, new Assignment( + private void addClientAssignments(final Map<String, Assignment> assignment, + final ClientMetadata clientMetadata, + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Map<HostInfo, Set<TopicPartition>> partitionsByHostState, + final Set<TopicPartition> allOwnedPartitions, + final Map<String, List<TaskId>> activeTaskAssignments, + final Map<String, List<TaskId>> standbyTaskAssignments, + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { + + // Loop through the consumers and build their assignment + for (final String consumer : clientMetadata.consumers) { + final List<TaskId> activeTasksForConsumer = activeTaskAssignments.get(consumer); + + // These will be filled in by buildAssignedActiveTaskAndPartitionsList below + final List<TopicPartition> activePartitionsList = new ArrayList<>(); + final List<TaskId> assignedActiveList = new ArrayList<>(); + + buildAssignedActiveTaskAndPartitionsList(consumer, + clientMetadata.state, + activeTasksForConsumer, + partitionsForTask, + allOwnedPartitions, + activePartitionsList, + assignedActiveList); + + final Map<TaskId, Set<TopicPartition>> standbyTaskMap = + buildStandbyTaskMap(standbyTaskAssignments.get(consumer), partitionsForTask); + + // finally, encode the assignment and insert into map with all assignments + assignment.put( + consumer, + new Assignment( activePartitionsList, new AssignmentInfo( minUserMetadataVersion, minSupportedMetadataVersion, assignedActiveList, - standbyTasks, + standbyTaskMap, partitionsByHostState, - 0) - .encode() - )); - } - } - - // add empty assignment for "future version" clients (ie, empty version probing response) - for (final String consumerId : futureConsumers) { - assignment.put(consumerId, new Assignment( - Collections.emptyList(), - new AssignmentInfo(minUserMetadataVersion, minSupportedMetadataVersion).encode() - )); + AssignorError.NONE.code() + ).encode() + ) + ); } - - return assignment; } - private static void buildAssignedActiveTaskAndPartitionsList(final List<TaskId> activeTasks, - final List<TopicPartition> activePartitionsList, - final List<TaskId> assignedActiveList, - final Map<TaskId, Set<TopicPartition>> partitionsForTask) { + private void buildAssignedActiveTaskAndPartitionsList(final String consumer, + final ClientState clientState, + final List<TaskId> activeTasksForConsumer, + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Set<TopicPartition> allOwnedPartitions, + final List<TopicPartition> activePartitionsList, + final List<TaskId> assignedActiveList) { final List<AssignedPartition> assignedPartitions = new ArrayList<>(); // Build up list of all assigned partition-task pairs - for (final TaskId taskId : activeTasks) { + for (final TaskId taskId : activeTasksForConsumer) { + final List<AssignedPartition> assignedPartitionsForTask = new ArrayList<>(); for (final TopicPartition partition : partitionsForTask.get(taskId)) { - assignedPartitions.add(new AssignedPartition(taskId, partition)); + final String oldOwner = clientState.ownedPartitions().get(partition); + final boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer); + + // If the partition is new to this consumer but is still owned by another, remove from the assignment + // until it has been revoked and can safely be reassigned according the COOPERATIVE protocol + if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) { + log.debug("Removing task {} from assignment until it is safely revoked", taskId); + clientState.removeFromAssignment(taskId); + // Clear the assigned partitions list for this task if any partition can not safely be assigned, + // so as not to encode a partial task + assignedPartitionsForTask.clear(); + break; + } else { + assignedPartitionsForTask.add(new AssignedPartition(taskId, partition)); + } } + // assignedPartitionsForTask will either contain all partitions for the task or be empty, so just add all + assignedPartitions.addAll(assignedPartitionsForTask); } // Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping @@ -730,17 +831,175 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } } - // visible for testing - static List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) { + private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final Collection<TaskId> standbys, + final Map<TaskId, Set<TopicPartition>> partitionsForTask) { + final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>(); + for (final TaskId task : standbys) { + standbyTaskMap.put(task, partitionsForTask.get(task)); + } + return standbyTaskMap; + } + + /** + * Generates an assignment that tries to satisfy two conditions: no active task previously owned by a consumer + * be assigned to another (ie nothing gets revoked), and the number of tasks is evenly distributed throughout + * the client. + * <p> + * If it is impossible to satisfy both constraints we abort early and return an empty map so we can use a + * different assignment strategy that tries to distribute tasks of a single subtopology across different threads. + * + * @param state state for this client + * @param consumers the consumers in this client + * @param partitionsForTask mapping from task to its associated partitions + * @param allOwnedPartitions set of all partitions claimed as owned by the group + * @return task assignment for the consumers of this client + * empty map if it is not possible to generate a balanced assignment without moving a task to a new consumer + */ + Map<String, List<TaskId>> tryStickyAndBalancedTaskAssignmentWithinClient(final ClientState state, + final Set<String> consumers, + final Map<TaskId, Set<TopicPartition>> partitionsForTask, + final Set<TopicPartition> allOwnedPartitions) { + final Map<String, List<TaskId>> assignments = new HashMap<>(); + final LinkedList<TaskId> newTasks = new LinkedList<>(); + final Set<String> unfilledConsumers = new HashSet<>(consumers); + + final int maxTasksPerClient = (int) Math.ceil(((double) state.activeTaskCount()) / consumers.size()); + + // initialize task list for consumers + for (final String consumer : consumers) { + assignments.put(consumer, new ArrayList<>()); + } + + for (final TaskId task : state.activeTasks()) { + final Set<String> previousConsumers = previousConsumersOfTaskPartitions(partitionsForTask.get(task), state.ownedPartitions(), allOwnedPartitions); + + // If this task's partitions were owned by different consumers, we can't avoid revoking partitions + if (previousConsumers.size() > 1) { + log.warn("The partitions of task {} were claimed as owned by different StreamThreads. " + + "This indicates the mapping from partitions to tasks has changed!", task); + return Collections.emptyMap(); + } + + // If this is a new task, or its old consumer no longer exists, it can be freely (re)assigned + if (previousConsumers.isEmpty()) { + log.debug("Task {} was not previously owned by any consumers still in the group. It's owner may " + + "have died or it may be a new task", task); + newTasks.add(task); + } else { + final String consumer = previousConsumers.iterator().next(); + + // If the previous consumer was from another client, these partitions will have to be revoked + if (!consumers.contains(consumer)) { + log.debug("This client was assigned a task {} whose partition(s) were previously owned by another " + + "client, falling back to an interleaved assignment since a rebalance is inevitable.", task); + return Collections.emptyMap(); + } + + // If this consumer previously owned more tasks than it has capacity for, some must be revoked + if (assignments.get(consumer).size() >= maxTasksPerClient) { + log.debug("Cannot create a sticky and balanced assignment as this client's consumers owned more " + + "previous tasks than it has capacity for during this assignment, falling back to interleaved " + + "assignment since a realance is inevitable."); + return Collections.emptyMap(); + } + + assignments.get(consumer).add(task); + + // If we have now reached capacity, remove it from set of consumers who still need more tasks + if (assignments.get(consumer).size() == maxTasksPerClient) { + unfilledConsumers.remove(consumer); + } + } + } + + // Interleave any remaining tasks by groupId among the consumers with remaining capacity. For further + // explanation, see the javadocs for #interleaveConsumerTasksByGroupId + Collections.sort(newTasks); + while (!newTasks.isEmpty()) { + if (unfilledConsumers.isEmpty()) { + throw new IllegalStateException("Some tasks could not be distributed"); + } + + final Iterator<String> consumerIt = unfilledConsumers.iterator(); + + // Loop through the unfilled consumers and distribute tasks until newTasks is empty + while (consumerIt.hasNext()) { + final String consumer = consumerIt.next(); + final List<TaskId> consumerAssignment = assignments.get(consumer); + final TaskId task = newTasks.poll(); + if (task == null) { + break; + } + + consumerAssignment.add(task); + if (consumerAssignment.size() == maxTasksPerClient) { + consumerIt.remove(); + } + } + } + + return assignments; + } + + /** + * Get the previous consumer for the partitions of a task + * + * @param taskPartitions the TopicPartitions for a single given task + * @param clientOwnedPartitions the partitions owned by all consumers in a client + * @param allOwnedPartitions all partitions claimed as owned by any consumer in any client + * @return set of consumer(s) that previously owned the partitions in this task + * empty set signals that it is a new task, or its previous owner is no longer in the group + */ + Set<String> previousConsumersOfTaskPartitions(final Set<TopicPartition> taskPartitions, + final Map<TopicPartition, String> clientOwnedPartitions, + final Set<TopicPartition> allOwnedPartitions) { + // this "foreignConsumer" indicates a partition was owned by someone from another client -- we don't really care who + final String foreignConsumer = ""; + final Set<String> previousConsumers = new HashSet<>(); + + for (final TopicPartition tp : taskPartitions) { + final String currentPartitionConsumer = clientOwnedPartitions.get(tp); + if (currentPartitionConsumer != null) { + previousConsumers.add(currentPartitionConsumer); + } else if (allOwnedPartitions.contains(tp)) { + previousConsumers.add(foreignConsumer); + } + } + + return previousConsumers; + } + + /** + * Generate an assignment that attempts to maximize load balance without regard for stickiness, by spreading + * tasks of the same groupId (subtopology) over different consumers. + * + * @param taskIds the set of tasks to be distributed + * @param consumers the set of consumers to receive tasks + * @return a map of task assignments keyed by the consumer id + */ + static Map<String, List<TaskId>> interleaveConsumerTasksByGroupId(final Collection<TaskId> taskIds, + final Set<String> consumers) { + // First we make a sorted list of the tasks, grouping them by groupId final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds); Collections.sort(sortedTasks); - final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads); - for (int i = 0; i < numberThreads; i++) { - taskIdsForConsumerAssignment.add(new ArrayList<>()); + + // Initialize the assignment map and task list for each consumer. We use a TreeMap here for a consistent + // ordering of the consumers in the hope they will end up with the same set of tasks in subsequent assignments + final Map<String, List<TaskId>> taskIdsForConsumerAssignment = new TreeMap<>(); + for (final String consumer : consumers) { + taskIdsForConsumerAssignment.put(consumer, new ArrayList<>()); } + + // We loop until the tasks have all been assigned, removing them from the list when they are given to a + // consumer. To interleave the tasks, we loop through the consumers and give each one task from the head + // of the list. When we finish going through the list of consumers we start over at the beginning of the + // consumers list, continuing until we run out of tasks. while (!sortedTasks.isEmpty()) { - for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) { + for (final Map.Entry<String, List<TaskId>> consumerTaskIds : taskIdsForConsumerAssignment.entrySet()) { + final List<TaskId> taskIdList = consumerTaskIds.getValue(); final TaskId taskId = sortedTasks.poll(); + + // Check for null here as we may run out of tasks before giving every consumer exactly the same number if (taskId == null) { break; } @@ -774,7 +1033,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion, final int latestCommonlySupportedVersion) { if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { - // If the latest commonly supported version is now greater than our used version, this indicates we have just // completed the rolling upgrade and can now update our subscription version for the final rebalance if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) { @@ -880,6 +1138,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf taskManager.setPartitionsToTaskId(partitionsToTaskId); taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); taskManager.updateSubscriptionsFromAssignment(partitions); + taskManager.setRebalanceInProgress(false); } private static void processVersionOneAssignment(final String logPrefix, @@ -973,12 +1232,23 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } } + private int updateMinReceivedVersion(final int usedVersion, final int minReceivedMetadataVersion) { + return usedVersion < minReceivedMetadataVersion ? usedVersion : minReceivedMetadataVersion; + } + + private int updateMinSupportedVersion(final int supportedVersion, final int minSupportedMetadataVersion) { + return supportedVersion < minSupportedMetadataVersion ? supportedVersion : minSupportedMetadataVersion; + } + protected void setAssignmentErrorCode(final Integer errorCode) { assignmentErrorCode.set(errorCode); } - // following functions are for test only + void setRebalanceProtocol(final RebalanceProtocol rebalanceProtocol) { + this.rebalanceProtocol = rebalanceProtocol; + } + void setInternalTopicManager(final InternalTopicManager internalTopicManager) { this.internalTopicManager = internalTopicManager; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index da9e656..af1b4bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -27,7 +27,7 @@ import java.util.Set; public interface Task { /** - * Initialize the task and return {@code true} if the task is ready to run, i.e, it has not state stores + * Initialize the task and return {@code true} if the task is ready to run, i.e, it has no state stores * @return true if this task has no state stores that may need restoring. * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition @@ -40,14 +40,8 @@ public interface Task { void commit(); - void suspend(); - void resume(); - void closeSuspended(final boolean clean, - final boolean isZombie, - final RuntimeException e); - void close(final boolean clean, final boolean isZombie); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index cd90fad..a2dac40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -60,6 +60,7 @@ public class TaskManager { private final Admin adminClient; private DeleteRecordsResult deleteRecordsResult; + private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit // the restore consumer is only ever assigned changelogs from restoring tasks or standbys (but not both) private boolean restoreConsumerAssignedStandbys = false; @@ -144,7 +145,7 @@ public class TaskManager { addedActiveTasks.put(taskId, partitions); } } catch (final StreamsException e) { - log.error("Failed to resume an active task {} due to the following error:", taskId, e); + log.error("Failed to resume a suspended active task {} due to the following error:", taskId, e); throw e; } } @@ -303,7 +304,11 @@ public class TaskManager { } } - Set<TaskId> activeTaskIds() { + public Set<TaskId> previousRunningTaskIds() { + return active.previousRunningTaskIds(); + } + + public Set<TaskId> activeTaskIds() { return active.allAssignedTaskIds(); } @@ -319,10 +324,6 @@ public class TaskManager { return revokedStandbyTasks.keySet(); } - public Set<TaskId> previousRunningTaskIds() { - return active.previousRunningTaskIds(); - } - Set<TaskId> previousActiveTaskIds() { final HashSet<TaskId> previousActiveTasks = new HashSet<>(assignedActiveTasks.keySet()); previousActiveTasks.addAll(revokedActiveTasks.keySet()); @@ -378,9 +379,11 @@ public class TaskManager { active.initializeNewTasks(); standby.initializeNewTasks(); - final Collection<TopicPartition> restored = changelogReader.restore(active); - active.updateRestored(restored); - removeChangelogsFromRestoreConsumer(restored, false); + if (active.hasRestoringTasks()) { + final Collection<TopicPartition> restored = changelogReader.restore(active); + active.updateRestored(restored); + removeChangelogsFromRestoreConsumer(restored, false); + } if (active.allTasksRunning()) { final Set<TopicPartition> assignment = consumer.assignment(); @@ -420,6 +423,10 @@ public class TaskManager { } } + public void setRebalanceInProgress(final boolean rebalanceInProgress) { + this.rebalanceInProgress = rebalanceInProgress; + } + public void setClusterMetadata(final Cluster cluster) { this.cluster = cluster; } @@ -493,10 +500,10 @@ public class TaskManager { /** * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) + * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit */ int commitAll() { - final int committed = active.commit(); - return committed + standby.commit(); + return rebalanceInProgress ? -1 : active.commit() + standby.commit(); } /** @@ -518,7 +525,7 @@ public class TaskManager { * or if the task producer got fenced (EOS) */ int maybeCommitActiveTasksPerUserRequested() { - return active.maybeCommitPerUserRequested(); + return rebalanceInProgress ? -1 : active.maybeCommitPerUserRequested(); } void maybePurgeCommitedRecords() { @@ -528,7 +535,8 @@ public class TaskManager { if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) { if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) { - log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks()); + log.debug("Previous delete-records request has failed: {}. Try sending the new request now", + deleteRecordsResult.lowWatermarks()); } final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index ac88f2f..1e406e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -153,7 +153,8 @@ public final class AssignorConfiguration { throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom); } } - return RebalanceProtocol.EAGER; + + return RebalanceProtocol.COOPERATIVE; } public String logPrefix() { @@ -181,14 +182,19 @@ public final class AssignorConfiguration { upgradeFrom ); return VERSION_TWO; + case StreamsConfig.UPGRADE_FROM_20: + case StreamsConfig.UPGRADE_FROM_21: + case StreamsConfig.UPGRADE_FROM_22: + case StreamsConfig.UPGRADE_FROM_23: + // These configs are for cooperative rebalancing and should not affect the metadata version + break; default: throw new IllegalArgumentException( "Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom ); } - } else { - return priorVersion; } + return priorVersion; } public int getNumStandbyReplicas() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index ab213d5..df42b14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Map; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class ClientState { @@ -31,8 +33,7 @@ public class ClientState { private final Set<TaskId> prevStandbyTasks; private final Set<TaskId> prevAssignedTasks; - private final Map<String, Set<TaskId>> prevActiveTasksByConsumer; - private final Map<String, Set<TaskId>> prevStandbyTasksByConsumer; + private final Map<TopicPartition, String> ownedPartitions; private int capacity; @@ -48,7 +49,6 @@ public class ClientState { new HashSet<>(), new HashSet<>(), new HashMap<>(), - new HashMap<>(), capacity); } @@ -58,8 +58,7 @@ public class ClientState { final Set<TaskId> prevActiveTasks, final Set<TaskId> prevStandbyTasks, final Set<TaskId> prevAssignedTasks, - final Map<String, Set<TaskId>> prevActiveTasksByConsumer, - final Map<String, Set<TaskId>> prevStandbyTasksByConsumer, + final Map<TopicPartition, String> ownedPartitions, final int capacity) { this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; @@ -67,8 +66,7 @@ public class ClientState { this.prevActiveTasks = prevActiveTasks; this.prevStandbyTasks = prevStandbyTasks; this.prevAssignedTasks = prevAssignedTasks; - this.prevActiveTasksByConsumer = prevActiveTasksByConsumer; - this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer; + this.ownedPartitions = ownedPartitions; this.capacity = capacity; } @@ -80,8 +78,7 @@ public class ClientState { new HashSet<>(prevActiveTasks), new HashSet<>(prevStandbyTasks), new HashSet<>(prevAssignedTasks), - new HashMap<>(prevActiveTasksByConsumer), - new HashMap<>(prevStandbyTasksByConsumer), + new HashMap<>(ownedPartitions), capacity); } @@ -111,6 +108,10 @@ public class ClientState { return prevStandbyTasks; } + public Map<TopicPartition, String> ownedPartitions() { + return ownedPartitions; + } + @SuppressWarnings("WeakerAccess") public int assignedTaskCount() { return assignedTasks.size(); @@ -125,24 +126,25 @@ public class ClientState { return activeTasks.size(); } - public void addPreviousActiveTasks(final String consumer, final Set<TaskId> prevTasks) { + public void addPreviousActiveTasks(final Set<TaskId> prevTasks) { prevActiveTasks.addAll(prevTasks); prevAssignedTasks.addAll(prevTasks); - prevActiveTasksByConsumer.put(consumer, prevTasks); } - public void addPreviousStandbyTasks(final String consumer, final Set<TaskId> standbyTasks) { + public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) { prevStandbyTasks.addAll(standbyTasks); prevAssignedTasks.addAll(standbyTasks); - prevStandbyTasksByConsumer.put(consumer, standbyTasks); } - public Set<TaskId> prevActiveTasksForConsumer(final String consumer) { - return prevActiveTasksByConsumer.get(consumer); + public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions, final String consumer) { + for (final TopicPartition tp : ownedPartitions) { + this.ownedPartitions.put(tp, consumer); + } } - public Set<TaskId> prevStandbyTasksForConsumer(final String consumer) { - return prevStandbyTasksByConsumer.get(consumer); + public void removeFromAssignment(final TaskId task) { + activeTasks.remove(task); + assignedTasks.remove(task); } @Override @@ -153,6 +155,7 @@ public class ClientState { ") prevActiveTasks: (" + prevActiveTasks + ") prevStandbyTasks: (" + prevStandbyTasks + ") prevAssignedTasks: (" + prevAssignedTasks + + ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + ") capacity: " + capacity + "]"; } @@ -182,16 +185,6 @@ public class ClientState { } } - Set<TaskId> previousStandbyTasks() { - final Set<TaskId> standby = new HashSet<>(prevAssignedTasks); - standby.removeAll(prevActiveTasks); - return standby; - } - - Set<TaskId> previousActiveTasks() { - return prevActiveTasks; - } - boolean hasAssignedTask(final TaskId taskId) { return assignedTasks.contains(taskId); } @@ -212,4 +205,9 @@ public class ClientState { boolean hasUnfulfilledQuota(final int tasksPerThread) { return activeTasks.size() < capacity * tasksPerThread; } + + // the following methods are used for testing only + public void assignActiveTasks(final Collection<TaskId> tasks) { + activeTasks.addAll(tasks); + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index 157497d..d1da8b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -228,14 +228,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> { private void mapPreviousTaskAssignment(final Map<ID, ClientState> clients) { for (final Map.Entry<ID, ClientState> clientState : clients.entrySet()) { - for (final TaskId activeTask : clientState.getValue().previousActiveTasks()) { + for (final TaskId activeTask : clientState.getValue().prevActiveTasks()) { previousActiveTaskAssignment.put(activeTask, clientState.getKey()); } - for (final TaskId prevAssignedTask : clientState.getValue().previousStandbyTasks()) { - if (!previousStandbyTaskAssignment.containsKey(prevAssignedTask)) { - previousStandbyTaskAssignment.put(prevAssignedTask, new HashSet<>()); - } + for (final TaskId prevAssignedTask : clientState.getValue().prevStandbyTasks()) { + previousStandbyTaskAssignment.computeIfAbsent(prevAssignedTask, t -> new HashSet<>()); previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index a8526bf..7ce9712 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -227,15 +227,9 @@ public class AbstractTaskTest { public void commit() {} @Override - public void suspend() {} - - @Override public void close(final boolean clean, final boolean isZombie) {} @Override - public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {} - - @Override public boolean initializeStateStores() { return false; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 88d4d6f..c2b9cdb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -443,7 +443,6 @@ public class StandbyTaskTest { singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 60_000L)) ); - task.suspend(); task.close(true, false); final File taskDir = stateDirectory.directoryForTask(taskId); @@ -817,26 +816,4 @@ public class StandbyTaskTest { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); } - - @Test - public void shouldRecordTaskClosedMetricOnCloseSuspended() throws IOException { - final MetricName metricName = setupCloseTaskMetric(); - final StandbyTask task = new StandbyTask( - taskId, - ktablePartitions, - ktableTopology, - consumer, - changelogReader, - createConfig(baseDir), - streamsMetrics, - stateDirectory - ); - - final boolean clean = true; - final boolean isZombie = false; - task.closeSuspended(clean, isZombie, new RuntimeException()); - - final double expectedCloseTaskMetric = 1.0; - verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index e997f2e..9ff6e33 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Arrays; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -37,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.ClientState; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.test.MockClientSupplier; @@ -66,11 +69,17 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @SuppressWarnings("unchecked") public class StreamsPartitionAssignorTest { + private final String c1 = "consumer1"; + private final String c2 = "consumer2"; + private final String c3 = "consumer3"; + private final String c4 = "consumer4"; private final TopicPartition t1p0 = new TopicPartition("topic1", 0); private final TopicPartition t1p1 = new TopicPartition("topic1", 1); @@ -84,6 +93,40 @@ public class StreamsPartitionAssignorTest { private final TopicPartition t3p1 = new TopicPartition("topic3", 1); private final TopicPartition t3p2 = new TopicPartition("topic3", 2); private final TopicPartition t3p3 = new TopicPartition("topic3", 3); + private final TopicPartition t4p0 = new TopicPartition("topic4", 0); + private final TopicPartition t4p1 = new TopicPartition("topic4", 1); + private final TopicPartition t4p2 = new TopicPartition("topic4", 2); + private final TopicPartition t4p3 = new TopicPartition("topic4", 3); + + private final TaskId task0_0 = new TaskId(0, 0); + private final TaskId task0_1 = new TaskId(0, 1); + private final TaskId task0_2 = new TaskId(0, 2); + private final TaskId task0_3 = new TaskId(0, 3); + private final TaskId task1_0 = new TaskId(1, 0); + private final TaskId task1_1 = new TaskId(1, 1); + private final TaskId task1_2 = new TaskId(1, 2); + private final TaskId task1_3 = new TaskId(1, 3); + private final TaskId task2_0 = new TaskId(2, 0); + private final TaskId task2_1 = new TaskId(2, 1); + private final TaskId task2_2 = new TaskId(2, 2); + private final TaskId task2_3 = new TaskId(2, 3); + + private final Map<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<TaskId, Set<TopicPartition>>() {{ + put(task0_0, Utils.mkSet(t1p0, t2p0)); + put(task0_1, Utils.mkSet(t1p1, t2p1)); + put(task0_2, Utils.mkSet(t1p2, t2p2)); + put(task0_3, Utils.mkSet(t1p3, t2p3)); + + put(task1_0, Utils.mkSet(t3p0)); + put(task1_1, Utils.mkSet(t3p1)); + put(task1_2, Utils.mkSet(t3p2)); + put(task1_3, Utils.mkSet(t3p3)); + + put(task2_0, Utils.mkSet(t4p0)); + put(task2_1, Utils.mkSet(t4p1)); + put(task2_2, Utils.mkSet(t4p2)); + put(task2_3, Utils.mkSet(t4p3)); + }}; private final Set<String> allTopics = Utils.mkSet("topic1", "topic2"); @@ -109,10 +152,6 @@ public class StreamsPartitionAssignorTest { Collections.emptySet(), Collections.emptySet()); - private final TaskId task0 = new TaskId(0, 0); - private final TaskId task1 = new TaskId(0, 1); - private final TaskId task2 = new TaskId(0, 2); - private final TaskId task3 = new TaskId(0, 3); private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); @@ -137,6 +176,11 @@ public class StreamsPartitionAssignorTest { partitionAssignor.configure(configurationMap); } + private void configureDefault() { + createMockTaskManager(); + partitionAssignor.configure(configProps()); + } + private void createMockTaskManager() { final StreamsBuilder builder = new StreamsBuilder(); final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build()); @@ -153,6 +197,7 @@ public class StreamsPartitionAssignorTest { EasyMock.expect(taskManager.adminClient()).andReturn(null).anyTimes(); EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes(); EasyMock.expect(taskManager.previousRunningTaskIds()).andReturn(prevTasks).anyTimes(); + EasyMock.expect(taskManager.activeTaskIds()).andReturn(prevTasks).anyTimes(); EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes(); EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes(); } @@ -169,6 +214,141 @@ public class StreamsPartitionAssignorTest { } @Test + public void shouldUseEagerRebalancingProtocol() { + createMockTaskManager(); + final Map<String, Object> props = configProps(); + props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23); + partitionAssignor.configure(props); + + assertEquals(1, partitionAssignor.supportedProtocols().size()); + assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER)); + assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE)); + } + + @Test + public void shouldUseCooperativeRebalancingProtocol() { + createMockTaskManager(); + final Map<String, Object> props = configProps(); + partitionAssignor.configure(props); + + assertEquals(2, partitionAssignor.supportedProtocols().size()); + assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE)); + } + + @Test + public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() { + configureDefault(); + final ClientState state = new ClientState(); + final List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3); + + final Map<String, List<TaskId>> previousAssignment = new HashMap<String, List<TaskId>>() {{ + put(c1, Arrays.asList(task0_0, task1_1, task1_3)); + put(c2, Arrays.asList(task0_3, task1_0)); + put(c3, Arrays.asList(task0_1, task0_2, task1_2)); + }}; + + for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) { + for (final TaskId task : entry.getValue()) { + state.addOwnedPartitions(partitionsForTask.get(task), entry.getKey()); + } + } + + final Set<String> consumers = Utils.mkSet(c1, c2, c3); + state.assignActiveTasks(allTasks); + + assertEquivalentAssignment(previousAssignment, + partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, Collections.emptySet())); + } + + @Test + public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() { + configureDefault(); + final ClientState state = new ClientState(); + + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3); + + final Map<String, List<TaskId>> previousAssignment = new HashMap<String, List<TaskId>>() {{ + put(c1, new ArrayList<>(Arrays.asList(task0_0, task1_1, task1_3))); + put(c2, new ArrayList<>(Arrays.asList(task0_3, task1_0))); + put(c3, new ArrayList<>(Arrays.asList(task0_1, task0_2, task1_2))); + }}; + + for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) { + for (final TaskId task : entry.getValue()) { + state.addOwnedPartitions(partitionsForTask.get(task), entry.getKey()); + } + } + + final Set<String> consumers = Utils.mkSet(c1, c2, c3); + + // We should be able to add a new task without sacrificing stickyness + final TaskId newTask = task2_0; + allTasks.add(newTask); + state.assignActiveTasks(allTasks); + + final Map<String, List<TaskId>> newAssignment = partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, Collections.emptySet()); + + previousAssignment.get(c2).add(newTask); + assertEquivalentAssignment(previousAssignment, newAssignment); + } + + @Test + public void shouldReturnEmptyMapWhenStickyAndBalancedAssignmentIsNotPossibleBecauseNewConsumerJoined() { + configureDefault(); + final ClientState state = new ClientState(); + + final List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3); + + final Map<String, List<TaskId>> previousAssignment = new HashMap<String, List<TaskId>>() {{ + put(c1, Arrays.asList(task0_0, task1_1, task1_3)); + put(c2, Arrays.asList(task0_3, task1_0)); + put(c3, Arrays.asList(task0_1, task0_2, task1_2)); + }}; + + for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) { + for (final TaskId task : entry.getValue()) { + state.addOwnedPartitions(partitionsForTask.get(task), entry.getKey()); + } + } + + // If we add a new consumer here, we cannot produce an assignment that is both sticky and balanced + final Set<String> consumers = Utils.mkSet(c1, c2, c3, c4); + state.assignActiveTasks(allTasks); + + assertThat(partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, Collections.emptySet()), + equalTo(Collections.emptyMap())); + } + + @Test + public void shouldReturnEmptyMapWhenStickyAndBalancedAssignmentIsNotPossibleBecauseOtherClientOwnedPartition() { + configureDefault(); + final ClientState state = new ClientState(); + + final List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2, task0_3, task1_0, task1_1, task1_2, task1_3); + + final Map<String, List<TaskId>> previousAssignment = new HashMap<String, List<TaskId>>() {{ + put(c1, new ArrayList<>(Arrays.asList(task1_1, task1_3))); + put(c2, new ArrayList<>(Arrays.asList(task0_3, task1_0))); + put(c3, new ArrayList<>(Arrays.asList(task0_1, task0_2, task1_2))); + }}; + + for (final Map.Entry<String, List<TaskId>> entry : previousAssignment.entrySet()) { + for (final TaskId task : entry.getValue()) { + state.addOwnedPartitions(partitionsForTask.get(task), entry.getKey()); + } + } + + // Add the partitions of task0_0 to allOwnedPartitions but not c1's ownedPartitions/previousAssignment + final Set<TopicPartition> allOwnedPartitions = new HashSet<>(partitionsForTask.get(task0_0)); + + final Set<String> consumers = Utils.mkSet(c1, c2, c3); + state.assignActiveTasks(allTasks); + + assertThat(partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, allOwnedPartitions), + equalTo(Collections.emptyMap())); + } + + @Test public void shouldInterleaveTasksByGroupId() { final TaskId taskIdA0 = new TaskId(0, 0); final TaskId taskIdA1 = new TaskId(0, 1); @@ -182,21 +362,31 @@ public class StreamsPartitionAssignorTest { final TaskId taskIdC0 = new TaskId(2, 0); final TaskId taskIdC1 = new TaskId(2, 1); + final String c1 = "c1"; + final String c2 = "c2"; + final String c3 = "c3"; + + final Set<String> consumers = Utils.mkSet(c1, c2, c3); + final List<TaskId> expectedSubList1 = asList(taskIdA0, taskIdA3, taskIdB2); final List<TaskId> expectedSubList2 = asList(taskIdA1, taskIdB0, taskIdC0); final List<TaskId> expectedSubList3 = asList(taskIdA2, taskIdB1, taskIdC1); - final List<List<TaskId>> embeddedList = asList(expectedSubList1, expectedSubList2, expectedSubList3); + + final Map<String, List<TaskId>> assignment = new HashMap<>(); + assignment.put(c1, expectedSubList1); + assignment.put(c2, expectedSubList2); + assignment.put(c3, expectedSubList3); final List<TaskId> tasks = asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3); Collections.shuffle(tasks); - final List<List<TaskId>> interleavedTaskIds = StreamsPartitionAssignor.interleaveTasksByGroupId(tasks, 3); + final Map<String, List<TaskId>> interleavedTaskIds = StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(tasks, consumers); - assertThat(interleavedTaskIds, equalTo(embeddedList)); + assertThat(interleavedTaskIds, equalTo(assignment)); } @Test - public void testSubscription() { + public void testEagerSubscription() { builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -212,6 +402,7 @@ public class StreamsPartitionAssignorTest { EasyMock.replay(taskManager); configurePartitionAssignor(Collections.emptyMap()); + partitionAssignor.setRebalanceProtocol(RebalanceProtocol.EAGER); final Set<String> topics = Utils.mkSet("topic1", "topic2"); final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); @@ -222,24 +413,60 @@ public class StreamsPartitionAssignorTest { final Set<TaskId> standbyTasks = new HashSet<>(cachedTasks); standbyTasks.removeAll(prevTasks); + // When following the eager protocol, we must encode the previous tasks ourselves since we must revoke + // everything and thus the "ownedPartitions" field in the subscription will be empty final SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null); assertEquals(info.encode(), subscription.userData()); } @Test + public void testCooperativeSubscription() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + + final Set<TaskId> prevTasks = Utils.mkSet( + new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)); + final Set<TaskId> cachedTasks = Utils.mkSet( + new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), + new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)); + + final UUID processId = UUID.randomUUID(); + createMockTaskManager(prevTasks, cachedTasks, processId, builder); + EasyMock.replay(taskManager); + + configurePartitionAssignor(Collections.emptyMap()); + + final Set<String> topics = Utils.mkSet("topic1", "topic2"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription( + new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); + + Collections.sort(subscription.topics()); + assertEquals(asList("topic1", "topic2"), subscription.topics()); + + final Set<TaskId> standbyTasks = new HashSet<>(cachedTasks); + standbyTasks.removeAll(prevTasks); + + // We don't encode the active tasks when following the cooperative protocol, as these are inferred from the + // ownedPartitions encoded in the subscription + final SubscriptionInfo info = new SubscriptionInfo(processId, Collections.emptySet(), standbyTasks, null); + assertEquals(info.encode(), subscription.userData()); + } + + @Test public void testAssignBasic() { builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); final List<String> topics = asList("topic1", "topic2"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); - final Set<TaskId> prevTasks10 = Utils.mkSet(task0); - final Set<TaskId> prevTasks11 = Utils.mkSet(task1); - final Set<TaskId> prevTasks20 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); - final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); + final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0); + final Set<TaskId> prevTasks11 = Utils.mkSet(task0_1); + final Set<TaskId> prevTasks20 = Utils.mkSet(task0_2); + final Set<TaskId> standbyTasks10 = Utils.mkSet(task0_1); + final Set<TaskId> standbyTasks11 = Utils.mkSet(task0_2); + final Set<TaskId> standbyTasks20 = Utils.mkSet(task0_0); final UUID uuid1 = UUID.randomUUID(); final UUID uuid2 = UUID.randomUUID(); @@ -251,14 +478,20 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new ConsumerPartitionAssignor.Subscription(topics, - new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); + new ConsumerPartitionAssignor.Subscription( + topics, + new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode(), + Collections.singletonList(t1p0))); subscriptions.put("consumer11", - new ConsumerPartitionAssignor.Subscription(topics, - new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode())); + new ConsumerPartitionAssignor.Subscription( + topics, + new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode(), + Collections.singletonList(t1p1))); subscriptions.put("consumer20", - new ConsumerPartitionAssignor.Subscription(topics, - new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode())); + new ConsumerPartitionAssignor.Subscription( + topics, + new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode(), + Collections.singletonList(t1p2))); final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -277,7 +510,7 @@ public class StreamsPartitionAssignorTest { final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); allActiveTasks.addAll(info11.activeTasks()); - assertEquals(Utils.mkSet(task0, task1), allActiveTasks); + assertEquals(Utils.mkSet(task0_0, task0_1), allActiveTasks); // the third consumer final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); @@ -351,12 +584,12 @@ public class StreamsPartitionAssignorTest { // the first consumer final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); - final List<TaskId> expectedInfo10TaskIds = asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3); + final List<TaskId> expectedInfo10TaskIds = asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2); assertEquals(expectedInfo10TaskIds, info10.activeTasks()); // the second consumer final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); - final List<TaskId> expectedInfo11TaskIds = asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2); + final List<TaskId> expectedInfo11TaskIds = asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3); assertEquals(expectedInfo11TaskIds, info11.activeTasks()); } @@ -371,7 +604,7 @@ public class StreamsPartitionAssignorTest { builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor2"); final List<String> topics = asList("topic1", "topic2"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); final UUID uuid1 = UUID.randomUUID(); @@ -403,10 +636,10 @@ public class StreamsPartitionAssignorTest { builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); final List<String> topics = asList("topic1", "topic2"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); - final Set<TaskId> prevTasks10 = Utils.mkSet(task0); - final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); + final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0); + final Set<TaskId> standbyTasks10 = Utils.mkSet(task0_1); final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), @@ -459,12 +692,12 @@ public class StreamsPartitionAssignorTest { builder.addSource(null, "source3", null, null, null, "topic3"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); final List<String> topics = asList("topic1", "topic2", "topic3"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2, task0_3); // assuming that previous tasks do not have topic3 - final Set<TaskId> prevTasks10 = Utils.mkSet(task0); - final Set<TaskId> prevTasks11 = Utils.mkSet(task1); - final Set<TaskId> prevTasks20 = Utils.mkSet(task2); + final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0); + final Set<TaskId> prevTasks11 = Utils.mkSet(task0_1); + final Set<TaskId> prevTasks20 = Utils.mkSet(task0_2); final UUID uuid1 = UUID.randomUUID(); final UUID uuid2 = UUID.randomUUID(); @@ -606,15 +839,15 @@ public class StreamsPartitionAssignorTest { builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); final List<String> topics = asList("topic1", "topic2"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); - final Set<TaskId> prevTasks00 = Utils.mkSet(task0); - final Set<TaskId> prevTasks01 = Utils.mkSet(task1); - final Set<TaskId> prevTasks02 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks01 = Utils.mkSet(task1); - final Set<TaskId> standbyTasks02 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks00 = Utils.mkSet(task0); + final Set<TaskId> prevTasks00 = Utils.mkSet(task0_0); + final Set<TaskId> prevTasks01 = Utils.mkSet(task0_1); + final Set<TaskId> prevTasks02 = Utils.mkSet(task0_2); + final Set<TaskId> standbyTasks01 = Utils.mkSet(task0_1); + final Set<TaskId> standbyTasks02 = Utils.mkSet(task0_2); + final Set<TaskId> standbyTasks00 = Utils.mkSet(task0_0); final UUID uuid1 = UUID.randomUUID(); final UUID uuid2 = UUID.randomUUID(); @@ -651,8 +884,8 @@ public class StreamsPartitionAssignorTest { assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet()); // check active tasks assigned to the first client - assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); - assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); + assertEquals(Utils.mkSet(task0_0, task0_1), new HashSet<>(allActiveTasks)); + assertEquals(Utils.mkSet(task0_2), new HashSet<>(allStandbyTasks)); // the third consumer final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); @@ -678,11 +911,11 @@ public class StreamsPartitionAssignorTest { EasyMock.expectLastCall(); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - activeTasks.put(task0, Utils.mkSet(t3p0)); - activeTasks.put(task3, Utils.mkSet(t3p3)); + activeTasks.put(task0_0, Utils.mkSet(t3p0)); + activeTasks.put(task0_3, Utils.mkSet(t3p3)); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - standbyTasks.put(task1, Utils.mkSet(t3p1)); - standbyTasks.put(task2, Utils.mkSet(t3p2)); + standbyTasks.put(task0_1, Utils.mkSet(t3p1)); + standbyTasks.put(task0_2, Utils.mkSet(t3p2)); taskManager.setAssignmentMetadata(activeTasks, standbyTasks); EasyMock.expectLastCall(); @@ -693,7 +926,7 @@ public class StreamsPartitionAssignorTest { EasyMock.replay(taskManager); configurePartitionAssignor(Collections.emptyMap()); - final List<TaskId> activeTaskList = asList(task0, task3); + final List<TaskId> activeTaskList = asList(task0_0, task0_3); final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState); final ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); @@ -715,7 +948,7 @@ public class StreamsPartitionAssignorTest { builder.addSource(null, "source2", null, null, null, "topicX"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); final List<String> topics = asList("topic1", applicationId + "-topicX"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); final UUID uuid1 = UUID.randomUUID(); createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder); @@ -749,7 +982,7 @@ public class StreamsPartitionAssignorTest { builder.addSink("sink2", "topicZ", null, null, null, "processor2"); builder.addSource(null, "source3", null, null, null, "topicZ"); final List<String> topics = asList("topic1", "test-topicX", "test-topicZ"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); final UUID uuid1 = UUID.randomUUID(); createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder); @@ -1197,36 +1430,95 @@ public class StreamsPartitionAssignorTest { } @Test - public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForFutureInstances() { + public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins() { builder.addSource(null, "source1", null, null, null, "topic1"); - final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); + + subscriptions.put(c1, + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(UUID.randomUUID(), allTasks, Collections.emptySet(), null).encode(), + Arrays.asList(t1p0, t1p1, t1p2)) + ); + subscriptions.put(c2, + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(UUID.randomUUID(), Collections.emptySet(), Collections.emptySet(), null).encode(), + Collections.emptyList()) + ); - final Set<TaskId> activeTasks = Utils.mkSet(task0, task1); - final Set<TaskId> standbyTasks = Utils.mkSet(task2); + createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder); + EasyMock.replay(taskManager); + partitionAssignor.configure(configProps()); + + final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + assertThat(assignment.size(), equalTo(2)); + + assertThat(assignment.get(c1).partitions(), equalTo(asList(t1p0, t1p2))); + assertThat( + AssignmentInfo.decode(assignment.get(c1).userData()), + equalTo(new AssignmentInfo( + Arrays.asList(task0_0, task0_2), + Collections.emptyMap(), + Collections.emptyMap() + ))); + + // The new consumer's assignment should be empty until c1 has the chance to revoke its partitions/tasks + assertThat(assignment.get(c2).partitions(), equalTo(Collections.emptyList())); + assertThat( + AssignmentInfo.decode(assignment.get(c2).userData()), + equalTo(new AssignmentInfo( + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap() + ))); + } + + @Test + public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() { + builder.addSource(null, "source1", null, null, null, "topic1"); + + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); + + final Set<TaskId> activeTasks = Utils.mkSet(task0_0, task0_1); + final Set<TaskId> standbyTasks = Utils.mkSet(task0_2); final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<TaskId, Set<TopicPartition>>() { { - put(task2, Collections.singleton(t1p2)); + put(task0_2, Collections.singleton(t1p2)); + } + }; + final Map<TaskId, Set<TopicPartition>> futureStandbyTaskMap = new HashMap<TaskId, Set<TopicPartition>>() { + { + put(task0_0, Collections.singleton(t1p0)); + put(task0_1, Collections.singleton(t1p1)); } }; subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), - new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode()) + new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode(), + Arrays.asList(t1p0, t1p1)) ); subscriptions.put("future-consumer", new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), - encodeFutureSubscription()) + encodeFutureSubscription(), + Collections.singletonList(t1p2)) ); createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder); EasyMock.replay(taskManager); - partitionAssignor.configure(configProps()); + final Map<String, Object> props = configProps(); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + partitionAssignor.configure(props); final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(assignment.size(), equalTo(2)); + + assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1))); assertThat( AssignmentInfo.decode(assignment.get("consumer1").userData()), equalTo(new AssignmentInfo( @@ -1234,10 +1526,64 @@ public class StreamsPartitionAssignorTest { standbyTaskMap, Collections.emptyMap() ))); - assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1))); - assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION))); - assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0)); + + assertThat(assignment.get("future-consumer").partitions(), equalTo(Collections.singletonList(t1p2))); + assertThat( + AssignmentInfo.decode(assignment.get("future-consumer").userData()), + equalTo(new AssignmentInfo( + Collections.singletonList(task0_2), + futureStandbyTaskMap, + Collections.emptyMap() + ))); + } + + @Test + public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() { + builder.addSource(null, "source1", null, null, null, "topic1"); + + final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2); + + subscriptions.put(c1, + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription(), + Collections.emptyList()) + ); + subscriptions.put(c2, + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription(), + Collections.emptyList()) + ); + + createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder); + EasyMock.replay(taskManager); + final Map<String, Object> props = configProps(); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + partitionAssignor.configure(props); + final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + assertThat(assignment.size(), equalTo(2)); + + assertThat(assignment.get(c1).partitions(), equalTo(asList(t1p0, t1p2))); + assertThat( + AssignmentInfo.decode(assignment.get(c1).userData()), + equalTo(new AssignmentInfo( + Arrays.asList(task0_0, task0_2), + Collections.emptyMap(), + Collections.emptyMap() + ))); + + + assertThat(assignment.get(c2).partitions(), equalTo(Collections.singletonList(t1p1))); + assertThat( + AssignmentInfo.decode(assignment.get(c2).userData()), + equalTo(new AssignmentInfo( + Collections.singletonList(task0_1), + Collections.emptyMap(), + Collections.emptyMap() + ))); } @Test @@ -1334,4 +1680,21 @@ public class StreamsPartitionAssignorTest { return info; } + + private void assertEquivalentAssignment(final Map<String, List<TaskId>> thisAssignment, + final Map<String, List<TaskId>> otherAssignment) { + assertEquals(thisAssignment.size(), otherAssignment.size()); + for (final Map.Entry<String, List<TaskId>> entry : thisAssignment.entrySet()) { + final String consumer = entry.getKey(); + assertTrue(otherAssignment.containsKey(consumer)); + + final List<TaskId> thisTaskList = entry.getValue(); + Collections.sort(thisTaskList); + final List<TaskId> otherTaskList = otherAssignment.get(consumer); + Collections.sort(otherTaskList); + + assertThat(thisTaskList, equalTo(otherTaskList)); + } + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 7e1ca7f..e46a4cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -42,7 +42,6 @@ import org.junit.runner.RunWith; import java.io.File; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -245,7 +244,8 @@ public class TaskManagerTest { @Test public void shouldCloseActiveUnAssignedSuspendedTasksWhenClosingRevokedTasks() { mockSingleActiveTask(); - EasyMock.expect(active.closeNotAssignedSuspendedTasks(taskId0Assignment.keySet())).andReturn(null).once(); + + expect(active.closeNotAssignedSuspendedTasks(taskId0Assignment.keySet())).andReturn(null).once(); expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()); replay(); @@ -281,6 +281,7 @@ public class TaskManagerTest { // Need to call this twice so task manager doesn't consider all partitions "new" taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId); taskManager.createTasks(taskId0Partitions); @@ -404,9 +405,7 @@ public class TaskManagerTest { @Test public void shouldInitializeNewActiveTasks() { - EasyMock.expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()).once(); - EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once(); - active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject()); + active.initializeNewTasks(); expectLastCall(); replay(); @@ -416,9 +415,7 @@ public class TaskManagerTest { @Test public void shouldInitializeNewStandbyTasks() { - EasyMock.expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()).once(); - EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once(); - active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject()); + standby.initializeNewTasks(); expectLastCall(); replay(); @@ -428,6 +425,7 @@ public class TaskManagerTest { @Test public void shouldRestoreStateFromChangeLogReader() { + EasyMock.expect(active.hasRestoringTasks()).andReturn(true).once(); EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once(); expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); active.updateRestored(taskId0Partitions); @@ -440,11 +438,9 @@ public class TaskManagerTest { @Test public void shouldResumeRestoredPartitions() { - EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once(); - expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); - expect(active.allTasksRunning()).andReturn(true); + expect(active.allTasksRunning()).andReturn(true).once(); expect(consumer.assignment()).andReturn(taskId0Partitions); - expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet()); + expect(standby.running()).andReturn(Collections.emptySet()); consumer.resume(taskId0Partitions); expectLastCall(); @@ -666,6 +662,7 @@ public class TaskManagerTest { } private void mockAssignStandbyPartitions(final long offset) { + expect(active.hasRestoringTasks()).andReturn(true).once(); final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class); expect(active.allTasksRunning()).andReturn(true); expect(standby.running()).andReturn(Collections.singletonList(task)); @@ -679,13 +676,6 @@ public class TaskManagerTest { EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once(); } - private void mockStandbyTaskExpectations() { - expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(), - EasyMock.eq(taskId0Assignment))) - .andReturn(Collections.singletonList(standbyTask)); - - } - private void mockSingleActiveTask() { expect(activeTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(), EasyMock.eq(taskId0Assignment))) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index dc54c86..1443edf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -70,8 +70,8 @@ public class ClientStateTest { final TaskId tid1 = new TaskId(0, 1); final TaskId tid2 = new TaskId(0, 2); - client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2)); - assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1, tid2))); + client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2)); + assertThat(client.prevActiveTasks(), equalTo(Utils.mkSet(tid1, tid2))); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); } @@ -80,8 +80,8 @@ public class ClientStateTest { final TaskId tid1 = new TaskId(0, 1); final TaskId tid2 = new TaskId(0, 2); - client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2)); - assertThat(client.previousActiveTasks().size(), equalTo(0)); + client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2)); + assertThat(client.prevActiveTasks().size(), equalTo(0)); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 19d7730..17d403f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -207,11 +207,11 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksToClientWithPreviousStandbyTasks() { final ClientState client1 = createClient(p1, 1); - client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02)); + client1.addPreviousStandbyTasks(Utils.mkSet(task02)); final ClientState client2 = createClient(p2, 1); - client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); + client2.addPreviousStandbyTasks(Utils.mkSet(task01)); final ClientState client3 = createClient(p3, 1); - client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00)); + client3.addPreviousStandbyTasks(Utils.mkSet(task00)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); @@ -225,9 +225,9 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() { final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00); - c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); + c1.addPreviousStandbyTasks(Utils.mkSet(task01)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02); - c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); + c2.addPreviousStandbyTasks(Utils.mkSet(task01)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); @@ -455,9 +455,9 @@ public class StickyTaskAssignorTest { @Test public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() { final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02); - c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00)); + c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00); - c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02)); + c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02)); createClient(p3, 1); createClient(p4, 1); @@ -577,14 +577,14 @@ public class StickyTaskAssignorTest { final TaskId task23 = new TaskId(2, 3); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); - c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11, task20, task21, task23)); + c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); - c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23); - c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12)); + c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12)); final ClientState newClient = createClient(p4, 1); - newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); + newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); taskAssignor.assign(0); @@ -607,15 +607,15 @@ public class StickyTaskAssignorTest { final TaskId task23 = new TaskId(2, 3); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); - c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11, task20, task21, task23)); + c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); - c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); final ClientState bounce1 = createClient(p3, 1); - bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20, task21, task23)); + bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23)); final ClientState bounce2 = createClient(p4, 1); - bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task03, task10)); + bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10)); final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); taskAssignor.assign(0); @@ -658,7 +658,7 @@ public class StickyTaskAssignorTest { final TaskId task06 = new TaskId(0, 6); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06); final ClientState c2 = createClient(p2, 1); - c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04, task05)); + c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05)); final ClientState newClient = createClient(p3, 1); final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06); @@ -705,7 +705,7 @@ public class StickyTaskAssignorTest { private ClientState createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) { final ClientState clientState = new ClientState(capacity); - clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds)); + clientState.addPreviousActiveTasks(Utils.mkSet(taskIds)); clients.put(processId, clientState); return clientState; } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 98e6e8f..496f89a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -569,7 +569,7 @@ public class SmokeTestDriver extends SmokeTestUtil { } if (entry.getValue().getLast().value().longValue() != expectedCount) { - resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key)); + resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount); resultStream.println("\t outputEvents: " + entry.getValue()); return false; } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 0e07cac..185fa7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; @@ -61,6 +62,8 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAss public class StreamsUpgradeTest { + private static final RebalanceProtocol REBALANCE_PROTOCOL = RebalanceProtocol.COOPERATIVE; + @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { @@ -123,26 +126,26 @@ public class StreamsUpgradeTest { // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks // 3. Task ids of valid local states on the client's state directory. - final TaskManager taskManager = taskManger(); - final Set<TaskId> previousActiveTasks = taskManager.previousRunningTaskIds(); + final Set<TaskId> standbyTasks = taskManager.cachedTasksIds(); - standbyTasks.removeAll(previousActiveTasks); - final FutureSubscriptionInfo data = new FutureSubscriptionInfo( + final Set<TaskId> activeTasks = prepareForSubscription(taskManager, + topics, + standbyTasks, + REBALANCE_PROTOCOL); + return new FutureSubscriptionInfo( usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION + 1, taskManager.processId(), - previousActiveTasks, + activeTasks, standbyTasks, - userEndPoint()); - - taskManager.updateSubscriptionsFromMetadata(topics); - - return data.encode(); + userEndPoint()) + .encode(); } @Override - public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, final ConsumerGroupMetadata metadata) { + public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, + final ConsumerGroupMetadata metadata) { try { super.onAssignment(assignment, metadata); return; @@ -193,6 +196,7 @@ public class StreamsUpgradeTest { taskManager.setPartitionsToTaskId(partitionsToTaskId); taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); taskManager.updateSubscriptionsFromAssignment(partitions); + taskManager.setRebalanceInProgress(false); } @Override diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py index 7e5cc26..428db9b 100644 --- a/tests/kafkatest/tests/streams/streams_eos_test.py +++ b/tests/kafkatest/tests/streams/streams_eos_test.py @@ -159,7 +159,7 @@ class StreamsEosTest(KafkaTest): def wait_for_startup(self, monitor, processor): self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING") - self.wait_for(monitor, processor, "processed 500 records from topic") + self.wait_for(monitor, processor, "processed [0-9]* records from topic") def wait_for(self, monitor, processor, output): monitor.wait_until(output,