This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4b03d67 MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
4b03d67 is described below
commit 4b03d67e106c6d3b9dd465a308a62b400ead70a4
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Sat Mar 7 08:46:38 2020 -0800
MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
Just a minor refactoring of StreamsPartitionAssignor's endless assign
method into logical chunks to hopefully improve readability. No logical
changes, literally just moving code around and adding docs.
The hope is to make it easier to write and review KIP-441 PRs that dig into
the assignment logic.
Reviewers: Guozhang Wang <[email protected]>
---
.../internals/StreamsPartitionAssignor.java | 434 ++++++++++++++-------
1 file changed, 287 insertions(+), 147 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 639509f..e858e86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -153,6 +153,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
}
+ // keep track of any future consumers in a "dummy" Client since we can't
decipher their subscription
+ private static final UUID FUTURE_ID = randomUUID();
protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
@@ -231,10 +233,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
private Map<String, Assignment> errorAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
- final String topic,
final int errorCode) {
- log.error("{} is unknown yet during rebalance," +
- " please make sure they have been pre-created before starting the
Streams application.", topic);
final Map<String, Assignment> assignment = new HashMap<>();
for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
for (final String consumerId : clientMetadata.consumers) {
@@ -275,14 +274,14 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
@Override
public GroupAssignment assign(final Cluster metadata, final
GroupSubscription groupSubscription) {
final Map<String, Subscription> subscriptions =
groupSubscription.groupSubscription();
+
+ // ---------------- Step Zero ---------------- //
+
// construct the client metadata from the decoded subscription info
+
final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
final Set<TopicPartition> allOwnedPartitions = new HashSet<>();
- // keep track of any future consumers in a "dummy" Client since we
can't decipher their subscription
- final UUID futureId = randomUUID();
- final ClientMetadata futureClient = new ClientMetadata(null);
-
int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -299,9 +298,9 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final UUID processId;
if (usedVersion > LATEST_SUPPORTED_VERSION) {
futureMetadataVersion = usedVersion;
- processId = futureId;
- if (!clientMetadataMap.containsKey(futureId)) {
- clientMetadataMap.put(futureId, futureClient);
+ processId = FUTURE_ID;
+ if (!clientMetadataMap.containsKey(FUTURE_ID)) {
+ clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null));
}
} else {
processId = info.processId();
@@ -321,13 +320,107 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
clientMetadata.addPreviousTasks(info);
}
+ final boolean versionProbing =
+ checkMetadataVersions(minReceivedMetadataVersion,
minSupportedMetadataVersion, futureMetadataVersion);
+
+ log.debug("Constructed client metadata {} from the member
subscriptions.", clientMetadataMap);
+
+ // ---------------- Step One ---------------- //
+
+ // parse the topology to determine the repartition source topics,
+ // making sure they are created with the number of partitions as
+ // the maximum of the depending sub-topologies source topics' number
of partitions
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
taskManager.builder().topicGroups();
+
+ final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
+ try {
+ allRepartitionTopicPartitions =
prepareRepartitionTopics(topicGroups, metadata);
+ } catch (final TaskAssignmentException e) {
+ return new GroupAssignment(
+ errorAssignment(clientMetadataMap,
+ AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+ );
+ }
+
+ final Cluster fullMetadata =
metadata.withPartitions(allRepartitionTopicPartitions);
+
+ log.debug("Created repartition topics {} from the parsed topology.",
allRepartitionTopicPartitions.values());
+
+ // ---------------- Step Two ---------------- //
+
+ // construct the assignment of tasks to clients
+
+ final Set<String> allSourceTopics = new HashSet<>();
+ final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+ for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo>
entry : topicGroups.entrySet()) {
+ allSourceTopics.addAll(entry.getValue().sourceTopics);
+ sourceTopicsByGroup.put(entry.getKey(),
entry.getValue().sourceTopics);
+ }
+
+ // get the tasks as partition groups from the partition grouper
+ final Map<TaskId, Set<TopicPartition>> partitionsForTask =
+ partitionGrouper.partitionGroups(sourceTopicsByGroup,
fullMetadata);
+
+
+ assignTasksToClients(allSourceTopics, partitionsForTask, topicGroups,
clientMetadataMap, fullMetadata);
+
+ // ---------------- Step Three ---------------- //
+
+ // construct the global partition assignment per host map
+
+ final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new
HashMap<>();
+ final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new
HashMap<>();
+ if (minReceivedMetadataVersion >= 2) {
+ populatePartitionsByHostMaps(partitionsByHost,
standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
+ }
+ streamsMetadataState.onChange(partitionsByHost,
standbyPartitionsByHost, fullMetadata);
+
+ // ---------------- Step Four ---------------- //
+
+ // compute the assignment of tasks to threads within each client and
build the final group assignment
+
+ final Map<String, Assignment> assignment;
+ if (versionProbing) {
+ assignment = versionProbingAssignment(
+ clientMetadataMap,
+ partitionsForTask,
+ partitionsByHost,
+ standbyPartitionsByHost,
+ allOwnedPartitions,
+ minReceivedMetadataVersion,
+ minSupportedMetadataVersion
+ );
+ } else {
+ assignment = computeNewAssignment(
+ clientMetadataMap,
+ partitionsForTask,
+ partitionsByHost,
+ standbyPartitionsByHost,
+ allOwnedPartitions,
+ minReceivedMetadataVersion,
+ minSupportedMetadataVersion
+ );
+ }
+
+ return new GroupAssignment(assignment);
+ }
+
+ /**
+ * Verify the subscription versions are within the expected bounds and
check for version probing.
+ *
+ * @return whether this was a version probing rebalance
+ */
+ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
+ final int
minSupportedMetadataVersion,
+ final int futureMetadataVersion) {
final boolean versionProbing;
+
if (futureMetadataVersion == UNKNOWN) {
versionProbing = false;
} else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
versionProbing = true;
log.info("Received a future (version probing) subscription
(version: {})."
- + " Sending assignment back (with supported version {}).",
+ + " Sending assignment back (with supported version
{}).",
futureMetadataVersion,
minSupportedMetadataVersion);
@@ -349,35 +442,76 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
minSupportedMetadataVersion,
LATEST_SUPPORTED_VERSION);
}
+ return versionProbing;
+ }
- log.debug("Constructed client metadata {} from the member
subscriptions.", clientMetadataMap);
-
- // ---------------- Step Zero ---------------- //
-
- // parse the topology to determine the repartition source topics,
- // making sure they are created with the number of partitions as
- // the maximum of the depending sub-topologies source topics' number
of partitions
- final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
- taskManager.builder().topicGroups();
-
+ /**
+ * @return a map of repartition topics and their metadata
+ * @throws TaskAssignmentException if there is incomplete source topic
metadata due to missing source topic(s)
+ */
+ private Map<String, InternalTopicConfig>
computeRepartitionTopicMetadata(final Map<Integer,
InternalTopologyBuilder.TopicsInfo> topicGroups,
+
final Cluster metadata) throws TaskAssignmentException {
final Map<String, InternalTopicConfig> repartitionTopicMetadata = new
HashMap<>();
for (final InternalTopologyBuilder.TopicsInfo topicsInfo :
topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
if
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
- !metadata.topics().contains(topic)) {
- log.error("Missing source topic {} during assignment.
Returning error {}.",
- topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
- return new GroupAssignment(
- errorAssignment(clientMetadataMap, topic,
-
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
- );
+ !metadata.topics().contains(topic)) {
+ log.error("Source topic {} is missing/unknown during
rebalance, please make sure all source topics " +
+ "have been pre-created before starting the
Streams application. Returning error {}",
+ topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
+ throw new TaskAssignmentException("Missing source topic
during assignment.");
}
}
for (final InternalTopicConfig topic :
topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), topic);
}
}
+ return repartitionTopicMetadata;
+ }
+ /**
+ * Computes and assembles all repartition topic metadata then creates the
topics if necessary.
+ *
+ * @return map from repartition topic to its partition info
+ */
+ private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final
Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+
final Cluster metadata) {
+ final Map<String, InternalTopicConfig> repartitionTopicMetadata =
computeRepartitionTopicMetadata(topicGroups, metadata);
+
+
setRepartitionTopicMetadataNumberOfPartitions(repartitionTopicMetadata,
topicGroups, metadata);
+
+ // ensure the co-partitioning topics within the group have the same
number of partitions,
+ // and enforce the number of partitions for those repartition topics
to be the same if they
+ // are co-partitioned as well.
+ ensureCopartitioning(taskManager.builder().copartitionGroups(),
repartitionTopicMetadata, metadata);
+
+ // make sure the repartition source topics exist with the right number
of partitions,
+ // create these topics if necessary
+ prepareTopic(repartitionTopicMetadata);
+
+ // augment the metadata with the newly computed number of partitions
for all the
+ // repartition source topics
+ final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions
= new HashMap<>();
+ for (final Map.Entry<String, InternalTopicConfig> entry :
repartitionTopicMetadata.entrySet()) {
+ final String topic = entry.getKey();
+ final int numPartitions =
entry.getValue().numberOfPartitions().orElse(-1);
+
+ for (int partition = 0; partition < numPartitions; partition++) {
+ allRepartitionTopicPartitions.put(
+ new TopicPartition(topic, partition),
+ new PartitionInfo(topic, partition, null, new Node[0], new
Node[0])
+ );
+ }
+ }
+ return allRepartitionTopicPartitions;
+ }
+
+ /**
+ * Computes the number of partitions and sets it for each repartition
topic in repartitionTopicMetadata
+ */
+ private void setRepartitionTopicMetadataNumberOfPartitions(final
Map<String, InternalTopicConfig> repartitionTopicMetadata,
+ final
Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+ final Cluster
metadata) {
boolean numPartitionsNeeded;
do {
numPartitionsNeeded = false;
@@ -385,7 +519,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
for (final InternalTopologyBuilder.TopicsInfo topicsInfo :
topicGroups.values()) {
for (final String topicName :
topicsInfo.repartitionSourceTopics.keySet()) {
final Optional<Integer> maybeNumPartitions =
repartitionTopicMetadata.get(topicName)
- .numberOfPartitions();
+
.numberOfPartitions();
Integer numPartitions = null;
if (!maybeNumPartitions.isPresent()) {
@@ -437,53 +571,25 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
}
} while (numPartitionsNeeded);
+ }
- // ensure the co-partitioning topics within the group have the same
number of partitions,
- // and enforce the number of partitions for those repartition topics
to be the same if they
- // are co-partitioned as well.
- ensureCopartitioning(taskManager.builder().copartitionGroups(),
repartitionTopicMetadata, metadata);
-
- // make sure the repartition source topics exist with the right number
of partitions,
- // create these topics if necessary
- prepareTopic(repartitionTopicMetadata);
-
- // augment the metadata with the newly computed number of partitions
for all the
- // repartition source topics
- final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions
= new HashMap<>();
- for (final Map.Entry<String, InternalTopicConfig> entry :
repartitionTopicMetadata.entrySet()) {
- final String topic = entry.getKey();
- final int numPartitions =
entry.getValue().numberOfPartitions().orElse(-1);
-
- for (int partition = 0; partition < numPartitions; partition++) {
- allRepartitionTopicPartitions.put(
- new TopicPartition(topic, partition),
- new PartitionInfo(topic, partition, null, new Node[0], new
Node[0])
- );
- }
- }
-
- final Cluster fullMetadata =
metadata.withPartitions(allRepartitionTopicPartitions);
-
- log.debug("Created repartition topics {} from the parsed topology.",
allRepartitionTopicPartitions.values());
-
- // ---------------- Step One ---------------- //
-
- // get the tasks as partition groups from the partition grouper
- final Set<String> allSourceTopics = new HashSet<>();
- final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
- for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo>
entry : topicGroups.entrySet()) {
- allSourceTopics.addAll(entry.getValue().sourceTopics);
- sourceTopicsByGroup.put(entry.getKey(),
entry.getValue().sourceTopics);
- }
-
- final Map<TaskId, Set<TopicPartition>> partitionsForTask =
- partitionGrouper.partitionGroups(sourceTopicsByGroup,
fullMetadata);
-
- final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
-
+ /**
+ * Populates the taskForPartition and tasksForTopicGroup maps, and checks
that partitions are assigned to exactly
+ * one task.
+ *
+ * @param taskForPartition a map from partition to the corresponding task.
Populated here.
+ * @param tasksForTopicGroup a map from the topicGroupId to the set of
corresponding tasks. Populated here.
+ * @param allSourceTopics a set of all source topics in the topology
+ * @param partitionsForTask a map from task to the set of input partitions
+ * @param fullMetadata the cluster metadata
+ */
+ private void populateTasksForMaps(final Map<TopicPartition, TaskId>
taskForPartition,
+ final Map<Integer, Set<TaskId>>
tasksForTopicGroup,
+ final Set<String> allSourceTopics,
+ final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
+ final Cluster fullMetadata) {
// check if all partitions are assigned, and there are no duplicates
of partitions in multiple tasks
final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
- final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
partitionsForTask.entrySet()) {
final TaskId id = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
@@ -496,8 +602,17 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
allAssignedPartitions.addAll(partitions);
- tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new
HashSet<>()).add(id);
+ tasksForTopicGroup.computeIfAbsent(id.topicGroupId, k -> new
HashSet<>()).add(id);
}
+
+ checkAllPartitions(allSourceTopics, partitionsForTask,
allAssignedPartitions, fullMetadata);
+ }
+
+ // Logs a warning if any partitions are not assigned to a task, or a task
has no assigned partitions
+ private void checkAllPartitions(final Set<String> allSourceTopics,
+ final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
+ final Set<TopicPartition>
allAssignedPartitions,
+ final Cluster fullMetadata) {
for (final String topic : allSourceTopics) {
final List<PartitionInfo> partitionInfoList =
fullMetadata.partitionsForTopic(topic);
if (partitionInfoList.isEmpty()) {
@@ -508,17 +623,24 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks:
{}"
- + " Possible causes of a partition not getting
assigned"
- + " is that another topic defined in the
topology has not been"
- + " created when starting your streams
application,"
- + " resulting in no tasks created for this
topology at all.", partition,
+ + " Possible causes of a partition not
getting assigned"
+ + " is that another topic defined in the
topology has not been"
+ + " created when starting your streams
application,"
+ + " resulting in no tasks created for
this topology at all.", partition,
partitionsForTask);
}
}
}
}
+ }
- // We only create a standby for tasks that are stateful and have at
least one changelog
+ /**
+ * Resolve changelog topic metadata and create them if necessary.
+ *
+ * @return set of standby task ids (any task that is stateful and has
logging enabled)
+ */
+ private Set<TaskId> prepareChangelogTopics(final Map<Integer,
InternalTopologyBuilder.TopicsInfo> topicGroups,
+ final Map<Integer, Set<TaskId>>
tasksForTopicGroup) {
final Set<TaskId> standbyTaskIds = new HashSet<>();
// add tasks to state change log topic subscribers
@@ -527,7 +649,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final int topicGroupId = entry.getKey();
final InternalTopologyBuilder.TopicsInfo topicsInfo =
entry.getValue();
- final Set<TaskId> topicGroupTasks =
tasksByTopicGroup.get(topicGroupId);
+ final Set<TaskId> topicGroupTasks =
tasksForTopicGroup.get(topicGroupId);
if (topicGroupTasks == null) {
log.debug("No tasks found for topic group {}", topicGroupId);
continue;
@@ -551,10 +673,23 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
prepareTopic(changelogTopicMetadata);
-
log.debug("Created state changelog topics {} from the parsed
topology.", changelogTopicMetadata.values());
+ return standbyTaskIds;
+ }
- // ---------------- Step Two ---------------- //
+ /**
+ * Assigns a set of tasks to each client (Streams instance) using the
sticky assignor
+ */
+ private void assignTasksToClients(final Set<String> allSourceTopics,
+ final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
+ final Map<Integer,
InternalTopologyBuilder.TopicsInfo> topicGroups,
+ final Map<UUID, ClientMetadata>
clientMetadataMap,
+ final Cluster fullMetadata) {
+ final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
+ final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
+ populateTasksForMaps(taskForPartition, tasksForTopicGroup,
allSourceTopics, partitionsForTask, fullMetadata);
+
+ final Set<TaskId> standbyTaskIds = prepareChangelogTopics(topicGroups,
tasksForTopicGroup);
final Map<UUID, ClientState> states = new HashMap<>();
for (final Map.Entry<UUID, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
@@ -565,7 +700,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
// there are two cases where we need to construct the prevTasks
from the ownedPartitions:
// 1) COOPERATIVE clients on version 2.4-2.5 do not encode active
tasks and rely on ownedPartitions instead
// 2) future client during version probing, when we can't decode
the future subscription info's prev tasks
- if (!state.ownedPartitions().isEmpty() && (uuid == futureId ||
state.prevActiveTasks().isEmpty())) {
+ if (!state.ownedPartitions().isEmpty() && (uuid == FUTURE_ID ||
state.prevActiveTasks().isEmpty())) {
final Set<TaskId> previousActiveTasks = new HashSet<>();
for (final Map.Entry<TopicPartition, String> partitionEntry :
state.ownedPartitions().entrySet()) {
final TopicPartition tp = partitionEntry.getKey();
@@ -589,64 +724,49 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
taskAssignor.assign(numStandbyReplicas);
log.info("Assigned tasks to clients as {}{}.", Utils.NL,
states.entrySet().stream()
- .map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
-
- // ---------------- Step Three ---------------- //
-
- // construct the global partition assignment per host map
- final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new
HashMap<>();
- final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new
HashMap<>();
- if (minReceivedMetadataVersion >= 2) {
- for (final Map.Entry<UUID, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
- final HostInfo hostInfo = entry.getValue().hostInfo;
+
.map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
+ }
- // if application server is configured, also include host
state map
- if (hostInfo != null) {
- final Set<TopicPartition> topicPartitions = new
HashSet<>();
- final Set<TopicPartition> standbyPartitions = new
HashSet<>();
- final ClientState state = entry.getValue().state;
+ /**
+ * Populates the global partitionsByHost and standbyPartitionsByHost maps
that are sent to each member
+ *
+ * @param partitionsByHost a map from host to the set of partitions hosted
there. Populated here.
+ * @param standbyPartitionsByHost a map from host to the set of standby
partitions hosted there. Populated here.
+ * @param partitionsForTask a map from task to its set of assigned
partitions
+ * @param clientMetadataMap a map from client to its metadata and state
+ */
+ private void populatePartitionsByHostMaps(final Map<HostInfo,
Set<TopicPartition>> partitionsByHost,
+ final Map<HostInfo,
Set<TopicPartition>> standbyPartitionsByHost,
+ final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
+ final Map<UUID, ClientMetadata>
clientMetadataMap) {
+ for (final Map.Entry<UUID, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
+ final HostInfo hostInfo = entry.getValue().hostInfo;
- for (final TaskId id : state.activeTasks()) {
- topicPartitions.addAll(partitionsForTask.get(id));
- }
+ // if application server is configured, also include host state map
+ if (hostInfo != null) {
+ final Set<TopicPartition> topicPartitions = new HashSet<>();
+ final Set<TopicPartition> standbyPartitions = new HashSet<>();
+ final ClientState state = entry.getValue().state;
- for (final TaskId id : state.standbyTasks()) {
- standbyPartitions.addAll(partitionsForTask.get(id));
- }
+ for (final TaskId id : state.activeTasks()) {
+ topicPartitions.addAll(partitionsForTask.get(id));
+ }
- partitionsByHost.put(hostInfo, topicPartitions);
- standbyPartitionsByHost.put(hostInfo, standbyPartitions);
+ for (final TaskId id : state.standbyTasks()) {
+ standbyPartitions.addAll(partitionsForTask.get(id));
}
- }
- }
- streamsMetadataState.onChange(partitionsByHost,
standbyPartitionsByHost, fullMetadata);
- final Map<String, Assignment> assignment;
- if (versionProbing) {
- assignment = versionProbingAssignment(
- clientMetadataMap,
- partitionsForTask,
- partitionsByHost,
- standbyPartitionsByHost,
- allOwnedPartitions,
- minReceivedMetadataVersion,
- minSupportedMetadataVersion
- );
- } else {
- assignment = computeNewAssignment(
- clientMetadataMap,
- partitionsForTask,
- partitionsByHost,
- standbyPartitionsByHost,
- allOwnedPartitions,
- minReceivedMetadataVersion,
- minSupportedMetadataVersion
- );
+ partitionsByHost.put(hostInfo, topicPartitions);
+ standbyPartitionsByHost.put(hostInfo, standbyPartitions);
+ }
}
-
- return new GroupAssignment(assignment);
}
+ /**
+ * Computes the assignment of tasks to threads within each client and
assembles the final assignment to send out.
+ *
+ * @return the final assignment for each StreamThread consumer
+ */
private Map<String, Assignment> computeNewAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
final Map<HostInfo,
Set<TopicPartition>> partitionsByHostState,
@@ -695,6 +815,13 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
return assignment;
}
+ /**
+ * Computes the assignment of tasks to threads within each client and
assembles the final assignment to send out,
+ * in the special case of version probing where some members are on
different versions and have sent different
+ * subscriptions.
+ *
+ * @return the final assignment for each StreamThread consumer
+ */
private Map<String, Assignment> versionProbingAssignment(final Map<UUID,
ClientMetadata> clientsMetadata,
final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
final
Map<HostInfo, Set<TopicPartition>> partitionsByHost,
@@ -732,6 +859,9 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
return assignment;
}
+ /**
+ * Adds the encoded assignment for each StreamThread consumer in the
client to the overall assignment map
+ */
private void addClientAssignments(final Map<String, Assignment> assignment,
final ClientMetadata clientMetadata,
final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
@@ -747,17 +877,19 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
for (final String consumer : clientMetadata.consumers) {
final List<TaskId> activeTasksForConsumer =
activeTaskAssignments.get(consumer);
- // These will be filled in by
buildAssignedActiveTaskAndPartitionsList below
+ // These will be filled in by populateActiveTaskAndPartitionsLists
below
final List<TopicPartition> activePartitionsList = new
ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
- buildAssignedActiveTaskAndPartitionsList(consumer,
- clientMetadata.state,
- activeTasksForConsumer,
- partitionsForTask,
- allOwnedPartitions,
- activePartitionsList,
- assignedActiveList);
+ populateActiveTaskAndPartitionsLists(
+ activePartitionsList,
+ assignedActiveList,
+ consumer,
+ clientMetadata.state,
+ activeTasksForConsumer,
+ partitionsForTask,
+ allOwnedPartitions
+ );
final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
buildStandbyTaskMap(standbyTaskAssignments.get(consumer),
partitionsForTask);
@@ -781,13 +913,18 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
}
- private void buildAssignedActiveTaskAndPartitionsList(final String
consumer,
- final ClientState
clientState,
- final List<TaskId>
activeTasksForConsumer,
- final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
- final
Set<TopicPartition> allOwnedPartitions,
- final
List<TopicPartition> activePartitionsList,
- final List<TaskId>
assignedActiveList) {
+ /**
+ * Populates the lists of active tasks and active task partitions for the
consumer with a 1:1 mapping between them
+ * such that the nth task corresponds to the nth partition in the list.
This means tasks with multiple partitions
+ * will be repeated in the list.
+ */
+ private void populateActiveTaskAndPartitionsLists(final
List<TopicPartition> activePartitionsList,
+ final List<TaskId>
assignedActiveList,
+ final String consumer,
+ final ClientState
clientState,
+ final List<TaskId>
activeTasksForConsumer,
+ final Map<TaskId,
Set<TopicPartition>> partitionsForTask,
+ final
Set<TopicPartition> allOwnedPartitions) {
final List<AssignedPartition> assignedPartitions = new ArrayList<>();
// Build up list of all assigned partition-task pairs
@@ -822,6 +959,9 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
}
+ /**
+ * @return map from task id to its assigned partitions for all standby
tasks
+ */
private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final
Collection<TaskId> standbys,
final
Map<TaskId, Set<TopicPartition>> partitionsForTask) {
final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new
HashMap<>();