This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 133c33f KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)
133c33f is described below
commit 133c33fde1c4d3b79196b72522f83a75cb6b0e65
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Mon Oct 7 09:27:09 2019 -0700
KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)
Key improvements with this PR:
* tasks will remain available for IQ during a rebalance (but not during
restore)
* continue restoring and processing standby tasks during a rebalance
* continue processing active tasks during rebalance until the RecordQueue
is empty*
* only revoked tasks must be suspended/closed
* StreamsPartitionAssignor tries to return tasks to their previous
consumers within a client
(*) but do not try to commit while the rebalance is in progress, for now (pending KAFKA-7312)
Reviewers: John Roesler <[email protected]>, Boyang Chen
<[email protected]>, Guozhang Wang <[email protected]>
---
checkstyle/suppressions.xml | 3 +
.../consumer/internals/ConsumerCoordinator.java | 20 +-
.../consumer/internals/SubscriptionState.java | 15 +-
.../clients/consumer/internals/FetcherTest.java | 31 +-
.../processor/internals/AssignedStreamsTasks.java | 15 +-
.../streams/processor/internals/StandbyTask.java | 19 -
.../processor/internals/StoreChangelogReader.java | 16 +-
.../streams/processor/internals/StreamTask.java | 8 +-
.../streams/processor/internals/StreamThread.java | 27 +-
.../internals/StreamsPartitionAssignor.java | 612 +++++++++++++++------
.../kafka/streams/processor/internals/Task.java | 8 +-
.../streams/processor/internals/TaskManager.java | 34 +-
.../assignment/AssignorConfiguration.java | 12 +-
.../internals/assignment/ClientState.java | 56 +-
.../internals/assignment/StickyTaskAssignor.java | 8 +-
.../processor/internals/AbstractTaskTest.java | 6 -
.../processor/internals/StandbyTaskTest.java | 23 -
.../internals/StreamsPartitionAssignorTest.java | 481 ++++++++++++++--
.../processor/internals/TaskManagerTest.java | 28 +-
.../internals/assignment/ClientStateTest.java | 8 +-
.../assignment/StickyTaskAssignorTest.java | 34 +-
.../kafka/streams/tests/SmokeTestDriver.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 26 +-
tests/kafkatest/tests/streams/streams_eos_test.py | 2 +-
24 files changed, 1058 insertions(+), 436 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8927849..2f21309 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -197,6 +197,9 @@
<suppress checks="MethodLength"
files="RocksDBWindowStoreTest.java"/>
+ <suppress checks="MemberName"
+ files="StreamsPartitionAssignorTest.java"/>
+
<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b5b5ce2..6b39acb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -355,26 +355,29 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
Set<TopicPartition> addedPartitions = new
HashSet<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
- // Invoke user's revocation callback before changing assignment or
updating state
if (protocol == RebalanceProtocol.COOPERATIVE) {
Set<TopicPartition> revokedPartitions = new
HashSet<>(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
- log.info("Updating with newly assigned partitions: {}, compare
with already owned partitions: {}, " +
- "newly added partitions: {}, revoking partitions: {}",
+ log.info("Updating assignment with\n" +
+ "now assigned partitions: {}\n" +
+ "compare with previously owned partitions: {}\n" +
+ "newly added partitions: {}\n" +
+ "revoked partitions: {}\n",
Utils.join(assignedPartitions, ", "),
Utils.join(ownedPartitions, ", "),
Utils.join(addedPartitions, ", "),
- Utils.join(revokedPartitions, ", "));
-
+ Utils.join(revokedPartitions, ", ")
+ );
if (!revokedPartitions.isEmpty()) {
- // revoke partitions that was previously owned but no longer
assigned;
- // note that we should only change the assignment AFTER we've
triggered
- // the revoke callback
+ // revoke partitions that were previously owned but no longer
assigned;
+ // note that we should only change the assignment (or update
the assignor's state)
+ // AFTER we've triggered the revoke callback
firstException.compareAndSet(null,
invokePartitionsRevoked(revokedPartitions));
// if revoked any partitions, need to re-join the group
afterwards
+ log.debug("Need to revoke partitions {} and re-join the
group", revokedPartitions);
requestRejoin();
}
}
@@ -679,7 +682,6 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
}
}
-
isLeader = false;
subscriptions.resetGroupSubscription();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4641e5c..953505f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -270,8 +270,14 @@ public class SubscriptionState {
if (!this.partitionsAutoAssigned())
throw new IllegalArgumentException("Attempt to dynamically assign
partitions while manual assignment in use");
+ Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new
HashMap<>(assignments.size());
+ for (TopicPartition tp : assignments) {
+ TopicPartitionState state = this.assignment.stateValue(tp);
+ if (state == null)
+ state = new TopicPartitionState();
+ assignedPartitionStates.put(tp, state);
+ }
- Map<TopicPartition, TopicPartitionState> assignedPartitionStates =
partitionToStateMap(assignments);
assignmentId++;
this.assignment.set(assignedPartitionStates);
}
@@ -674,13 +680,6 @@ public class SubscriptionState {
return rebalanceListener;
}
- private static Map<TopicPartition, TopicPartitionState>
partitionToStateMap(Collection<TopicPartition> assignments) {
- Map<TopicPartition, TopicPartitionState> map = new
HashMap<>(assignments.size());
- for (TopicPartition tp : assignments)
- map.put(tp, new TopicPartitionState());
- return map;
- }
-
private static class TopicPartitionState {
private FetchState fetchState;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ff0afe9..9e281a7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -848,7 +848,7 @@ public class FetcherTest {
}
@Test
- public void testFetchDuringRebalance() {
+ public void testFetchDuringEagerRebalance() {
buildFetcher();
subscriptions.subscribe(singleton(topicName), listener);
@@ -859,7 +859,9 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
- // Now the rebalance happens and fetch positions are cleared
+ // Now the eager rebalance happens and fetch positions are cleared
+ subscriptions.assignFromSubscribed(Collections.emptyList());
+
subscriptions.assignFromSubscribed(singleton(tp0));
client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
@@ -869,6 +871,31 @@ public class FetcherTest {
}
@Test
+ public void testFetchDuringCooperativeRebalance() {
+ buildFetcher();
+
+ subscriptions.subscribe(singleton(topicName), listener);
+ subscriptions.assignFromSubscribed(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ client.updateMetadata(initialUpdateResponse);
+
+ assertEquals(1, fetcher.sendFetches());
+
+ // Now the cooperative rebalance happens and fetch positions are NOT
cleared for unrevoked partitions
+ subscriptions.assignFromSubscribed(singleton(tp0));
+
+ client.prepareResponse(fullFetchResponse(tp0, this.records,
Errors.NONE, 100L, 0));
+ consumerClient.poll(time.timer(0));
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
fetchedRecords = fetchedRecords();
+
+ // The active fetch should NOT be ignored since the position for tp0
is still valid
+ assertEquals(1, fetchedRecords.size());
+ assertEquals(3, fetchedRecords.get(tp0).size());
+ }
+
+ @Test
public void testInFlightFetchOnPausedPartition() {
buildFetcher();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index da0fc20..65c4c95 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -74,11 +74,15 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
@Override
void closeTask(final StreamTask task, final boolean clean) {
if (suspended.containsKey(task.id())) {
- task.closeSuspended(clean, false, null);
+ task.closeSuspended(clean, null);
} else {
task.close(clean, false);
}
}
+
+ boolean hasRestoringTasks() {
+ return !restoring.isEmpty();
+ }
Set<TaskId> suspendedTaskIds() {
return suspended.keySet();
@@ -107,7 +111,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
} else if (restoring.containsKey(task)) {
revokedRestoringTasks.add(task);
} else if (!suspended.containsKey(task)) {
- log.warn("Task {} was revoked but cannot be found in the
assignment", task);
+ log.warn("Task {} was revoked but cannot be found in the
assignment, may have been closed due to error", task);
}
}
@@ -131,7 +135,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
task.suspend();
suspended.put(id, task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
- // as we suspend a task, we are either shutting down or
rebalancing, thus, we swallow and move on
+ // swallow and move on since we are rebalancing
log.info("Failed to suspend {} {} since it got migrated to
another thread already. " +
"Closing it as zombie and move on.", taskTypeName, id);
firstException.compareAndSet(null, closeZombieTask(task));
@@ -248,7 +252,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
try {
final boolean clean = !isZombie;
- task.closeSuspended(clean, isZombie, null);
+ task.closeSuspended(clean, null);
} catch (final RuntimeException e) {
log.error("Failed to close suspended {} {} due to the following
error:", taskTypeName, task.id(), e);
return e;
@@ -264,7 +268,6 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
for (final TaskId revokedTask : revokedTasks) {
final StreamTask suspendedTask = suspended.get(revokedTask);
- // task may not be in the suspended tasks if it was closed due to
some error
if (suspendedTask != null) {
firstException.compareAndSet(null, closeSuspended(false,
suspendedTask));
} else {
@@ -335,7 +338,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
return true;
} else {
log.warn("Couldn't resume task {} assigned partitions {}, task
partitions {}", taskId, partitions, task.partitions());
- task.closeSuspended(true, false, null);
+ task.closeSuspended(true, null);
}
}
return false;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index f10c25b..fbc116a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -120,18 +120,6 @@ public class StandbyTask extends AbstractTask {
commitNeeded = false;
}
- /**
- * <pre>
- * - flush store
- * - checkpoint store
- * </pre>
- */
- @Override
- public void suspend() {
- log.debug("Suspending");
- flushAndCheckpointState();
- }
-
private void flushAndCheckpointState() {
stateMgr.flush();
stateMgr.checkpoint(Collections.emptyMap());
@@ -163,13 +151,6 @@ public class StandbyTask extends AbstractTask {
taskClosed = true;
}
- @Override
- public void closeSuspended(final boolean clean,
- final boolean isZombie,
- final RuntimeException e) {
- close(clean, isZombie);
- }
-
/**
* Updates a state store using records from one change log partition
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 6c6a1c4..55a33c0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -79,8 +79,7 @@ public class StoreChangelogReader implements ChangelogReader {
initialize(active);
}
- if (needsRestoring.isEmpty() ||
restoreConsumer.assignment().isEmpty()) {
- restoreConsumer.unsubscribe();
+ if (checkForCompletedRestoration()) {
return completedRestorers;
}
@@ -116,9 +115,7 @@ public class StoreChangelogReader implements
ChangelogReader {
needsRestoring.removeAll(completedRestorers);
- if (needsRestoring.isEmpty()) {
- restoreConsumer.unsubscribe();
- }
+ checkForCompletedRestoration();
return completedRestorers;
}
@@ -337,7 +334,14 @@ public class StoreChangelogReader implements
ChangelogReader {
return nextPosition;
}
-
+ private boolean checkForCompletedRestoration() {
+ if (needsRestoring.isEmpty()) {
+ log.info("Finished restoring all active tasks");
+ restoreConsumer.unsubscribe();
+ return true;
+ }
+ return false;
+ }
private boolean hasPartition(final TopicPartition topicPartition) {
final List<PartitionInfo> partitions =
partitionInfo.get(topicPartition.topic());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 40466b3..780fe1f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -573,7 +573,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
*/
- @Override
public void suspend() {
log.debug("Suspending");
suspend(true, false);
@@ -687,10 +686,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
}
// helper to avoid calling suspend() twice if a suspended task is not
reassigned and closed
- @Override
- public void closeSuspended(final boolean clean,
- final boolean isZombie,
- RuntimeException firstException) {
+ void closeSuspended(final boolean clean, RuntimeException firstException) {
try {
closeStateManager(clean);
} catch (final RuntimeException e) {
@@ -742,7 +738,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
log.error("Could not close task due to the following error:", e);
}
- closeSuspended(clean, isZombie, firstException);
+ closeSuspended(clean, firstException);
taskClosed = true;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 29e1bc7..c71ff27 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -132,13 +132,13 @@ public class StreamThread extends Thread {
*/
public enum State implements ThreadStateTransitionValidator {
- CREATED(1, 5), // 0
- STARTING(2, 3, 5), // 1
- PARTITIONS_REVOKED(3, 5), // 2
- PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
- RUNNING(2, 3, 5), // 4
- PENDING_SHUTDOWN(6), // 5
- DEAD; // 6
+ CREATED(1, 5), // 0
+ STARTING(2, 3, 5), // 1
+ PARTITIONS_REVOKED(2, 3, 5), // 2
+ PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
+ RUNNING(2, 3, 5), // 4
+ PENDING_SHUTDOWN(6), // 5
+ DEAD; // 6
private final Set<Integer> validTransitions = new HashSet<>();
@@ -734,9 +734,9 @@ public class StreamThread extends Thread {
// to unblock the restoration as soon as possible
records = pollRequests(Duration.ZERO);
} else if (state == State.PARTITIONS_REVOKED) {
- // try to fetch some records with normal poll time
- // in order to wait long enough to get the join response
- records = pollRequests(pollTime);
+ // try to fetch som records with zero poll millis to unblock
+ // other useful work while waiting for the join response
+ records = pollRequests(Duration.ZERO);
} else if (state == State.RUNNING || state == State.STARTING) {
// try to fetch some records with normal poll time
// in order to get long polling
@@ -970,7 +970,12 @@ public class StreamThread extends Thread {
}
}
- lastCommitMs = now;
+ if (committed == -1) {
+ log.trace("Unable to commit as we are in the middle of a
rebalance, will try again when it completes.");
+ } else {
+ lastCommitMs = now;
+ }
+
processStandbyRecords = true;
} else {
committed = taskManager.maybeCommitActiveTasksPerUserRequested();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2e6c9c0..8b2c95a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -47,15 +47,18 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static java.util.UUID.randomUUID;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION;
@@ -63,20 +66,21 @@ import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAss
import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_FIVE;
import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_FOUR;
-import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_ONE;
import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_THREE;
import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_TWO;
+import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.VERSION_ONE;
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Configurable {
+
private Logger log;
private String logPrefix;
private static class AssignedPartition implements
Comparable<AssignedPartition> {
+
private final TaskId taskId;
private final TopicPartition partition;
- AssignedPartition(final TaskId taskId,
- final TopicPartition partition) {
+ AssignedPartition(final TaskId taskId, final TopicPartition partition)
{
this.taskId = taskId;
this.partition = partition;
}
@@ -103,6 +107,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
private static class ClientMetadata {
+
private final HostInfo hostInfo;
private final Set<String> consumers;
private final ClientState state;
@@ -132,12 +137,15 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
state = new ClientState();
}
- void addConsumer(final String consumerMemberId,
- final SubscriptionInfo info) {
+ void addConsumer(final String consumerMemberId, final
List<TopicPartition> ownedPartitions) {
consumers.add(consumerMemberId);
- state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
- state.addPreviousStandbyTasks(consumerMemberId,
info.standbyTasks());
state.incrementCapacity();
+ state.addOwnedPartitions(ownedPartitions, consumerMemberId);
+ }
+
+ void addPreviousTasks(final SubscriptionInfo info) {
+ state.addPreviousActiveTasks(info.prevTasks());
+ state.addPreviousStandbyTasks(info.standbyTasks());
}
@Override
@@ -177,9 +185,9 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
/**
- * We need to have the PartitionAssignor and its StreamThread to be
mutually accessible
- * since the former needs later's cached metadata while sending
subscriptions,
- * and the latter needs former's returned assignment when adding tasks.
+ * We need to have the PartitionAssignor and its StreamThread to be
mutually accessible since the former needs
+ * later's cached metadata while sending subscriptions, and the latter
needs former's returned assignment when
+ * adding tasks.
*
* @throws KafkaException if the stream thread is not specified
*/
@@ -189,7 +197,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
logPrefix = assignorConfiguration.logPrefix();
log = new LogContext(logPrefix).logger(getClass());
- usedSubscriptionMetadataVersion =
assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
+ usedSubscriptionMetadataVersion = assignorConfiguration
+ .configuredMetadataVersion(usedSubscriptionMetadataVersion);
taskManager = assignorConfiguration.getTaskManager();
assignmentErrorCode =
assignorConfiguration.getAssignmentErrorCode(configs);
numStandbyReplicas = assignorConfiguration.getNumStandbyReplicas();
@@ -221,37 +230,64 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
-
- final Set<TaskId> previousActiveTasks =
taskManager.previousRunningTaskIds();
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
- standbyTasks.removeAll(previousActiveTasks);
- final SubscriptionInfo data = new SubscriptionInfo(
+ final Set<TaskId> activeTasks = prepareForSubscription(taskManager,
+ topics,
+ standbyTasks,
+ rebalanceProtocol);
+ return new SubscriptionInfo(
usedSubscriptionMetadataVersion,
taskManager.processId(),
- previousActiveTasks,
+ activeTasks,
standbyTasks,
- userEndPoint);
+ userEndPoint)
+ .encode();
+ }
+
+ protected static Set<TaskId> prepareForSubscription(final TaskManager
taskManager,
+ final Set<String> topics,
+ final Set<TaskId> standbyTasks,
+ final RebalanceProtocol rebalanceProtocol) {
+ // Any tasks that are not yet running are counted as standby tasks for
assignment purposes,
+ // along with any old tasks for which we still found state on disk
+ final Set<TaskId> activeTasks;
+
+ switch (rebalanceProtocol) {
+ case EAGER:
+ // In eager, onPartitionsRevoked is called first and we must
get the previously saved running task ids
+ activeTasks = taskManager.previousRunningTaskIds();
+ standbyTasks.removeAll(activeTasks);
+ break;
+ case COOPERATIVE:
+ // In cooperative, we will use the encoded ownedPartitions to
determine the running tasks
+ activeTasks = Collections.emptySet();
+ standbyTasks.removeAll(taskManager.activeTaskIds());
+ break;
+ default:
+ throw new IllegalStateException("Streams partition assignor's
rebalance protocol is unknown");
+ }
taskManager.updateSubscriptionsFromMetadata(topics);
+ taskManager.setRebalanceInProgress(true);
- return data.encode();
+ return activeTasks;
}
private Map<String, Assignment> errorAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
final String topic,
final int errorCode) {
log.error("{} is unknown yet during rebalance," +
- " please make sure they have been pre-created before
starting the Streams application.", topic);
+ " please make sure they have been pre-created before starting the
Streams application.", topic);
final Map<String, Assignment> assignment = new HashMap<>();
for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
for (final String consumerId : clientMetadata.consumers) {
assignment.put(consumerId, new Assignment(
Collections.emptyList(),
new AssignmentInfo(LATEST_SUPPORTED_VERSION,
- Collections.emptyList(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- errorCode).encode()
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ errorCode).encode()
));
}
}
@@ -283,7 +319,12 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final Map<String, Subscription> subscriptions =
groupSubscription.groupSubscription();
// construct the client metadata from the decoded subscription info
final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
- final Set<String> futureConsumers = new HashSet<>();
+ final Set<TopicPartition> allOwnedPartitions = new HashSet<>();
+
+ // keep track of any future consumers in a "dummy" Client since we
can't decipher their subscription
+ final UUID futureId = randomUUID();
+ final ClientMetadata futureClient = new ClientMetadata(null);
+ clientMetadataMap.put(futureId, futureClient);
int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -292,58 +333,59 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
for (final Map.Entry<String, Subscription> entry :
subscriptions.entrySet()) {
final String consumerId = entry.getKey();
final Subscription subscription = entry.getValue();
-
final SubscriptionInfo info =
SubscriptionInfo.decode(subscription.userData());
final int usedVersion = info.version();
+
+ minReceivedMetadataVersion = updateMinReceivedVersion(usedVersion,
minReceivedMetadataVersion);
+ minSupportedMetadataVersion =
updateMinSupportedVersion(info.latestSupportedVersion(),
minSupportedMetadataVersion);
+
+ final UUID processId;
if (usedVersion > LATEST_SUPPORTED_VERSION) {
futureMetadataVersion = usedVersion;
- futureConsumers.add(consumerId);
- continue;
- }
- if (usedVersion < minReceivedMetadataVersion) {
- minReceivedMetadataVersion = usedVersion;
+ processId = futureId;
+ } else {
+ processId = info.processId();
}
- final int latestSupportedVersion = info.latestSupportedVersion();
- if (latestSupportedVersion < minSupportedMetadataVersion) {
- minSupportedMetadataVersion = latestSupportedVersion;
- }
+ ClientMetadata clientMetadata = clientMetadataMap.get(processId);
// create the new client metadata if necessary
- ClientMetadata clientMetadata =
clientMetadataMap.get(info.processId());
-
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.userEndPoint());
clientMetadataMap.put(info.processId(), clientMetadata);
}
- // add the consumer to the client
- clientMetadata.addConsumer(consumerId, info);
+ // add the consumer and any info its its subscription to the client
+ clientMetadata.addConsumer(consumerId,
subscription.ownedPartitions());
+ allOwnedPartitions.addAll(subscription.ownedPartitions());
+ if (info.prevTasks() != null && info.standbyTasks() != null) {
+ clientMetadata.addPreviousTasks(info);
+ }
}
final boolean versionProbing;
if (futureMetadataVersion == UNKNOWN) {
versionProbing = false;
+ clientMetadataMap.remove(futureId);
+ } else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
+ versionProbing = true;
+ log.info("Received a future (version probing) subscription
(version: {})."
+ + " Sending assignment back (with supported version {}).",
+ futureMetadataVersion,
+ minSupportedMetadataVersion);
+
} else {
- if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
- log.info("Received a future (version probing) subscription
(version: {})."
- + " Sending empty assignment back (with supported
version {}).",
- futureMetadataVersion,
- LATEST_SUPPORTED_VERSION);
- versionProbing = true;
- } else {
- throw new IllegalStateException(
- "Received a future (version probing) subscription
(version: " + futureMetadataVersion
- + ") and an incompatible pre Kafka 2.0 subscription
(version: " + minReceivedMetadataVersion
- + ") at the same time."
- );
- }
+ throw new IllegalStateException(
+ "Received a future (version probing) subscription (version: "
+ futureMetadataVersion
+ + ") and an incompatible pre Kafka 2.0 subscription
(version: " + minReceivedMetadataVersion
+ + ") at the same time."
+ );
}
if (minReceivedMetadataVersion < LATEST_SUPPORTED_VERSION) {
log.info("Downgrading metadata to version {}. Latest supported
version is {}.",
- minReceivedMetadataVersion,
- LATEST_SUPPORTED_VERSION);
+ minReceivedMetadataVersion,
+ LATEST_SUPPORTED_VERSION);
}
log.debug("Constructed client metadata {} from the member
subscriptions.", clientMetadataMap);
@@ -361,9 +403,10 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
if
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
!metadata.topics().contains(topic)) {
log.error("Missing source topic {} during assignment.
Returning error {}.",
- topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
+ topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
return new GroupAssignment(
- errorAssignment(clientMetadataMap, topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+ errorAssignment(clientMetadataMap, topic,
+
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
);
}
}
@@ -378,7 +421,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
for (final InternalTopologyBuilder.TopicsInfo topicsInfo :
topicGroups.values()) {
for (final String topicName :
topicsInfo.repartitionSourceTopics.keySet()) {
- final Optional<Integer> maybeNumPartitions =
repartitionTopicMetadata.get(topicName).numberOfPartitions();
+ final Optional<Integer> maybeNumPartitions =
repartitionTopicMetadata.get(topicName)
+ .numberOfPartitions();
Integer numPartitions = null;
if (!maybeNumPartitions.isPresent()) {
@@ -395,7 +439,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
// map().join().join(map())
if
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
if
(repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent())
{
- numPartitionsCandidate =
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+ numPartitionsCandidate =
+
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
}
} else {
final Integer count =
metadata.partitionCountForTopic(sourceTopicName);
@@ -427,7 +472,6 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
} while (numPartitionsNeeded);
-
// ensure the co-partitioning topics within the group have the same
number of partitions,
// and enforce the number of partitions for those repartition topics
to be the same if they
// are co-partitioned as well.
@@ -470,19 +514,23 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final Map<TaskId, Set<TopicPartition>> partitionsForTask =
partitionGrouper.partitionGroups(sourceTopicsByGroup,
fullMetadata);
+ final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
+
// check if all partitions are assigned, and there are no duplicates
of partitions in multiple tasks
final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
partitionsForTask.entrySet()) {
+ final TaskId id = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
+
for (final TopicPartition partition : partitions) {
+ taskForPartition.put(partition, id);
if (allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is assigned to more than one tasks:
{}", partition, partitionsForTask);
}
}
allAssignedPartitions.addAll(partitions);
- final TaskId id = entry.getKey();
tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new
HashSet<>()).add(id);
}
for (final String topic : allSourceTopics) {
@@ -491,13 +539,15 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
log.warn("No partitions found for topic {}", topic);
} else {
for (final PartitionInfo partitionInfo : partitionInfoList) {
- final TopicPartition partition = new
TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+ final TopicPartition partition = new
TopicPartition(partitionInfo.topic(),
+ partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks:
{}"
- + " Possible causes of a partition not
getting assigned"
- + " is that another topic defined in the
topology has not been"
- + " created when starting your streams
application,"
- + " resulting in no tasks created for
this topology at all.", partition, partitionsForTask);
+ + " Possible causes of a partition not getting
assigned"
+ + " is that another topic defined in the
topology has not been"
+ + " created when starting your streams
application,"
+ + " resulting in no tasks created for this
topology at all.", partition,
+ partitionsForTask);
}
}
}
@@ -533,15 +583,32 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
// ---------------- Step Two ---------------- //
- // assign tasks to clients
final Map<UUID, ClientState> states = new HashMap<>();
for (final Map.Entry<UUID, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
- states.put(entry.getKey(), entry.getValue().state);
+ final ClientState state = entry.getValue().state;
+ states.put(entry.getKey(), state);
+
+ // Either the active tasks (eager) OR the owned partitions
(cooperative) were encoded in the subscription
+ // according to the rebalancing protocol, so convert any
partitions in a client to tasks where necessary
+ if (!state.ownedPartitions().isEmpty()) {
+ final Set<TaskId> previousActiveTasks = new HashSet<>();
+ for (final Map.Entry<TopicPartition, String> partitionEntry :
state.ownedPartitions().entrySet()) {
+ final TopicPartition tp = partitionEntry.getKey();
+ final TaskId task = taskForPartition.get(tp);
+ if (task != null) {
+ previousActiveTasks.add(task);
+ } else {
+ log.error("No task found for topic partition {}", tp);
+ }
+ }
+ state.addPreviousActiveTasks(previousActiveTasks);
+ }
}
log.debug("Assigning tasks {} to clients {} with number of replicas
{}",
- partitionsForTask.keySet(), states, numStandbyReplicas);
+ partitionsForTask.keySet(), states, numStandbyReplicas);
+ // assign tasks to clients
final StickyTaskAssignor<UUID> taskAssignor = new
StickyTaskAssignor<>(states, partitionsForTask.keySet());
taskAssignor.assign(numStandbyReplicas);
@@ -577,7 +644,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
clientMetadataMap,
partitionsForTask,
partitionsByHostState,
- futureConsumers,
+ allOwnedPartitions,
minReceivedMetadataVersion,
minSupportedMetadataVersion
);
@@ -586,6 +653,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
clientMetadataMap,
partitionsForTask,
partitionsByHostState,
+ allOwnedPartitions,
minReceivedMetadataVersion,
minSupportedMetadataVersion
);
@@ -594,132 +662,165 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
return new GroupAssignment(assignment);
}
- private static Map<String, Assignment> computeNewAssignment(final
Map<UUID, ClientMetadata> clientsMetadata,
- final
Map<TaskId, Set<TopicPartition>> partitionsForTask,
- final
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
- final int
minUserMetadataVersion,
- final int
minSupportedMetadataVersion) {
+ private Map<String, Assignment> computeNewAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
+ final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
+ final Map<HostInfo,
Set<TopicPartition>> partitionsByHostState,
+ final
Set<TopicPartition> allOwnedPartitions,
+ final int
minUserMetadataVersion,
+ final int
minSupportedMetadataVersion) {
+ // keep track of whether a 2nd rebalance is unavoidable so we can skip
trying to get a completely sticky assignment
+ boolean rebalanceRequired = false;
final Map<String, Assignment> assignment = new HashMap<>();
// within the client, distribute tasks to its owned consumers
- for (final Map.Entry<UUID, ClientMetadata> entry :
clientsMetadata.entrySet()) {
- final Set<String> consumers = entry.getValue().consumers;
- final ClientState state = entry.getValue().state;
-
- final List<List<TaskId>> interleavedActive =
- interleaveTasksByGroupId(state.activeTasks(),
consumers.size());
- final List<List<TaskId>> interleavedStandby =
- interleaveTasksByGroupId(state.standbyTasks(),
consumers.size());
-
- int consumerTaskIndex = 0;
-
- for (final String consumer : consumers) {
- final List<TaskId> activeTasks =
interleavedActive.get(consumerTaskIndex);
-
- // These will be filled in by
buildAssignedActiveTaskAndPartitionsList below
- final List<TopicPartition> activePartitionsList = new
ArrayList<>();
- final List<TaskId> assignedActiveList = new ArrayList<>();
-
- buildAssignedActiveTaskAndPartitionsList(activeTasks,
activePartitionsList, assignedActiveList, partitionsForTask);
+ for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
+ final ClientState state = clientMetadata.state;
+ final Set<String> consumers = clientMetadata.consumers;
+ Map<String, List<TaskId>> activeTaskAssignments;
+
+ // Try to avoid triggering another rebalance by giving active
tasks back to their previous owners within a
+ // client, without violating load balance. If we already know
another rebalance will be required, or the
+ // client had no owned partitions, try to balance the workload as
evenly as possible by interleaving the
+ // tasks among consumers and hopefully spreading the heavier
subtopologies evenly across threads.
+ if (rebalanceRequired || state.ownedPartitions().isEmpty()) {
+ activeTaskAssignments =
interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
+ } else if ((activeTaskAssignments =
tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers,
partitionsForTask, allOwnedPartitions))
+ .equals(Collections.emptyMap())) {
+ rebalanceRequired = true;
+ activeTaskAssignments =
interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
+ }
- final Map<TaskId, Set<TopicPartition>> standby = new
HashMap<>();
- if (!state.standbyTasks().isEmpty()) {
- final List<TaskId> assignedStandbyList =
interleavedStandby.get(consumerTaskIndex);
- for (final TaskId taskId : assignedStandbyList) {
- standby.computeIfAbsent(taskId, k -> new
HashSet<>()).addAll(partitionsForTask.get(taskId));
- }
- }
+ final Map<String, List<TaskId>> interleavedStandby =
+ interleaveConsumerTasksByGroupId(state.standbyTasks(),
consumers);
- consumerTaskIndex++;
-
- // finally, encode the assignment before sending back to
coordinator
- assignment.put(
- consumer,
- new Assignment(
- activePartitionsList,
- new AssignmentInfo(
- minUserMetadataVersion,
- minSupportedMetadataVersion,
- assignedActiveList,
- standby,
- partitionsByHostState,
- 0
- ).encode()
- )
- );
- }
+ addClientAssignments(
+ assignment,
+ clientMetadata,
+ partitionsForTask,
+ partitionsByHostState,
+ allOwnedPartitions,
+ activeTaskAssignments,
+ interleavedStandby,
+ minUserMetadataVersion,
+ minSupportedMetadataVersion);
}
return assignment;
}
- private static Map<String, Assignment> versionProbingAssignment(final
Map<UUID, ClientMetadata> clientsMetadata,
- final
Map<TaskId, Set<TopicPartition>> partitionsForTask,
- final
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
- final
Set<String> futureConsumers,
- final int
minUserMetadataVersion,
- final int
minSupportedMetadataVersion) {
+ private Map<String, Assignment> versionProbingAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
+ final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
+ final
Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+ final
Set<TopicPartition> allOwnedPartitions,
+ final int
minUserMetadataVersion,
+ final int
minSupportedMetadataVersion) {
final Map<String, Assignment> assignment = new HashMap<>();
- // assign previously assigned tasks to "old consumers"
+ // Since we know another rebalance will be triggered anyway, just try
and generate a balanced assignment
+ // (without violating cooperative protocol) now so that on the second
rebalance we can just give tasks
+ // back to their previous owners
+ // within the client, distribute tasks to its owned consumers
for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
- for (final String consumerId : clientMetadata.consumers) {
+ final ClientState state = clientMetadata.state;
- if (futureConsumers.contains(consumerId)) {
- continue;
- }
+ final Map<String, List<TaskId>> interleavedActive =
+ interleaveConsumerTasksByGroupId(state.activeTasks(),
clientMetadata.consumers);
+ final Map<String, List<TaskId>> interleavedStandby =
+ interleaveConsumerTasksByGroupId(state.standbyTasks(),
clientMetadata.consumers);
- // Return the same active tasks that were claimed in the
subscription
- final List<TaskId> activeTasks = new
ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId));
-
- // These will be filled in by
buildAssignedActiveTaskAndPartitionsList below
- final List<TopicPartition> activePartitionsList = new
ArrayList<>();
- final List<TaskId> assignedActiveList = new ArrayList<>();
-
- buildAssignedActiveTaskAndPartitionsList(activeTasks,
activePartitionsList, assignedActiveList, partitionsForTask);
+ addClientAssignments(
+ assignment,
+ clientMetadata,
+ partitionsForTask,
+ partitionsByHostState,
+ allOwnedPartitions,
+ interleavedActive,
+ interleavedStandby,
+ minUserMetadataVersion,
+ minSupportedMetadataVersion);
+ }
- // Return the same standby tasks that were claimed in the
subscription
- final Map<TaskId, Set<TopicPartition>> standbyTasks = new
HashMap<>();
- for (final TaskId taskId :
clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) {
- standbyTasks.put(taskId, partitionsForTask.get(taskId));
- }
+ return assignment;
+ }
- assignment.put(consumerId, new Assignment(
+ private void addClientAssignments(final Map<String, Assignment> assignment,
+ final ClientMetadata clientMetadata,
+ final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
+ final Map<HostInfo, Set<TopicPartition>>
partitionsByHostState,
+ final Set<TopicPartition>
allOwnedPartitions,
+ final Map<String, List<TaskId>>
activeTaskAssignments,
+ final Map<String, List<TaskId>>
standbyTaskAssignments,
+ final int minUserMetadataVersion,
+ final int minSupportedMetadataVersion) {
+
+ // Loop through the consumers and build their assignment
+ for (final String consumer : clientMetadata.consumers) {
+ final List<TaskId> activeTasksForConsumer =
activeTaskAssignments.get(consumer);
+
+ // These will be filled in by
buildAssignedActiveTaskAndPartitionsList below
+ final List<TopicPartition> activePartitionsList = new
ArrayList<>();
+ final List<TaskId> assignedActiveList = new ArrayList<>();
+
+ buildAssignedActiveTaskAndPartitionsList(consumer,
+ clientMetadata.state,
+ activeTasksForConsumer,
+ partitionsForTask,
+ allOwnedPartitions,
+ activePartitionsList,
+ assignedActiveList);
+
+ final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
+ buildStandbyTaskMap(standbyTaskAssignments.get(consumer),
partitionsForTask);
+
+ // finally, encode the assignment and insert into map with all
assignments
+ assignment.put(
+ consumer,
+ new Assignment(
activePartitionsList,
new AssignmentInfo(
minUserMetadataVersion,
minSupportedMetadataVersion,
assignedActiveList,
- standbyTasks,
+ standbyTaskMap,
partitionsByHostState,
- 0)
- .encode()
- ));
- }
- }
-
- // add empty assignment for "future version" clients (ie, empty
version probing response)
- for (final String consumerId : futureConsumers) {
- assignment.put(consumerId, new Assignment(
- Collections.emptyList(),
- new AssignmentInfo(minUserMetadataVersion,
minSupportedMetadataVersion).encode()
- ));
+ AssignorError.NONE.code()
+ ).encode()
+ )
+ );
}
-
- return assignment;
}
- private static void buildAssignedActiveTaskAndPartitionsList(final
List<TaskId> activeTasks,
- final
List<TopicPartition> activePartitionsList,
- final
List<TaskId> assignedActiveList,
- final
Map<TaskId, Set<TopicPartition>> partitionsForTask) {
+ private void buildAssignedActiveTaskAndPartitionsList(final String
consumer,
+ final ClientState
clientState,
+ final List<TaskId>
activeTasksForConsumer,
+ final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
+ final
Set<TopicPartition> allOwnedPartitions,
+ final
List<TopicPartition> activePartitionsList,
+ final List<TaskId>
assignedActiveList) {
final List<AssignedPartition> assignedPartitions = new ArrayList<>();
// Build up list of all assigned partition-task pairs
- for (final TaskId taskId : activeTasks) {
+ for (final TaskId taskId : activeTasksForConsumer) {
+ final List<AssignedPartition> assignedPartitionsForTask = new
ArrayList<>();
for (final TopicPartition partition :
partitionsForTask.get(taskId)) {
- assignedPartitions.add(new AssignedPartition(taskId,
partition));
+ final String oldOwner =
clientState.ownedPartitions().get(partition);
+ final boolean newPartitionForConsumer = oldOwner == null ||
!oldOwner.equals(consumer);
+
+ // If the partition is new to this consumer but is still owned
by another, remove from the assignment
+ // until it has been revoked and can safely be reassigned
according to the COOPERATIVE protocol
+ if (newPartitionForConsumer &&
allOwnedPartitions.contains(partition)) {
+ log.debug("Removing task {} from assignment until it is
safely revoked", taskId);
+ clientState.removeFromAssignment(taskId);
+ // Clear the assigned partitions list for this task if any
partition can not safely be assigned,
+ // so as not to encode a partial task
+ assignedPartitionsForTask.clear();
+ break;
+ } else {
+ assignedPartitionsForTask.add(new
AssignedPartition(taskId, partition));
+ }
}
+ // assignedPartitionsForTask will either contain all partitions
for the task or be empty, so just add all
+ assignedPartitions.addAll(assignedPartitionsForTask);
}
// Add one copy of a task for each corresponding partition, so the
receiver can determine the task <-> tp mapping
@@ -730,17 +831,175 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
}
- // visible for testing
- static List<List<TaskId>> interleaveTasksByGroupId(final
Collection<TaskId> taskIds, final int numberThreads) {
+ private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final
Collection<TaskId> standbys,
+ final
Map<TaskId, Set<TopicPartition>> partitionsForTask) {
+ final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new
HashMap<>();
+ for (final TaskId task : standbys) {
+ standbyTaskMap.put(task, partitionsForTask.get(task));
+ }
+ return standbyTaskMap;
+ }
+
+ /**
+ * Generates an assignment that tries to satisfy two conditions: no active
task previously owned by a consumer
+ * is assigned to another (ie nothing gets revoked), and the number of
tasks is evenly distributed throughout
+ * the client.
+ * <p>
+ * If it is impossible to satisfy both constraints we abort early and
return an empty map so we can use a
+ * different assignment strategy that tries to distribute tasks of a
single subtopology across different threads.
+ *
+ * @param state state for this client
+ * @param consumers the consumers in this client
+ * @param partitionsForTask mapping from task to its associated partitions
+ * @param allOwnedPartitions set of all partitions claimed as owned by the
group
+ * @return task assignment for the consumers of this client
+ * empty map if it is not possible to generate a balanced
assignment without moving a task to a new consumer
+ */
+ Map<String, List<TaskId>>
tryStickyAndBalancedTaskAssignmentWithinClient(final ClientState state,
+
final Set<String> consumers,
+
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+
final Set<TopicPartition> allOwnedPartitions) {
+ final Map<String, List<TaskId>> assignments = new HashMap<>();
+ final LinkedList<TaskId> newTasks = new LinkedList<>();
+ final Set<String> unfilledConsumers = new HashSet<>(consumers);
+
+ final int maxTasksPerClient = (int) Math.ceil(((double)
state.activeTaskCount()) / consumers.size());
+
+ // initialize task list for consumers
+ for (final String consumer : consumers) {
+ assignments.put(consumer, new ArrayList<>());
+ }
+
+ for (final TaskId task : state.activeTasks()) {
+ final Set<String> previousConsumers =
previousConsumersOfTaskPartitions(partitionsForTask.get(task),
state.ownedPartitions(), allOwnedPartitions);
+
+ // If this task's partitions were owned by different consumers, we
can't avoid revoking partitions
+ if (previousConsumers.size() > 1) {
+ log.warn("The partitions of task {} were claimed as owned by
different StreamThreads. " +
+ "This indicates the mapping from partitions to tasks has
changed!", task);
+ return Collections.emptyMap();
+ }
+
+ // If this is a new task, or its old consumer no longer exists, it
can be freely (re)assigned
+ if (previousConsumers.isEmpty()) {
+ log.debug("Task {} was not previously owned by any consumers
still in the group. It's owner may " +
+ "have died or it may be a new task", task);
+ newTasks.add(task);
+ } else {
+ final String consumer = previousConsumers.iterator().next();
+
+ // If the previous consumer was from another client, these
partitions will have to be revoked
+ if (!consumers.contains(consumer)) {
+ log.debug("This client was assigned a task {} whose
partition(s) were previously owned by another " +
+ "client, falling back to an interleaved assignment
since a rebalance is inevitable.", task);
+ return Collections.emptyMap();
+ }
+
+ // If this consumer previously owned more tasks than it has
capacity for, some must be revoked
+ if (assignments.get(consumer).size() >= maxTasksPerClient) {
+ log.debug("Cannot create a sticky and balanced assignment
as this client's consumers owned more " +
+ "previous tasks than it has capacity for during this
assignment, falling back to interleaved " +
+ "assignment since a realance is inevitable.");
+ return Collections.emptyMap();
+ }
+
+ assignments.get(consumer).add(task);
+
+ // If we have now reached capacity, remove it from set of
consumers who still need more tasks
+ if (assignments.get(consumer).size() == maxTasksPerClient) {
+ unfilledConsumers.remove(consumer);
+ }
+ }
+ }
+
+ // Interleave any remaining tasks by groupId among the consumers with
remaining capacity. For further
+ // explanation, see the javadocs for #interleaveConsumerTasksByGroupId
+ Collections.sort(newTasks);
+ while (!newTasks.isEmpty()) {
+ if (unfilledConsumers.isEmpty()) {
+ throw new IllegalStateException("Some tasks could not be
distributed");
+ }
+
+ final Iterator<String> consumerIt = unfilledConsumers.iterator();
+
+ // Loop through the unfilled consumers and distribute tasks until
newTasks is empty
+ while (consumerIt.hasNext()) {
+ final String consumer = consumerIt.next();
+ final List<TaskId> consumerAssignment =
assignments.get(consumer);
+ final TaskId task = newTasks.poll();
+ if (task == null) {
+ break;
+ }
+
+ consumerAssignment.add(task);
+ if (consumerAssignment.size() == maxTasksPerClient) {
+ consumerIt.remove();
+ }
+ }
+ }
+
+ return assignments;
+ }
+
+ /**
+ * Get the previous consumer for the partitions of a task
+ *
+ * @param taskPartitions the TopicPartitions for a single given task
+ * @param clientOwnedPartitions the partitions owned by all consumers in a
client
+ * @param allOwnedPartitions all partitions claimed as owned by any
consumer in any client
+ * @return set of consumer(s) that previously owned the partitions in this
task
+ * empty set signals that it is a new task, or its previous owner
is no longer in the group
+ */
+ Set<String> previousConsumersOfTaskPartitions(final Set<TopicPartition>
taskPartitions,
+ final Map<TopicPartition,
String> clientOwnedPartitions,
+ final Set<TopicPartition>
allOwnedPartitions) {
+ // this "foreignConsumer" indicates a partition was owned by someone
from another client -- we don't really care who
+ final String foreignConsumer = "";
+ final Set<String> previousConsumers = new HashSet<>();
+
+ for (final TopicPartition tp : taskPartitions) {
+ final String currentPartitionConsumer =
clientOwnedPartitions.get(tp);
+ if (currentPartitionConsumer != null) {
+ previousConsumers.add(currentPartitionConsumer);
+ } else if (allOwnedPartitions.contains(tp)) {
+ previousConsumers.add(foreignConsumer);
+ }
+ }
+
+ return previousConsumers;
+ }
+
+ /**
+ * Generate an assignment that attempts to maximize load balance without
regard for stickiness, by spreading
+ * tasks of the same groupId (subtopology) over different consumers.
+ *
+ * @param taskIds the set of tasks to be distributed
+ * @param consumers the set of consumers to receive tasks
+ * @return a map of task assignments keyed by the consumer id
+ */
+ static Map<String, List<TaskId>> interleaveConsumerTasksByGroupId(final
Collection<TaskId> taskIds,
+ final
Set<String> consumers) {
+ // First we make a sorted list of the tasks, grouping them by groupId
final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
Collections.sort(sortedTasks);
- final List<List<TaskId>> taskIdsForConsumerAssignment = new
ArrayList<>(numberThreads);
- for (int i = 0; i < numberThreads; i++) {
- taskIdsForConsumerAssignment.add(new ArrayList<>());
+
+ // Initialize the assignment map and task list for each consumer. We
use a TreeMap here for a consistent
+ // ordering of the consumers in the hope they will end up with the
same set of tasks in subsequent assignments
+ final Map<String, List<TaskId>> taskIdsForConsumerAssignment = new
TreeMap<>();
+ for (final String consumer : consumers) {
+ taskIdsForConsumerAssignment.put(consumer, new ArrayList<>());
}
+
+ // We loop until the tasks have all been assigned, removing them from
the list when they are given to a
+ // consumer. To interleave the tasks, we loop through the consumers
and give each one task from the head
+ // of the list. When we finish going through the list of consumers we
start over at the beginning of the
+ // consumers list, continuing until we run out of tasks.
while (!sortedTasks.isEmpty()) {
- for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment)
{
+ for (final Map.Entry<String, List<TaskId>> consumerTaskIds :
taskIdsForConsumerAssignment.entrySet()) {
+ final List<TaskId> taskIdList = consumerTaskIds.getValue();
final TaskId taskId = sortedTasks.poll();
+
+ // Check for null here as we may run out of tasks before
giving every consumer exactly the same number
if (taskId == null) {
break;
}
@@ -774,7 +1033,6 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
protected boolean maybeUpdateSubscriptionVersion(final int
receivedAssignmentMetadataVersion,
final int
latestCommonlySupportedVersion) {
if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
-
// If the latest commonly supported version is now greater than
our used version, this indicates we have just
// completed the rolling upgrade and can now update our
subscription version for the final rebalance
if (latestCommonlySupportedVersion >
usedSubscriptionMetadataVersion) {
@@ -880,6 +1138,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
taskManager.setPartitionsToTaskId(partitionsToTaskId);
taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
taskManager.updateSubscriptionsFromAssignment(partitions);
+ taskManager.setRebalanceInProgress(false);
}
private static void processVersionOneAssignment(final String logPrefix,
@@ -973,12 +1232,23 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
}
+ private int updateMinReceivedVersion(final int usedVersion, final int
minReceivedMetadataVersion) {
+ return usedVersion < minReceivedMetadataVersion ? usedVersion :
minReceivedMetadataVersion;
+ }
+
+ private int updateMinSupportedVersion(final int supportedVersion, final
int minSupportedMetadataVersion) {
+ return supportedVersion < minSupportedMetadataVersion ?
supportedVersion : minSupportedMetadataVersion;
+ }
+
protected void setAssignmentErrorCode(final Integer errorCode) {
assignmentErrorCode.set(errorCode);
}
-
// following functions are for test only
+ void setRebalanceProtocol(final RebalanceProtocol rebalanceProtocol) {
+ this.rebalanceProtocol = rebalanceProtocol;
+ }
+
void setInternalTopicManager(final InternalTopicManager
internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index da9e656..af1b4bd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -27,7 +27,7 @@ import java.util.Set;
public interface Task {
/**
- * Initialize the task and return {@code true} if the task is ready to
run, i.e, it has not state stores
+ * Initialize the task and return {@code true} if the task is ready to
run, i.e., it has no state stores
* @return true if this task has no state stores that may need restoring.
* @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not contain the
partition
@@ -40,14 +40,8 @@ public interface Task {
void commit();
- void suspend();
-
void resume();
- void closeSuspended(final boolean clean,
- final boolean isZombie,
- final RuntimeException e);
-
void close(final boolean clean,
final boolean isZombie);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index cd90fad..a2dac40 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -60,6 +60,7 @@ public class TaskManager {
private final Admin adminClient;
private DeleteRecordsResult deleteRecordsResult;
+ private boolean rebalanceInProgress = false; // if we are in the middle
of a rebalance, it is not safe to commit
// the restore consumer is only ever assigned changelogs from restoring
tasks or standbys (but not both)
private boolean restoreConsumerAssignedStandbys = false;
@@ -144,7 +145,7 @@ public class TaskManager {
addedActiveTasks.put(taskId, partitions);
}
} catch (final StreamsException e) {
- log.error("Failed to resume an active task {} due to the
following error:", taskId, e);
+ log.error("Failed to resume a suspended active task {} due to
the following error:", taskId, e);
throw e;
}
}
@@ -303,7 +304,11 @@ public class TaskManager {
}
}
- Set<TaskId> activeTaskIds() {
+ public Set<TaskId> previousRunningTaskIds() {
+ return active.previousRunningTaskIds();
+ }
+
+ public Set<TaskId> activeTaskIds() {
return active.allAssignedTaskIds();
}
@@ -319,10 +324,6 @@ public class TaskManager {
return revokedStandbyTasks.keySet();
}
- public Set<TaskId> previousRunningTaskIds() {
- return active.previousRunningTaskIds();
- }
-
Set<TaskId> previousActiveTaskIds() {
final HashSet<TaskId> previousActiveTasks = new
HashSet<>(assignedActiveTasks.keySet());
previousActiveTasks.addAll(revokedActiveTasks.keySet());
@@ -378,9 +379,11 @@ public class TaskManager {
active.initializeNewTasks();
standby.initializeNewTasks();
- final Collection<TopicPartition> restored =
changelogReader.restore(active);
- active.updateRestored(restored);
- removeChangelogsFromRestoreConsumer(restored, false);
+ if (active.hasRestoringTasks()) {
+ final Collection<TopicPartition> restored =
changelogReader.restore(active);
+ active.updateRestored(restored);
+ removeChangelogsFromRestoreConsumer(restored, false);
+ }
if (active.allTasksRunning()) {
final Set<TopicPartition> assignment = consumer.assignment();
@@ -420,6 +423,10 @@ public class TaskManager {
}
}
+ public void setRebalanceInProgress(final boolean rebalanceInProgress) {
+ this.rebalanceInProgress = rebalanceInProgress;
+ }
+
public void setClusterMetadata(final Cluster cluster) {
this.cluster = cluster;
}
@@ -493,10 +500,10 @@ public class TaskManager {
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
+ * @return number of committed offsets, or -1 if we are in the middle of a
rebalance and cannot commit
*/
int commitAll() {
- final int committed = active.commit();
- return committed + standby.commit();
+ return rebalanceInProgress ? -1 : active.commit() + standby.commit();
}
/**
@@ -518,7 +525,7 @@ public class TaskManager {
* or if the task producer got fenced (EOS)
*/
int maybeCommitActiveTasksPerUserRequested() {
- return active.maybeCommitPerUserRequested();
+ return rebalanceInProgress ? -1 : active.maybeCommitPerUserRequested();
}
void maybePurgeCommitedRecords() {
@@ -528,7 +535,8 @@ public class TaskManager {
if (deleteRecordsResult == null || deleteRecordsResult.all().isDone())
{
if (deleteRecordsResult != null &&
deleteRecordsResult.all().isCompletedExceptionally()) {
- log.debug("Previous delete-records request has failed: {}. Try
sending the new request now", deleteRecordsResult.lowWatermarks());
+ log.debug("Previous delete-records request has failed: {}. Try
sending the new request now",
+ deleteRecordsResult.lowWatermarks());
}
final Map<TopicPartition, RecordsToDelete> recordsToDelete = new
HashMap<>();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index ac88f2f..1e406e2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -153,7 +153,8 @@ public final class AssignorConfiguration {
throw new IllegalArgumentException("Unknown configuration
value for parameter 'upgrade.from': " + upgradeFrom);
}
}
- return RebalanceProtocol.EAGER;
+
+ return RebalanceProtocol.COOPERATIVE;
}
public String logPrefix() {
@@ -181,14 +182,19 @@ public final class AssignorConfiguration {
upgradeFrom
);
return VERSION_TWO;
+ case StreamsConfig.UPGRADE_FROM_20:
+ case StreamsConfig.UPGRADE_FROM_21:
+ case StreamsConfig.UPGRADE_FROM_22:
+ case StreamsConfig.UPGRADE_FROM_23:
+ // These configs are for cooperative rebalancing and
should not affect the metadata version
+ break;
default:
throw new IllegalArgumentException(
"Unknown configuration value for parameter
'upgrade.from': " + upgradeFrom
);
}
- } else {
- return priorVersion;
}
+ return priorVersion;
}
public int getNumStandbyReplicas() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index ab213d5..df42b14 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -16,11 +16,13 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
public class ClientState {
@@ -31,8 +33,7 @@ public class ClientState {
private final Set<TaskId> prevStandbyTasks;
private final Set<TaskId> prevAssignedTasks;
- private final Map<String, Set<TaskId>> prevActiveTasksByConsumer;
- private final Map<String, Set<TaskId>> prevStandbyTasksByConsumer;
+ private final Map<TopicPartition, String> ownedPartitions;
private int capacity;
@@ -48,7 +49,6 @@ public class ClientState {
new HashSet<>(),
new HashSet<>(),
new HashMap<>(),
- new HashMap<>(),
capacity);
}
@@ -58,8 +58,7 @@ public class ClientState {
final Set<TaskId> prevActiveTasks,
final Set<TaskId> prevStandbyTasks,
final Set<TaskId> prevAssignedTasks,
- final Map<String, Set<TaskId>>
prevActiveTasksByConsumer,
- final Map<String, Set<TaskId>>
prevStandbyTasksByConsumer,
+ final Map<TopicPartition, String> ownedPartitions,
final int capacity) {
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
@@ -67,8 +66,7 @@ public class ClientState {
this.prevActiveTasks = prevActiveTasks;
this.prevStandbyTasks = prevStandbyTasks;
this.prevAssignedTasks = prevAssignedTasks;
- this.prevActiveTasksByConsumer = prevActiveTasksByConsumer;
- this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer;
+ this.ownedPartitions = ownedPartitions;
this.capacity = capacity;
}
@@ -80,8 +78,7 @@ public class ClientState {
new HashSet<>(prevActiveTasks),
new HashSet<>(prevStandbyTasks),
new HashSet<>(prevAssignedTasks),
- new HashMap<>(prevActiveTasksByConsumer),
- new HashMap<>(prevStandbyTasksByConsumer),
+ new HashMap<>(ownedPartitions),
capacity);
}
@@ -111,6 +108,10 @@ public class ClientState {
return prevStandbyTasks;
}
+ public Map<TopicPartition, String> ownedPartitions() {
+ return ownedPartitions;
+ }
+
@SuppressWarnings("WeakerAccess")
public int assignedTaskCount() {
return assignedTasks.size();
@@ -125,24 +126,25 @@ public class ClientState {
return activeTasks.size();
}
- public void addPreviousActiveTasks(final String consumer, final
Set<TaskId> prevTasks) {
+ public void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
prevActiveTasks.addAll(prevTasks);
prevAssignedTasks.addAll(prevTasks);
- prevActiveTasksByConsumer.put(consumer, prevTasks);
}
- public void addPreviousStandbyTasks(final String consumer, final
Set<TaskId> standbyTasks) {
+ public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
prevStandbyTasks.addAll(standbyTasks);
prevAssignedTasks.addAll(standbyTasks);
- prevStandbyTasksByConsumer.put(consumer, standbyTasks);
}
- public Set<TaskId> prevActiveTasksForConsumer(final String consumer) {
- return prevActiveTasksByConsumer.get(consumer);
+ public void addOwnedPartitions(final Collection<TopicPartition>
ownedPartitions, final String consumer) {
+ for (final TopicPartition tp : ownedPartitions) {
+ this.ownedPartitions.put(tp, consumer);
+ }
}
- public Set<TaskId> prevStandbyTasksForConsumer(final String consumer) {
- return prevStandbyTasksByConsumer.get(consumer);
+ public void removeFromAssignment(final TaskId task) {
+ activeTasks.remove(task);
+ assignedTasks.remove(task);
}
@Override
@@ -153,6 +155,7 @@ public class ClientState {
") prevActiveTasks: (" + prevActiveTasks +
") prevStandbyTasks: (" + prevStandbyTasks +
") prevAssignedTasks: (" + prevAssignedTasks +
+ ") prevOwnedPartitionsByConsumerId: (" +
ownedPartitions.keySet() +
") capacity: " + capacity +
"]";
}
@@ -182,16 +185,6 @@ public class ClientState {
}
}
- Set<TaskId> previousStandbyTasks() {
- final Set<TaskId> standby = new HashSet<>(prevAssignedTasks);
- standby.removeAll(prevActiveTasks);
- return standby;
- }
-
- Set<TaskId> previousActiveTasks() {
- return prevActiveTasks;
- }
-
boolean hasAssignedTask(final TaskId taskId) {
return assignedTasks.contains(taskId);
}
@@ -212,4 +205,9 @@ public class ClientState {
boolean hasUnfulfilledQuota(final int tasksPerThread) {
return activeTasks.size() < capacity * tasksPerThread;
}
+
+ // the following method is used for testing only
+ public void assignActiveTasks(final Collection<TaskId> tasks) {
+ activeTasks.addAll(tasks);
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 157497d..d1da8b8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -228,14 +228,12 @@ public class StickyTaskAssignor<ID> implements
TaskAssignor<ID, TaskId> {
private void mapPreviousTaskAssignment(final Map<ID, ClientState> clients)
{
for (final Map.Entry<ID, ClientState> clientState :
clients.entrySet()) {
- for (final TaskId activeTask :
clientState.getValue().previousActiveTasks()) {
+ for (final TaskId activeTask :
clientState.getValue().prevActiveTasks()) {
previousActiveTaskAssignment.put(activeTask,
clientState.getKey());
}
- for (final TaskId prevAssignedTask :
clientState.getValue().previousStandbyTasks()) {
- if
(!previousStandbyTaskAssignment.containsKey(prevAssignedTask)) {
- previousStandbyTaskAssignment.put(prevAssignedTask, new
HashSet<>());
- }
+ for (final TaskId prevAssignedTask :
clientState.getValue().prevStandbyTasks()) {
+
previousStandbyTaskAssignment.computeIfAbsent(prevAssignedTask, t -> new
HashSet<>());
previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index a8526bf..7ce9712 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -227,15 +227,9 @@ public class AbstractTaskTest {
public void commit() {}
@Override
- public void suspend() {}
-
- @Override
public void close(final boolean clean, final boolean isZombie) {}
@Override
- public void closeSuspended(final boolean clean, final boolean
isZombie, final RuntimeException e) {}
-
- @Override
public boolean initializeStateStores() {
return false;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 88d4d6f..c2b9cdb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -443,7 +443,6 @@ public class StandbyTaskTest {
singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L,
60_000L))
);
- task.suspend();
task.close(true, false);
final File taskDir = stateDirectory.directoryForTask(taskId);
@@ -817,26 +816,4 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics,
metricName);
}
-
- @Test
- public void shouldRecordTaskClosedMetricOnCloseSuspended() throws
IOException {
- final MetricName metricName = setupCloseTaskMetric();
- final StandbyTask task = new StandbyTask(
- taskId,
- ktablePartitions,
- ktableTopology,
- consumer,
- changelogReader,
- createConfig(baseDir),
- streamsMetrics,
- stateDirectory
- );
-
- final boolean clean = true;
- final boolean isZombie = false;
- task.closeSuspended(clean, isZombie, new RuntimeException());
-
- final double expectedCloseTaskMetric = 1.0;
- verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics,
metricName);
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index e997f2e..9ff6e33 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@@ -37,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
@@ -66,11 +69,17 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("unchecked")
public class StreamsPartitionAssignorTest {
+ private final String c1 = "consumer1";
+ private final String c2 = "consumer2";
+ private final String c3 = "consumer3";
+ private final String c4 = "consumer4";
private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -84,6 +93,40 @@ public class StreamsPartitionAssignorTest {
private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
+ private final TopicPartition t4p0 = new TopicPartition("topic4", 0);
+ private final TopicPartition t4p1 = new TopicPartition("topic4", 1);
+ private final TopicPartition t4p2 = new TopicPartition("topic4", 2);
+ private final TopicPartition t4p3 = new TopicPartition("topic4", 3);
+
+ private final TaskId task0_0 = new TaskId(0, 0);
+ private final TaskId task0_1 = new TaskId(0, 1);
+ private final TaskId task0_2 = new TaskId(0, 2);
+ private final TaskId task0_3 = new TaskId(0, 3);
+ private final TaskId task1_0 = new TaskId(1, 0);
+ private final TaskId task1_1 = new TaskId(1, 1);
+ private final TaskId task1_2 = new TaskId(1, 2);
+ private final TaskId task1_3 = new TaskId(1, 3);
+ private final TaskId task2_0 = new TaskId(2, 0);
+ private final TaskId task2_1 = new TaskId(2, 1);
+ private final TaskId task2_2 = new TaskId(2, 2);
+ private final TaskId task2_3 = new TaskId(2, 3);
+
+ private final Map<TaskId, Set<TopicPartition>> partitionsForTask = new
HashMap<TaskId, Set<TopicPartition>>() {{
+ put(task0_0, Utils.mkSet(t1p0, t2p0));
+ put(task0_1, Utils.mkSet(t1p1, t2p1));
+ put(task0_2, Utils.mkSet(t1p2, t2p2));
+ put(task0_3, Utils.mkSet(t1p3, t2p3));
+
+ put(task1_0, Utils.mkSet(t3p0));
+ put(task1_1, Utils.mkSet(t3p1));
+ put(task1_2, Utils.mkSet(t3p2));
+ put(task1_3, Utils.mkSet(t3p3));
+
+ put(task2_0, Utils.mkSet(t4p0));
+ put(task2_1, Utils.mkSet(t4p1));
+ put(task2_2, Utils.mkSet(t4p2));
+ put(task2_3, Utils.mkSet(t4p3));
+ }};
private final Set<String> allTopics = Utils.mkSet("topic1", "topic2");
@@ -109,10 +152,6 @@ public class StreamsPartitionAssignorTest {
Collections.emptySet(),
Collections.emptySet());
- private final TaskId task0 = new TaskId(0, 0);
- private final TaskId task1 = new TaskId(0, 1);
- private final TaskId task2 = new TaskId(0, 2);
- private final TaskId task3 = new TaskId(0, 3);
private final StreamsPartitionAssignor partitionAssignor = new
StreamsPartitionAssignor();
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
private final InternalTopologyBuilder builder = new
InternalTopologyBuilder();
@@ -137,6 +176,11 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.configure(configurationMap);
}
+ private void configureDefault() {
+ createMockTaskManager();
+ partitionAssignor.configure(configProps());
+ }
+
private void createMockTaskManager() {
final StreamsBuilder builder = new StreamsBuilder();
final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
@@ -153,6 +197,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.expect(taskManager.adminClient()).andReturn(null).anyTimes();
EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes();
EasyMock.expect(taskManager.previousRunningTaskIds()).andReturn(prevTasks).anyTimes();
+
EasyMock.expect(taskManager.activeTaskIds()).andReturn(prevTasks).anyTimes();
EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes();
EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes();
}
@@ -169,6 +214,141 @@ public class StreamsPartitionAssignorTest {
}
@Test
+ public void shouldUseEagerRebalancingProtocol() {
+ createMockTaskManager();
+ final Map<String, Object> props = configProps();
+ props.put(StreamsConfig.UPGRADE_FROM_CONFIG,
StreamsConfig.UPGRADE_FROM_23);
+ partitionAssignor.configure(props);
+
+ assertEquals(1, partitionAssignor.supportedProtocols().size());
+
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER));
+
assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
+ }
+
+ @Test
+ public void shouldUseCooperativeRebalancingProtocol() {
+ createMockTaskManager();
+ final Map<String, Object> props = configProps();
+ partitionAssignor.configure(props);
+
+ assertEquals(2, partitionAssignor.supportedProtocols().size());
+
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
+ }
+
+ @Test
+ public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() {
+ configureDefault();
+ final ClientState state = new ClientState();
+ final List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2,
task0_3, task1_0, task1_1, task1_2, task1_3);
+
+ final Map<String, List<TaskId>> previousAssignment = new
HashMap<String, List<TaskId>>() {{
+ put(c1, Arrays.asList(task0_0, task1_1, task1_3));
+ put(c2, Arrays.asList(task0_3, task1_0));
+ put(c3, Arrays.asList(task0_1, task0_2, task1_2));
+ }};
+
+ for (final Map.Entry<String, List<TaskId>> entry :
previousAssignment.entrySet()) {
+ for (final TaskId task : entry.getValue()) {
+ state.addOwnedPartitions(partitionsForTask.get(task),
entry.getKey());
+ }
+ }
+
+ final Set<String> consumers = Utils.mkSet(c1, c2, c3);
+ state.assignActiveTasks(allTasks);
+
+ assertEquivalentAssignment(previousAssignment,
+
partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state,
consumers, partitionsForTask, Collections.emptySet()));
+ }
+
+ @Test
+ public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded()
{
+ configureDefault();
+ final ClientState state = new ClientState();
+
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2,
task0_3, task1_0, task1_1, task1_2, task1_3);
+
+ final Map<String, List<TaskId>> previousAssignment = new
HashMap<String, List<TaskId>>() {{
+ put(c1, new ArrayList<>(Arrays.asList(task0_0, task1_1,
task1_3)));
+ put(c2, new ArrayList<>(Arrays.asList(task0_3, task1_0)));
+ put(c3, new ArrayList<>(Arrays.asList(task0_1, task0_2,
task1_2)));
+ }};
+
+ for (final Map.Entry<String, List<TaskId>> entry :
previousAssignment.entrySet()) {
+ for (final TaskId task : entry.getValue()) {
+ state.addOwnedPartitions(partitionsForTask.get(task),
entry.getKey());
+ }
+ }
+
+ final Set<String> consumers = Utils.mkSet(c1, c2, c3);
+
+ // We should be able to add a new task without sacrificing stickiness
+ final TaskId newTask = task2_0;
+ allTasks.add(newTask);
+ state.assignActiveTasks(allTasks);
+
+ final Map<String, List<TaskId>> newAssignment =
partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state,
consumers, partitionsForTask, Collections.emptySet());
+
+ previousAssignment.get(c2).add(newTask);
+ assertEquivalentAssignment(previousAssignment, newAssignment);
+ }
+
+ @Test
+ public void
shouldReturnEmptyMapWhenStickyAndBalancedAssignmentIsNotPossibleBecauseNewConsumerJoined()
{
+ configureDefault();
+ final ClientState state = new ClientState();
+
+ final List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2,
task0_3, task1_0, task1_1, task1_2, task1_3);
+
+ final Map<String, List<TaskId>> previousAssignment = new
HashMap<String, List<TaskId>>() {{
+ put(c1, Arrays.asList(task0_0, task1_1, task1_3));
+ put(c2, Arrays.asList(task0_3, task1_0));
+ put(c3, Arrays.asList(task0_1, task0_2, task1_2));
+ }};
+
+ for (final Map.Entry<String, List<TaskId>> entry :
previousAssignment.entrySet()) {
+ for (final TaskId task : entry.getValue()) {
+ state.addOwnedPartitions(partitionsForTask.get(task),
entry.getKey());
+ }
+ }
+
+ // If we add a new consumer here, we cannot produce an assignment that
is both sticky and balanced
+ final Set<String> consumers = Utils.mkSet(c1, c2, c3, c4);
+ state.assignActiveTasks(allTasks);
+
+
assertThat(partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state,
consumers, partitionsForTask, Collections.emptySet()),
+ equalTo(Collections.emptyMap()));
+ }
+
+ @Test
+ public void
shouldReturnEmptyMapWhenStickyAndBalancedAssignmentIsNotPossibleBecauseOtherClientOwnedPartition()
{
+ configureDefault();
+ final ClientState state = new ClientState();
+
+ final List<TaskId> allTasks = Arrays.asList(task0_0, task0_1, task0_2,
task0_3, task1_0, task1_1, task1_2, task1_3);
+
+ final Map<String, List<TaskId>> previousAssignment = new
HashMap<String, List<TaskId>>() {{
+ put(c1, new ArrayList<>(Arrays.asList(task1_1, task1_3)));
+ put(c2, new ArrayList<>(Arrays.asList(task0_3, task1_0)));
+ put(c3, new ArrayList<>(Arrays.asList(task0_1, task0_2,
task1_2)));
+ }};
+
+ for (final Map.Entry<String, List<TaskId>> entry :
previousAssignment.entrySet()) {
+ for (final TaskId task : entry.getValue()) {
+ state.addOwnedPartitions(partitionsForTask.get(task),
entry.getKey());
+ }
+ }
+
+ // Add the partitions of task0_0 to allOwnedPartitions but not c1's
ownedPartitions/previousAssignment
+ final Set<TopicPartition> allOwnedPartitions = new
HashSet<>(partitionsForTask.get(task0_0));
+
+ final Set<String> consumers = Utils.mkSet(c1, c2, c3);
+ state.assignActiveTasks(allTasks);
+
+
assertThat(partitionAssignor.tryStickyAndBalancedTaskAssignmentWithinClient(state,
consumers, partitionsForTask, allOwnedPartitions),
+ equalTo(Collections.emptyMap()));
+ }
+
+ @Test
public void shouldInterleaveTasksByGroupId() {
final TaskId taskIdA0 = new TaskId(0, 0);
final TaskId taskIdA1 = new TaskId(0, 1);
@@ -182,21 +362,31 @@ public class StreamsPartitionAssignorTest {
final TaskId taskIdC0 = new TaskId(2, 0);
final TaskId taskIdC1 = new TaskId(2, 1);
+ final String c1 = "c1";
+ final String c2 = "c2";
+ final String c3 = "c3";
+
+ final Set<String> consumers = Utils.mkSet(c1, c2, c3);
+
final List<TaskId> expectedSubList1 = asList(taskIdA0, taskIdA3,
taskIdB2);
final List<TaskId> expectedSubList2 = asList(taskIdA1, taskIdB0,
taskIdC0);
final List<TaskId> expectedSubList3 = asList(taskIdA2, taskIdB1,
taskIdC1);
- final List<List<TaskId>> embeddedList = asList(expectedSubList1,
expectedSubList2, expectedSubList3);
+
+ final Map<String, List<TaskId>> assignment = new HashMap<>();
+ assignment.put(c1, expectedSubList1);
+ assignment.put(c2, expectedSubList2);
+ assignment.put(c3, expectedSubList3);
final List<TaskId> tasks = asList(taskIdC0, taskIdC1, taskIdB0,
taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
Collections.shuffle(tasks);
- final List<List<TaskId>> interleavedTaskIds =
StreamsPartitionAssignor.interleaveTasksByGroupId(tasks, 3);
+ final Map<String, List<TaskId>> interleavedTaskIds =
StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(tasks, consumers);
- assertThat(interleavedTaskIds, equalTo(embeddedList));
+ assertThat(interleavedTaskIds, equalTo(assignment));
}
@Test
- public void testSubscription() {
+ public void testEagerSubscription() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source1", "source2");
@@ -212,6 +402,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.replay(taskManager);
configurePartitionAssignor(Collections.emptyMap());
+ partitionAssignor.setRebalanceProtocol(RebalanceProtocol.EAGER);
final Set<String> topics = Utils.mkSet("topic1", "topic2");
final ConsumerPartitionAssignor.Subscription subscription = new
ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics),
partitionAssignor.subscriptionUserData(topics));
@@ -222,24 +413,60 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
standbyTasks.removeAll(prevTasks);
+ // When following the eager protocol, we must encode the previous
tasks ourselves since we must revoke
+ // everything and thus the "ownedPartitions" field in the subscription
will be empty
final SubscriptionInfo info = new SubscriptionInfo(processId,
prevTasks, standbyTasks, null);
assertEquals(info.encode(), subscription.userData());
}
@Test
+ public void testCooperativeSubscription() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addSource(null, "source2", null, null, null, "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(),
"source1", "source2");
+
+ final Set<TaskId> prevTasks = Utils.mkSet(
+ new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
+ final Set<TaskId> cachedTasks = Utils.mkSet(
+ new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
+ new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
+
+ final UUID processId = UUID.randomUUID();
+ createMockTaskManager(prevTasks, cachedTasks, processId, builder);
+ EasyMock.replay(taskManager);
+
+ configurePartitionAssignor(Collections.emptyMap());
+
+ final Set<String> topics = Utils.mkSet("topic1", "topic2");
+ final ConsumerPartitionAssignor.Subscription subscription = new
ConsumerPartitionAssignor.Subscription(
+ new ArrayList<>(topics),
partitionAssignor.subscriptionUserData(topics));
+
+ Collections.sort(subscription.topics());
+ assertEquals(asList("topic1", "topic2"), subscription.topics());
+
+ final Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
+ standbyTasks.removeAll(prevTasks);
+
+ // We don't encode the active tasks when following the cooperative
protocol, as these are inferred from the
+ // ownedPartitions encoded in the subscription
+ final SubscriptionInfo info = new SubscriptionInfo(processId,
Collections.emptySet(), standbyTasks, null);
+ assertEquals(info.encode(), subscription.userData());
+ }
+
+ @Test
public void testAssignBasic() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source1", "source2");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
- final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
- final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
- final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
- final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0);
+ final Set<TaskId> prevTasks11 = Utils.mkSet(task0_1);
+ final Set<TaskId> prevTasks20 = Utils.mkSet(task0_2);
+ final Set<TaskId> standbyTasks10 = Utils.mkSet(task0_1);
+ final Set<TaskId> standbyTasks11 = Utils.mkSet(task0_2);
+ final Set<TaskId> standbyTasks20 = Utils.mkSet(task0_0);
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
@@ -251,14 +478,20 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new
MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer10",
- new ConsumerPartitionAssignor.Subscription(topics,
- new SubscriptionInfo(uuid1, prevTasks10,
standbyTasks10, userEndPoint).encode()));
+ new ConsumerPartitionAssignor.Subscription(
+ topics,
+ new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10,
userEndPoint).encode(),
+ Collections.singletonList(t1p0)));
subscriptions.put("consumer11",
- new ConsumerPartitionAssignor.Subscription(topics,
- new SubscriptionInfo(uuid1, prevTasks11,
standbyTasks11, userEndPoint).encode()));
+ new ConsumerPartitionAssignor.Subscription(
+ topics,
+ new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11,
userEndPoint).encode(),
+ Collections.singletonList(t1p1)));
subscriptions.put("consumer20",
- new ConsumerPartitionAssignor.Subscription(topics,
- new SubscriptionInfo(uuid2, prevTasks20,
standbyTasks20, userEndPoint).encode()));
+ new ConsumerPartitionAssignor.Subscription(
+ topics,
+ new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20,
userEndPoint).encode(),
+ Collections.singletonList(t1p2)));
final Map<String, ConsumerPartitionAssignor.Assignment> assignments =
partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
@@ -277,7 +510,7 @@ public class StreamsPartitionAssignorTest {
final AssignmentInfo info11 = checkAssignment(allTopics,
assignments.get("consumer11"));
allActiveTasks.addAll(info11.activeTasks());
- assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
+ assertEquals(Utils.mkSet(task0_0, task0_1), allActiveTasks);
// the third consumer
final AssignmentInfo info20 = checkAssignment(allTopics,
assignments.get("consumer20"));
@@ -351,12 +584,12 @@ public class StreamsPartitionAssignorTest {
// the first consumer
final AssignmentInfo info10 =
AssignmentInfo.decode(assignments.get("consumer10").userData());
- final List<TaskId> expectedInfo10TaskIds = asList(taskIdA1, taskIdA3,
taskIdB1, taskIdB3);
+ final List<TaskId> expectedInfo10TaskIds = asList(taskIdA0, taskIdA2,
taskIdB0, taskIdB2);
assertEquals(expectedInfo10TaskIds, info10.activeTasks());
// the second consumer
final AssignmentInfo info11 =
AssignmentInfo.decode(assignments.get("consumer11").userData());
- final List<TaskId> expectedInfo11TaskIds = asList(taskIdA0, taskIdA2,
taskIdB0, taskIdB2);
+ final List<TaskId> expectedInfo11TaskIds = asList(taskIdA1, taskIdA3,
taskIdB1, taskIdB3);
assertEquals(expectedInfo11TaskIds, info11.activeTasks());
}
@@ -371,7 +604,7 @@ public class StreamsPartitionAssignorTest {
builder.addProcessor("processor2", new MockProcessorSupplier(),
"source2");
builder.addStateStore(new MockKeyValueStoreBuilder("store2", false),
"processor2");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
final UUID uuid1 = UUID.randomUUID();
@@ -403,10 +636,10 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source1", "source2");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
- final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
- final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0);
+ final Set<TaskId> standbyTasks10 = Utils.mkSet(task0_1);
final Cluster emptyMetadata = new Cluster("cluster",
Collections.singletonList(Node.noNode()),
Collections.emptySet(),
Collections.emptySet(),
@@ -459,12 +692,12 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source3", null, null, null, "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source1", "source2", "source3");
final List<String> topics = asList("topic1", "topic2", "topic3");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2,
task0_3);
// assuming that previous tasks do not have topic3
- final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
- final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
- final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0_0);
+ final Set<TaskId> prevTasks11 = Utils.mkSet(task0_1);
+ final Set<TaskId> prevTasks20 = Utils.mkSet(task0_2);
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
@@ -606,15 +839,15 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(),
"source1", "source2");
final List<String> topics = asList("topic1", "topic2");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
- final Set<TaskId> prevTasks00 = Utils.mkSet(task0);
- final Set<TaskId> prevTasks01 = Utils.mkSet(task1);
- final Set<TaskId> prevTasks02 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks01 = Utils.mkSet(task1);
- final Set<TaskId> standbyTasks02 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks00 = Utils.mkSet(task0);
+ final Set<TaskId> prevTasks00 = Utils.mkSet(task0_0);
+ final Set<TaskId> prevTasks01 = Utils.mkSet(task0_1);
+ final Set<TaskId> prevTasks02 = Utils.mkSet(task0_2);
+ final Set<TaskId> standbyTasks01 = Utils.mkSet(task0_1);
+ final Set<TaskId> standbyTasks02 = Utils.mkSet(task0_2);
+ final Set<TaskId> standbyTasks00 = Utils.mkSet(task0_0);
final UUID uuid1 = UUID.randomUUID();
final UUID uuid2 = UUID.randomUUID();
@@ -651,8 +884,8 @@ public class StreamsPartitionAssignorTest {
assertNotEquals("same processId has same set of standby tasks",
info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
// check active tasks assigned to the first client
- assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
- assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
+ assertEquals(Utils.mkSet(task0_0, task0_1), new
HashSet<>(allActiveTasks));
+ assertEquals(Utils.mkSet(task0_2), new HashSet<>(allStandbyTasks));
// the third consumer
final AssignmentInfo info20 = checkAssignment(allTopics,
assignments.get("consumer20"));
@@ -678,11 +911,11 @@ public class StreamsPartitionAssignorTest {
EasyMock.expectLastCall();
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
- activeTasks.put(task0, Utils.mkSet(t3p0));
- activeTasks.put(task3, Utils.mkSet(t3p3));
+ activeTasks.put(task0_0, Utils.mkSet(t3p0));
+ activeTasks.put(task0_3, Utils.mkSet(t3p3));
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
- standbyTasks.put(task1, Utils.mkSet(t3p1));
- standbyTasks.put(task2, Utils.mkSet(t3p2));
+ standbyTasks.put(task0_1, Utils.mkSet(t3p1));
+ standbyTasks.put(task0_2, Utils.mkSet(t3p2));
taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
EasyMock.expectLastCall();
@@ -693,7 +926,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.replay(taskManager);
configurePartitionAssignor(Collections.emptyMap());
- final List<TaskId> activeTaskList = asList(task0, task3);
+ final List<TaskId> activeTaskList = asList(task0_0, task0_3);
final AssignmentInfo info = new AssignmentInfo(activeTaskList,
standbyTasks, hostState);
final ConsumerPartitionAssignor.Assignment assignment = new
ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
@@ -715,7 +948,7 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topicX");
builder.addProcessor("processor2", new MockProcessorSupplier(),
"source2");
final List<String> topics = asList("topic1", applicationId +
"-topicX");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
final UUID uuid1 = UUID.randomUUID();
createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
@@ -749,7 +982,7 @@ public class StreamsPartitionAssignorTest {
builder.addSink("sink2", "topicZ", null, null, null, "processor2");
builder.addSource(null, "source3", null, null, null, "topicZ");
final List<String> topics = asList("topic1", "test-topicX",
"test-topicZ");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
final UUID uuid1 = UUID.randomUUID();
createMockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
@@ -1197,36 +1430,95 @@ public class StreamsPartitionAssignorTest {
}
@Test
- public void
shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForFutureInstances()
{
+ public void
shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins()
{
builder.addSource(null, "source1", null, null, null, "topic1");
- final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
+
+ subscriptions.put(c1,
+ new ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(UUID.randomUUID(), allTasks,
Collections.emptySet(), null).encode(),
+ Arrays.asList(t1p0, t1p1, t1p2))
+ );
+ subscriptions.put(c2,
+ new ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(UUID.randomUUID(),
Collections.emptySet(), Collections.emptySet(), null).encode(),
+ Collections.emptyList())
+ );
- final Set<TaskId> activeTasks = Utils.mkSet(task0, task1);
- final Set<TaskId> standbyTasks = Utils.mkSet(task2);
+ createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder);
+ EasyMock.replay(taskManager);
+ partitionAssignor.configure(configProps());
+
+ final Map<String, ConsumerPartitionAssignor.Assignment> assignment =
partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
+
+ assertThat(assignment.size(), equalTo(2));
+
+ assertThat(assignment.get(c1).partitions(), equalTo(asList(t1p0,
t1p2)));
+ assertThat(
+ AssignmentInfo.decode(assignment.get(c1).userData()),
+ equalTo(new AssignmentInfo(
+ Arrays.asList(task0_0, task0_2),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ )));
+
+ // The new consumer's assignment should be empty until c1 has the
chance to revoke its partitions/tasks
+ assertThat(assignment.get(c2).partitions(),
equalTo(Collections.emptyList()));
+ assertThat(
+ AssignmentInfo.decode(assignment.get(c2).userData()),
+ equalTo(new AssignmentInfo(
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ )));
+ }
+
+ @Test
+ public void
shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
+
+ final Set<TaskId> activeTasks = Utils.mkSet(task0_0, task0_1);
+ final Set<TaskId> standbyTasks = Utils.mkSet(task0_2);
final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new
HashMap<TaskId, Set<TopicPartition>>() {
{
- put(task2, Collections.singleton(t1p2));
+ put(task0_2, Collections.singleton(t1p2));
+ }
+ };
+ final Map<TaskId, Set<TopicPartition>> futureStandbyTaskMap = new
HashMap<TaskId, Set<TopicPartition>>() {
+ {
+ put(task0_0, Collections.singleton(t1p0));
+ put(task0_1, Collections.singleton(t1p1));
}
};
subscriptions.put("consumer1",
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
- new SubscriptionInfo(UUID.randomUUID(), activeTasks,
standbyTasks, null).encode())
+ new SubscriptionInfo(UUID.randomUUID(), activeTasks,
standbyTasks, null).encode(),
+ Arrays.asList(t1p0, t1p1))
);
subscriptions.put("future-consumer",
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
- encodeFutureSubscription())
+ encodeFutureSubscription(),
+ Collections.singletonList(t1p2))
);
createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder);
EasyMock.replay(taskManager);
- partitionAssignor.configure(configProps());
+ final Map<String, Object> props = configProps();
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+ partitionAssignor.configure(props);
final Map<String, ConsumerPartitionAssignor.Assignment> assignment =
partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
assertThat(assignment.size(), equalTo(2));
+
+ assertThat(assignment.get("consumer1").partitions(),
equalTo(asList(t1p0, t1p1)));
assertThat(
AssignmentInfo.decode(assignment.get("consumer1").userData()),
equalTo(new AssignmentInfo(
@@ -1234,10 +1526,64 @@ public class StreamsPartitionAssignorTest {
standbyTaskMap,
Collections.emptyMap()
)));
- assertThat(assignment.get("consumer1").partitions(),
equalTo(asList(t1p0, t1p1)));
-
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()),
equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION,
LATEST_SUPPORTED_VERSION)));
- assertThat(assignment.get("future-consumer").partitions().size(),
equalTo(0));
+
+ assertThat(assignment.get("future-consumer").partitions(),
equalTo(Collections.singletonList(t1p2)));
+ assertThat(
+
AssignmentInfo.decode(assignment.get("future-consumer").userData()),
+ equalTo(new AssignmentInfo(
+ Collections.singletonList(task0_2),
+ futureStandbyTaskMap,
+ Collections.emptyMap()
+ )));
+ }
+
+ @Test
+ public void
shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+
+ final Set<TaskId> allTasks = Utils.mkSet(task0_0, task0_1, task0_2);
+
+ subscriptions.put(c1,
+ new ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ encodeFutureSubscription(),
+ Collections.emptyList())
+ );
+ subscriptions.put(c2,
+ new ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ encodeFutureSubscription(),
+ Collections.emptyList())
+ );
+
+ createMockTaskManager(allTasks, allTasks, UUID.randomUUID(), builder);
+ EasyMock.replay(taskManager);
+ final Map<String, Object> props = configProps();
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+ partitionAssignor.configure(props);
+ final Map<String, ConsumerPartitionAssignor.Assignment> assignment =
partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
+
+ assertThat(assignment.size(), equalTo(2));
+
+ assertThat(assignment.get(c1).partitions(), equalTo(asList(t1p0,
t1p2)));
+ assertThat(
+ AssignmentInfo.decode(assignment.get(c1).userData()),
+ equalTo(new AssignmentInfo(
+ Arrays.asList(task0_0, task0_2),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ )));
+
+
+ assertThat(assignment.get(c2).partitions(),
equalTo(Collections.singletonList(t1p1)));
+ assertThat(
+ AssignmentInfo.decode(assignment.get(c2).userData()),
+ equalTo(new AssignmentInfo(
+ Collections.singletonList(task0_1),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ )));
}
@Test
@@ -1334,4 +1680,21 @@ public class StreamsPartitionAssignorTest {
return info;
}
+
+ private void assertEquivalentAssignment(final Map<String, List<TaskId>>
thisAssignment,
+ final Map<String, List<TaskId>>
otherAssignment) {
+ assertEquals(thisAssignment.size(), otherAssignment.size());
+ for (final Map.Entry<String, List<TaskId>> entry :
thisAssignment.entrySet()) {
+ final String consumer = entry.getKey();
+ assertTrue(otherAssignment.containsKey(consumer));
+
+ final List<TaskId> thisTaskList = entry.getValue();
+ Collections.sort(thisTaskList);
+ final List<TaskId> otherTaskList = otherAssignment.get(consumer);
+ Collections.sort(otherTaskList);
+
+ assertThat(thisTaskList, equalTo(otherTaskList));
+ }
+ }
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7e1ca7f..e46a4cc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -42,7 +42,6 @@ import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -245,7 +244,8 @@ public class TaskManagerTest {
@Test
public void
shouldCloseActiveUnAssignedSuspendedTasksWhenClosingRevokedTasks() {
mockSingleActiveTask();
-
EasyMock.expect(active.closeNotAssignedSuspendedTasks(taskId0Assignment.keySet())).andReturn(null).once();
+
+
expect(active.closeNotAssignedSuspendedTasks(taskId0Assignment.keySet())).andReturn(null).once();
expect(restoreConsumer.assignment()).andReturn(Collections.emptySet());
replay();
@@ -281,6 +281,7 @@ public class TaskManagerTest {
// Need to call this twice so task manager doesn't consider all
partitions "new"
taskManager.setAssignmentMetadata(taskId0Assignment,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.setAssignmentMetadata(taskId0Assignment,
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
taskManager.setPartitionsToTaskId(taskId0PartitionToTaskId);
taskManager.createTasks(taskId0Partitions);
@@ -404,9 +405,7 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewActiveTasks() {
-
EasyMock.expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()).once();
-
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once();
-
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
+ active.initializeNewTasks();
expectLastCall();
replay();
@@ -416,9 +415,7 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewStandbyTasks() {
-
EasyMock.expect(restoreConsumer.assignment()).andReturn(Collections.emptySet()).once();
-
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once();
-
active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
+ standby.initializeNewTasks();
expectLastCall();
replay();
@@ -428,6 +425,7 @@ public class TaskManagerTest {
@Test
public void shouldRestoreStateFromChangeLogReader() {
+ EasyMock.expect(active.hasRestoringTasks()).andReturn(true).once();
EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once();
expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
active.updateRestored(taskId0Partitions);
@@ -440,11 +438,9 @@ public class TaskManagerTest {
@Test
public void shouldResumeRestoredPartitions() {
-
EasyMock.expect(restoreConsumer.assignment()).andReturn(taskId0Partitions).once();
- expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
- expect(active.allTasksRunning()).andReturn(true);
+ expect(active.allTasksRunning()).andReturn(true).once();
expect(consumer.assignment()).andReturn(taskId0Partitions);
-
expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
+ expect(standby.running()).andReturn(Collections.emptySet());
consumer.resume(taskId0Partitions);
expectLastCall();
@@ -666,6 +662,7 @@ public class TaskManagerTest {
}
private void mockAssignStandbyPartitions(final long offset) {
+ expect(active.hasRestoringTasks()).andReturn(true).once();
final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
expect(active.allTasksRunning()).andReturn(true);
expect(standby.running()).andReturn(Collections.singletonList(task));
@@ -679,13 +676,6 @@ public class TaskManagerTest {
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions).once();
}
- private void mockStandbyTaskExpectations() {
- expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[],
byte[]>>anyObject(),
-
EasyMock.eq(taskId0Assignment)))
- .andReturn(Collections.singletonList(standbyTask));
-
- }
-
private void mockSingleActiveTask() {
expect(activeTaskCreator.createTasks(EasyMock.<Consumer<byte[],
byte[]>>anyObject(),
EasyMock.eq(taskId0Assignment)))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index dc54c86..1443edf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -70,8 +70,8 @@ public class ClientStateTest {
final TaskId tid1 = new TaskId(0, 1);
final TaskId tid2 = new TaskId(0, 2);
- client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2));
- assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1,
tid2)));
+ client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2));
+ assertThat(client.prevActiveTasks(), equalTo(Utils.mkSet(tid1, tid2)));
assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1,
tid2)));
}
@@ -80,8 +80,8 @@ public class ClientStateTest {
final TaskId tid1 = new TaskId(0, 1);
final TaskId tid2 = new TaskId(0, 2);
- client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2));
- assertThat(client.previousActiveTasks().size(), equalTo(0));
+ client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2));
+ assertThat(client.prevActiveTasks().size(), equalTo(0));
assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1,
tid2)));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 19d7730..17d403f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -207,11 +207,11 @@ public class StickyTaskAssignorTest {
@Test
public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
final ClientState client1 = createClient(p1, 1);
- client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02));
+ client1.addPreviousStandbyTasks(Utils.mkSet(task02));
final ClientState client2 = createClient(p2, 1);
- client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
+ client2.addPreviousStandbyTasks(Utils.mkSet(task01));
final ClientState client3 = createClient(p3, 1);
- client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00));
+ client3.addPreviousStandbyTasks(Utils.mkSet(task00));
final StickyTaskAssignor taskAssignor = createTaskAssignor(task00,
task01, task02);
@@ -225,9 +225,9 @@ public class StickyTaskAssignorTest {
@Test
public void
shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task00);
- c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
+ c1.addPreviousStandbyTasks(Utils.mkSet(task01));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2,
task02);
- c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
+ c2.addPreviousStandbyTasks(Utils.mkSet(task01));
final StickyTaskAssignor taskAssignor = createTaskAssignor(task00,
task01, task02);
@@ -455,9 +455,9 @@ public class StickyTaskAssignorTest {
@Test
public void
shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task01, task02);
- c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00));
+ c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1,
task03, task00);
- c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02));
+ c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02));
createClient(p3, 1);
createClient(p4, 1);
@@ -577,14 +577,14 @@ public class StickyTaskAssignorTest {
final TaskId task23 = new TaskId(2, 3);
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task01, task12, task13);
- c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11,
task20, task21, task23));
+ c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21,
task23));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1,
task00, task11, task22);
- c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10,
task02, task20, task03, task12, task21, task13, task23));
+ c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20,
task03, task12, task21, task13, task23));
final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1,
task20, task21, task23);
- c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12));
+ c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
final ClientState newClient = createClient(p4, 1);
- newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00,
task10, task01, task02, task11, task20, task03, task12, task21, task13, task22,
task23));
+ newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01,
task02, task11, task20, task03, task12, task21, task13, task22, task23));
final StickyTaskAssignor<Integer> taskAssignor =
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03,
task12, task21, task13, task22, task23);
taskAssignor.assign(0);
@@ -607,15 +607,15 @@ public class StickyTaskAssignorTest {
final TaskId task23 = new TaskId(2, 3);
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task01, task12, task13);
- c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11,
task20, task21, task23));
+ c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21,
task23));
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1,
task00, task11, task22);
- c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10,
task02, task20, task03, task12, task21, task13, task23));
+ c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20,
task03, task12, task21, task13, task23));
final ClientState bounce1 = createClient(p3, 1);
- bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20,
task21, task23));
+ bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
final ClientState bounce2 = createClient(p4, 1);
- bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02,
task03, task10));
+ bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
final StickyTaskAssignor<Integer> taskAssignor =
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03,
task12, task21, task13, task22, task23);
taskAssignor.assign(0);
@@ -658,7 +658,7 @@ public class StickyTaskAssignorTest {
final TaskId task06 = new TaskId(0, 6);
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1,
task00, task01, task02, task06);
final ClientState c2 = createClient(p2, 1);
- c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04,
task05));
+ c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
final ClientState newClient = createClient(p3, 1);
final StickyTaskAssignor<Integer> taskAssignor =
createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
@@ -705,7 +705,7 @@ public class StickyTaskAssignorTest {
private ClientState createClientWithPreviousActiveTasks(final Integer
processId, final int capacity, final TaskId... taskIds) {
final ClientState clientState = new ClientState(capacity);
- clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds));
+ clientState.addPreviousActiveTasks(Utils.mkSet(taskIds));
clients.put(processId, clientState);
return clientState;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 98e6e8f..496f89a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -569,7 +569,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
if (entry.getValue().getLast().value().longValue() !=
expectedCount) {
- resultStream.println("fail: key=" + key + " tagg=" +
entry.getValue() + " expected=" + expected.get(key));
+ resultStream.println("fail: key=" + key + " tagg=" +
entry.getValue() + " expected=" + expectedCount);
resultStream.println("\t outputEvents: " +
entry.getValue());
return false;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 0e07cac..185fa7c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
@@ -61,6 +62,8 @@ import static
org.apache.kafka.streams.processor.internals.assignment.StreamsAss
public class StreamsUpgradeTest {
+ private static final RebalanceProtocol REBALANCE_PROTOCOL =
RebalanceProtocol.COOPERATIVE;
+
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
@@ -123,26 +126,26 @@ public class StreamsUpgradeTest {
// 1. Client UUID (a unique id assigned to an instance of
KafkaStreams)
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state
directory.
-
final TaskManager taskManager = taskManger();
- final Set<TaskId> previousActiveTasks =
taskManager.previousRunningTaskIds();
+
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
- standbyTasks.removeAll(previousActiveTasks);
- final FutureSubscriptionInfo data = new FutureSubscriptionInfo(
+ final Set<TaskId> activeTasks = prepareForSubscription(taskManager,
+ topics,
+
standbyTasks,
+
REBALANCE_PROTOCOL);
+ return new FutureSubscriptionInfo(
usedSubscriptionMetadataVersion,
LATEST_SUPPORTED_VERSION + 1,
taskManager.processId(),
- previousActiveTasks,
+ activeTasks,
standbyTasks,
- userEndPoint());
-
- taskManager.updateSubscriptionsFromMetadata(topics);
-
- return data.encode();
+ userEndPoint())
+ .encode();
}
@Override
- public void onAssignment(final ConsumerPartitionAssignor.Assignment
assignment, final ConsumerGroupMetadata metadata) {
+ public void onAssignment(final ConsumerPartitionAssignor.Assignment
assignment,
+ final ConsumerGroupMetadata metadata) {
try {
super.onAssignment(assignment, metadata);
return;
@@ -193,6 +196,7 @@ public class StreamsUpgradeTest {
taskManager.setPartitionsToTaskId(partitionsToTaskId);
taskManager.setAssignmentMetadata(activeTasks,
info.standbyTasks());
taskManager.updateSubscriptionsFromAssignment(partitions);
+ taskManager.setRebalanceInProgress(false);
}
@Override
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py
b/tests/kafkatest/tests/streams/streams_eos_test.py
index 7e5cc26..428db9b 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -159,7 +159,7 @@ class StreamsEosTest(KafkaTest):
def wait_for_startup(self, monitor, processor):
self.wait_for(monitor, processor, "StateChange: REBALANCING ->
RUNNING")
- self.wait_for(monitor, processor, "processed 500 records from topic")
+ self.wait_for(monitor, processor, "processed [0-9]* records from
topic")
def wait_for(self, monitor, processor, output):
monitor.wait_until(output,