This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b03d67  MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
4b03d67 is described below

commit 4b03d67e106c6d3b9dd465a308a62b400ead70a4
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Sat Mar 7 08:46:38 2020 -0800

    MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
    
    Just a minor refactoring of StreamsPartitionAssignor's endless assign
    method into logical chunks to hopefully improve readability. No logical
    changes, literally just moving code around and adding docs.
    
    The hope is to make it easier to write and review KIP-441 PRs that dig into
    the assignment logic.
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../internals/StreamsPartitionAssignor.java        | 434 ++++++++++++++-------
 1 file changed, 287 insertions(+), 147 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 639509f..e858e86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -153,6 +153,8 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         }
     }
 
+    // keep track of any future consumers in a "dummy" Client since we can't 
decipher their subscription
+    private static final UUID FUTURE_ID = randomUUID();
 
     protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
         
Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
@@ -231,10 +233,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
     }
 
     private Map<String, Assignment> errorAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
-                                                    final String topic,
                                                     final int errorCode) {
-        log.error("{} is unknown yet during rebalance," +
-            " please make sure they have been pre-created before starting the 
Streams application.", topic);
         final Map<String, Assignment> assignment = new HashMap<>();
         for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
             for (final String consumerId : clientMetadata.consumers) {
@@ -275,14 +274,14 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
     @Override
     public GroupAssignment assign(final Cluster metadata, final 
GroupSubscription groupSubscription) {
         final Map<String, Subscription> subscriptions = 
groupSubscription.groupSubscription();
+
+        // ---------------- Step Zero ---------------- //
+
         // construct the client metadata from the decoded subscription info
+
         final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
         final Set<TopicPartition> allOwnedPartitions = new HashSet<>();
 
-        // keep track of any future consumers in a "dummy" Client since we 
can't decipher their subscription
-        final UUID futureId = randomUUID();
-        final ClientMetadata futureClient = new ClientMetadata(null);
-
         int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
@@ -299,9 +298,9 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             final UUID processId;
             if (usedVersion > LATEST_SUPPORTED_VERSION) {
                 futureMetadataVersion = usedVersion;
-                processId = futureId;
-                if (!clientMetadataMap.containsKey(futureId)) {
-                    clientMetadataMap.put(futureId, futureClient);
+                processId = FUTURE_ID;
+                if (!clientMetadataMap.containsKey(FUTURE_ID)) {
+                    clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null));
                 }
             } else {
                 processId = info.processId();
@@ -321,13 +320,107 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             clientMetadata.addPreviousTasks(info);
         }
 
+        final boolean versionProbing =
+            checkMetadataVersions(minReceivedMetadataVersion, 
minSupportedMetadataVersion, futureMetadataVersion);
+
+        log.debug("Constructed client metadata {} from the member 
subscriptions.", clientMetadataMap);
+
+        // ---------------- Step One ---------------- //
+
+        // parse the topology to determine the repartition source topics,
+        // making sure they are created with the number of partitions as
+        // the maximum of the depending sub-topologies source topics' number 
of partitions
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = 
taskManager.builder().topicGroups();
+
+        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
+        try {
+            allRepartitionTopicPartitions = 
prepareRepartitionTopics(topicGroups, metadata);
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap,
+                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        }
+
+        final Cluster fullMetadata = 
metadata.withPartitions(allRepartitionTopicPartitions);
+
+        log.debug("Created repartition topics {} from the parsed topology.", 
allRepartitionTopicPartitions.values());
+
+        // ---------------- Step Two ---------------- //
+
+        // construct the assignment of tasks to clients
+
+        final Set<String> allSourceTopics = new HashSet<>();
+        final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> 
entry : topicGroups.entrySet()) {
+            allSourceTopics.addAll(entry.getValue().sourceTopics);
+            sourceTopicsByGroup.put(entry.getKey(), 
entry.getValue().sourceTopics);
+        }
+
+        // get the tasks as partition groups from the partition grouper
+        final Map<TaskId, Set<TopicPartition>> partitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, 
fullMetadata);
+
+
+        assignTasksToClients(allSourceTopics, partitionsForTask, topicGroups, 
clientMetadataMap, fullMetadata);
+
+        // ---------------- Step Three ---------------- //
+
+        // construct the global partition assignment per host map
+
+        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new 
HashMap<>();
+        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new 
HashMap<>();
+        if (minReceivedMetadataVersion >= 2) {
+            populatePartitionsByHostMaps(partitionsByHost, 
standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
+        }
+        streamsMetadataState.onChange(partitionsByHost, 
standbyPartitionsByHost, fullMetadata);
+
+        // ---------------- Step Four ---------------- //
+
+        // compute the assignment of tasks to threads within each client and 
build the final group assignment
+
+        final Map<String, Assignment> assignment;
+        if (versionProbing) {
+            assignment = versionProbingAssignment(
+                clientMetadataMap,
+                partitionsForTask,
+                partitionsByHost,
+                standbyPartitionsByHost,
+                allOwnedPartitions,
+                minReceivedMetadataVersion,
+                minSupportedMetadataVersion
+            );
+        } else {
+            assignment = computeNewAssignment(
+                clientMetadataMap,
+                partitionsForTask,
+                partitionsByHost,
+                standbyPartitionsByHost,
+                allOwnedPartitions,
+                minReceivedMetadataVersion,
+                minSupportedMetadataVersion
+            );
+        }
+
+        return new GroupAssignment(assignment);
+    }
+
+    /**
+     * Verify the subscription versions are within the expected bounds and 
check for version probing.
+     *
+     * @return whether this was a version probing rebalance
+     */
+    private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
+                                          final int 
minSupportedMetadataVersion,
+                                          final int futureMetadataVersion) {
         final boolean versionProbing;
+
         if (futureMetadataVersion == UNKNOWN) {
             versionProbing = false;
         } else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
             versionProbing = true;
             log.info("Received a future (version probing) subscription 
(version: {})."
-                    + " Sending assignment back (with supported version {}).",
+                         + " Sending assignment back (with supported version 
{}).",
                 futureMetadataVersion,
                 minSupportedMetadataVersion);
 
@@ -349,35 +442,76 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion,
                 LATEST_SUPPORTED_VERSION);
         }
+        return versionProbing;
+    }
 
-        log.debug("Constructed client metadata {} from the member 
subscriptions.", clientMetadataMap);
-
-        // ---------------- Step Zero ---------------- //
-
-        // parse the topology to determine the repartition source topics,
-        // making sure they are created with the number of partitions as
-        // the maximum of the depending sub-topologies source topics' number 
of partitions
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
-            taskManager.builder().topicGroups();
-
+    /**
+     * @return a map of repartition topics and their metadata
+     * @throws TaskAssignmentException if there is incomplete source topic 
metadata due to missing source topic(s)
+     */
+    private Map<String, InternalTopicConfig> 
computeRepartitionTopicMetadata(final Map<Integer, 
InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                                                             
final Cluster metadata) throws TaskAssignmentException {
         final Map<String, InternalTopicConfig> repartitionTopicMetadata = new 
HashMap<>();
         for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
             for (final String topic : topicsInfo.sourceTopics) {
                 if 
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
-                    !metadata.topics().contains(topic)) {
-                    log.error("Missing source topic {} during assignment. 
Returning error {}.",
-                        topic, 
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    return new GroupAssignment(
-                        errorAssignment(clientMetadataMap, topic,
-                            
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-                    );
+                        !metadata.topics().contains(topic)) {
+                    log.error("Source topic {} is missing/unknown during 
rebalance, please make sure all source topics " +
+                                  "have been pre-created before starting the 
Streams application. Returning error {}",
+                                  topic, 
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
+                    throw new TaskAssignmentException("Missing source topic 
during assignment.");
                 }
             }
             for (final InternalTopicConfig topic : 
topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), topic);
             }
         }
+        return repartitionTopicMetadata;
+    }
 
+    /**
+     * Computes and assembles all repartition topic metadata then creates the 
topics if necessary.
+     *
+     * @return map from each repartition topic partition to its partition info
+     */
+    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final 
Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                                                           
final Cluster metadata) {
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata = 
computeRepartitionTopicMetadata(topicGroups, metadata);
+
+        
setRepartitionTopicMetadataNumberOfPartitions(repartitionTopicMetadata, 
topicGroups, metadata);
+
+        // ensure the co-partitioning topics within the group have the same 
number of partitions,
+        // and enforce the number of partitions for those repartition topics 
to be the same if they
+        // are co-partitioned as well.
+        ensureCopartitioning(taskManager.builder().copartitionGroups(), 
repartitionTopicMetadata, metadata);
+
+        // make sure the repartition source topics exist with the right number 
of partitions,
+        // create these topics if necessary
+        prepareTopic(repartitionTopicMetadata);
+
+        // augment the metadata with the newly computed number of partitions 
for all the
+        // repartition source topics
+        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions 
= new HashMap<>();
+        for (final Map.Entry<String, InternalTopicConfig> entry : 
repartitionTopicMetadata.entrySet()) {
+            final String topic = entry.getKey();
+            final int numPartitions = 
entry.getValue().numberOfPartitions().orElse(-1);
+
+            for (int partition = 0; partition < numPartitions; partition++) {
+                allRepartitionTopicPartitions.put(
+                    new TopicPartition(topic, partition),
+                    new PartitionInfo(topic, partition, null, new Node[0], new 
Node[0])
+                );
+            }
+        }
+        return allRepartitionTopicPartitions;
+    }
+
+    /**
+     * Computes the number of partitions and sets it for each repartition 
topic in repartitionTopicMetadata
+     */
+    private void setRepartitionTopicMetadataNumberOfPartitions(final 
Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                                               final 
Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                                               final Cluster 
metadata) {
         boolean numPartitionsNeeded;
         do {
             numPartitionsNeeded = false;
@@ -385,7 +519,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
                 for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
                     final Optional<Integer> maybeNumPartitions = 
repartitionTopicMetadata.get(topicName)
-                        .numberOfPartitions();
+                                                                     
.numberOfPartitions();
                     Integer numPartitions = null;
 
                     if (!maybeNumPartitions.isPresent()) {
@@ -437,53 +571,25 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
                 }
             }
         } while (numPartitionsNeeded);
+    }
 
-        // ensure the co-partitioning topics within the group have the same 
number of partitions,
-        // and enforce the number of partitions for those repartition topics 
to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(taskManager.builder().copartitionGroups(), 
repartitionTopicMetadata, metadata);
-
-        // make sure the repartition source topics exist with the right number 
of partitions,
-        // create these topics if necessary
-        prepareTopic(repartitionTopicMetadata);
-
-        // augment the metadata with the newly computed number of partitions 
for all the
-        // repartition source topics
-        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions 
= new HashMap<>();
-        for (final Map.Entry<String, InternalTopicConfig> entry : 
repartitionTopicMetadata.entrySet()) {
-            final String topic = entry.getKey();
-            final int numPartitions = 
entry.getValue().numberOfPartitions().orElse(-1);
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                allRepartitionTopicPartitions.put(
-                    new TopicPartition(topic, partition),
-                    new PartitionInfo(topic, partition, null, new Node[0], new 
Node[0])
-                );
-            }
-        }
-
-        final Cluster fullMetadata = 
metadata.withPartitions(allRepartitionTopicPartitions);
-
-        log.debug("Created repartition topics {} from the parsed topology.", 
allRepartitionTopicPartitions.values());
-
-        // ---------------- Step One ---------------- //
-
-        // get the tasks as partition groups from the partition grouper
-        final Set<String> allSourceTopics = new HashSet<>();
-        final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> 
entry : topicGroups.entrySet()) {
-            allSourceTopics.addAll(entry.getValue().sourceTopics);
-            sourceTopicsByGroup.put(entry.getKey(), 
entry.getValue().sourceTopics);
-        }
-
-        final Map<TaskId, Set<TopicPartition>> partitionsForTask =
-            partitionGrouper.partitionGroups(sourceTopicsByGroup, 
fullMetadata);
-
-        final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
-
+    /**
+     * Populates the taskForPartition and tasksForTopicGroup maps, and checks 
that partitions are assigned to exactly
+     * one task.
+     *
+     * @param taskForPartition a map from partition to the corresponding task. 
Populated here.
+     * @param tasksForTopicGroup a map from the topicGroupId to the set of 
corresponding tasks. Populated here.
+     * @param allSourceTopics a set of all source topics in the topology
+     * @param partitionsForTask a map from task to the set of input partitions
+     * @param fullMetadata the cluster metadata
+     */
+    private void populateTasksForMaps(final Map<TopicPartition, TaskId> 
taskForPartition,
+                                      final Map<Integer, Set<TaskId>> 
tasksForTopicGroup,
+                                      final Set<String> allSourceTopics,
+                                      final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                      final Cluster fullMetadata) {
         // check if all partitions are assigned, and there are no duplicates 
of partitions in multiple tasks
         final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
-        final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
partitionsForTask.entrySet()) {
             final TaskId id = entry.getKey();
             final Set<TopicPartition> partitions = entry.getValue();
@@ -496,8 +602,17 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             }
             allAssignedPartitions.addAll(partitions);
 
-            tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new 
HashSet<>()).add(id);
+            tasksForTopicGroup.computeIfAbsent(id.topicGroupId, k -> new 
HashSet<>()).add(id);
         }
+
+        checkAllPartitions(allSourceTopics, partitionsForTask, 
allAssignedPartitions, fullMetadata);
+    }
+
+    // Logs a warning if any partitions are not assigned to a task, or a task 
has no assigned partitions
+    private void checkAllPartitions(final Set<String> allSourceTopics,
+                                    final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                    final Set<TopicPartition> 
allAssignedPartitions,
+                                    final Cluster fullMetadata) {
         for (final String topic : allSourceTopics) {
             final List<PartitionInfo> partitionInfoList = 
fullMetadata.partitionsForTopic(topic);
             if (partitionInfoList.isEmpty()) {
@@ -508,17 +623,24 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
                         partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
                         log.warn("Partition {} is not assigned to any tasks: 
{}"
-                                + " Possible causes of a partition not getting 
assigned"
-                                + " is that another topic defined in the 
topology has not been"
-                                + " created when starting your streams 
application,"
-                                + " resulting in no tasks created for this 
topology at all.", partition,
+                                     + " Possible causes of a partition not 
getting assigned"
+                                     + " is that another topic defined in the 
topology has not been"
+                                     + " created when starting your streams 
application,"
+                                     + " resulting in no tasks created for 
this topology at all.", partition,
                             partitionsForTask);
                     }
                 }
             }
         }
+    }
 
-        // We only create a standby for tasks that are stateful and have at 
least one changelog
+    /**
+     * Resolve changelog topic metadata and create them if necessary.
+     *
+     * @return set of standby task ids (any task that is stateful and has 
logging enabled)
+     */
+    private Set<TaskId> prepareChangelogTopics(final Map<Integer, 
InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                               final Map<Integer, Set<TaskId>> 
tasksForTopicGroup) {
         final Set<TaskId> standbyTaskIds = new HashSet<>();
 
         // add tasks to state change log topic subscribers
@@ -527,7 +649,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             final int topicGroupId = entry.getKey();
             final InternalTopologyBuilder.TopicsInfo topicsInfo = 
entry.getValue();
 
-            final Set<TaskId> topicGroupTasks = 
tasksByTopicGroup.get(topicGroupId);
+            final Set<TaskId> topicGroupTasks = 
tasksForTopicGroup.get(topicGroupId);
             if (topicGroupTasks == null) {
                 log.debug("No tasks found for topic group {}", topicGroupId);
                 continue;
@@ -551,10 +673,23 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         }
 
         prepareTopic(changelogTopicMetadata);
-
         log.debug("Created state changelog topics {} from the parsed 
topology.", changelogTopicMetadata.values());
+        return standbyTaskIds;
+    }
 
-        // ---------------- Step Two ---------------- //
+    /**
+     * Assigns a set of tasks to each client (Streams instance) using the 
sticky assignor
+     */
+    private void assignTasksToClients(final Set<String> allSourceTopics,
+                                      final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                      final Map<Integer, 
InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                      final Map<UUID, ClientMetadata> 
clientMetadataMap,
+                                      final Cluster fullMetadata) {
+        final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
+        final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
+        populateTasksForMaps(taskForPartition, tasksForTopicGroup, 
allSourceTopics, partitionsForTask, fullMetadata);
+
+        final Set<TaskId> standbyTaskIds = prepareChangelogTopics(topicGroups, 
tasksForTopicGroup);
 
         final Map<UUID, ClientState> states = new HashMap<>();
         for (final Map.Entry<UUID, ClientMetadata> entry : 
clientMetadataMap.entrySet()) {
@@ -565,7 +700,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             // there are two cases where we need to construct the prevTasks 
from the ownedPartitions:
             // 1) COOPERATIVE clients on version 2.4-2.5 do not encode active 
tasks and rely on ownedPartitions instead
             // 2) future client during version probing, when we can't decode 
the future subscription info's prev tasks
-            if (!state.ownedPartitions().isEmpty() && (uuid == futureId || 
state.prevActiveTasks().isEmpty())) {
+            if (!state.ownedPartitions().isEmpty() && (uuid == FUTURE_ID || 
state.prevActiveTasks().isEmpty())) {
                 final Set<TaskId> previousActiveTasks = new HashSet<>();
                 for (final Map.Entry<TopicPartition, String> partitionEntry : 
state.ownedPartitions().entrySet()) {
                     final TopicPartition tp = partitionEntry.getKey();
@@ -589,64 +724,49 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         taskAssignor.assign(numStandbyReplicas);
 
         log.info("Assigned tasks to clients as {}{}.", Utils.NL, 
states.entrySet().stream()
-            .map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
-
-        // ---------------- Step Three ---------------- //
-
-        // construct the global partition assignment per host map
-        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new 
HashMap<>();
-        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new 
HashMap<>();
-        if (minReceivedMetadataVersion >= 2) {
-            for (final Map.Entry<UUID, ClientMetadata> entry : 
clientMetadataMap.entrySet()) {
-                final HostInfo hostInfo = entry.getValue().hostInfo;
+                                                                     
.map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
+    }
 
-                // if application server is configured, also include host 
state map
-                if (hostInfo != null) {
-                    final Set<TopicPartition> topicPartitions = new 
HashSet<>();
-                    final Set<TopicPartition> standbyPartitions = new 
HashSet<>();
-                    final ClientState state = entry.getValue().state;
+    /**
+     * Populates the global partitionsByHost and standbyPartitionsByHost maps 
that are sent to each member
+     *
+     * @param partitionsByHost a map from host to the set of partitions hosted 
there. Populated here.
+     * @param standbyPartitionsByHost a map from host to the set of standby 
partitions hosted there. Populated here.
+     * @param partitionsForTask a map from task to its set of assigned 
partitions
+     * @param clientMetadataMap a map from client to its metadata and state
+     */
+    private void populatePartitionsByHostMaps(final Map<HostInfo, 
Set<TopicPartition>> partitionsByHost,
+                                              final Map<HostInfo, 
Set<TopicPartition>> standbyPartitionsByHost,
+                                              final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
+                                              final Map<UUID, ClientMetadata> 
clientMetadataMap) {
+        for (final Map.Entry<UUID, ClientMetadata> entry : 
clientMetadataMap.entrySet()) {
+            final HostInfo hostInfo = entry.getValue().hostInfo;
 
-                    for (final TaskId id : state.activeTasks()) {
-                        topicPartitions.addAll(partitionsForTask.get(id));
-                    }
+            // if application server is configured, also include host state map
+            if (hostInfo != null) {
+                final Set<TopicPartition> topicPartitions = new HashSet<>();
+                final Set<TopicPartition> standbyPartitions = new HashSet<>();
+                final ClientState state = entry.getValue().state;
 
-                    for (final TaskId id : state.standbyTasks()) {
-                        standbyPartitions.addAll(partitionsForTask.get(id));
-                    }
+                for (final TaskId id : state.activeTasks()) {
+                    topicPartitions.addAll(partitionsForTask.get(id));
+                }
 
-                    partitionsByHost.put(hostInfo, topicPartitions);
-                    standbyPartitionsByHost.put(hostInfo, standbyPartitions);
+                for (final TaskId id : state.standbyTasks()) {
+                    standbyPartitions.addAll(partitionsForTask.get(id));
                 }
-            }
-        }
-        streamsMetadataState.onChange(partitionsByHost, 
standbyPartitionsByHost, fullMetadata);
 
-        final Map<String, Assignment> assignment;
-        if (versionProbing) {
-            assignment = versionProbingAssignment(
-                clientMetadataMap,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                minReceivedMetadataVersion,
-                minSupportedMetadataVersion
-            );
-        } else {
-            assignment = computeNewAssignment(
-                clientMetadataMap,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                minReceivedMetadataVersion,
-                minSupportedMetadataVersion
-            );
+                partitionsByHost.put(hostInfo, topicPartitions);
+                standbyPartitionsByHost.put(hostInfo, standbyPartitions);
+            }
         }
-
-        return new GroupAssignment(assignment);
     }
 
+    /**
+     * Computes the assignment of tasks to threads within each client and 
assembles the final assignment to send out.
+     *
+     * @return the final assignment for each StreamThread consumer
+     */
     private Map<String, Assignment> computeNewAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
                                                          final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
                                                          final Map<HostInfo, 
Set<TopicPartition>> partitionsByHostState,
@@ -695,6 +815,13 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         return assignment;
     }
 
+    /**
+     * Computes the assignment of tasks to threads within each client and 
assembles the final assignment to send out,
+     * in the special case of version probing where some members are on 
different versions and have sent different
+     * subscriptions.
+     *
+     * @return the final assignment for each StreamThread consumer
+     */
     private Map<String, Assignment> versionProbingAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
                                                              final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
                                                              final 
Map<HostInfo, Set<TopicPartition>> partitionsByHost,
@@ -732,6 +859,9 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         return assignment;
     }
 
+    /**
+     * Adds the encoded assignment for each StreamThread consumer in the 
client to the overall assignment map
+     */
     private void addClientAssignments(final Map<String, Assignment> assignment,
                                       final ClientMetadata clientMetadata,
                                       final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
@@ -747,17 +877,19 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         for (final String consumer : clientMetadata.consumers) {
             final List<TaskId> activeTasksForConsumer = 
activeTaskAssignments.get(consumer);
 
-            // These will be filled in by buildAssignedActiveTaskAndPartitionsList below
+            // These will be filled in by populateActiveTaskAndPartitionsLists below
             final List<TopicPartition> activePartitionsList = new 
ArrayList<>();
             final List<TaskId> assignedActiveList = new ArrayList<>();
 
-            buildAssignedActiveTaskAndPartitionsList(consumer,
-                                                     clientMetadata.state,
-                                                     activeTasksForConsumer,
-                                                     partitionsForTask,
-                                                     allOwnedPartitions,
-                                                     activePartitionsList,
-                                                     assignedActiveList);
+            populateActiveTaskAndPartitionsLists(
+                activePartitionsList,
+                assignedActiveList,
+                consumer,
+                clientMetadata.state,
+                activeTasksForConsumer,
+                partitionsForTask,
+                allOwnedPartitions
+            );
 
             final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
                 buildStandbyTaskMap(standbyTaskAssignments.get(consumer), 
partitionsForTask);
@@ -781,13 +913,18 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         }
     }
 
-    private void buildAssignedActiveTaskAndPartitionsList(final String 
consumer,
-                                                          final ClientState 
clientState,
-                                                          final List<TaskId> 
activeTasksForConsumer,
-                                                          final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
-                                                          final 
Set<TopicPartition> allOwnedPartitions,
-                                                          final 
List<TopicPartition> activePartitionsList,
-                                                          final List<TaskId> 
assignedActiveList) {
+    /**
+     * Populates the lists of active tasks and active task partitions for the consumer with a 1:1 mapping between them
+     * such that the nth task corresponds to the nth partition in the list. This means tasks with multiple partitions
+     * will be repeated in the list.
+     */
+    private void populateActiveTaskAndPartitionsLists(final 
List<TopicPartition> activePartitionsList,
+                                                      final List<TaskId> 
assignedActiveList,
+                                                      final String consumer,
+                                                      final ClientState 
clientState,
+                                                      final List<TaskId> 
activeTasksForConsumer,
+                                                      final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
+                                                      final 
Set<TopicPartition> allOwnedPartitions) {
         final List<AssignedPartition> assignedPartitions = new ArrayList<>();
 
         // Build up list of all assigned partition-task pairs
@@ -822,6 +959,9 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         }
     }
 
+    /**
+     * @return map from task id to its assigned partitions for all standby tasks
+     */
     private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final 
Collection<TaskId> standbys,
                                                                         final 
Map<TaskId, Set<TopicPartition>> partitionsForTask) {
         final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new 
HashMap<>();

Reply via email to