This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95e1cdc4efb HOTFIX: avoid placement of unnecessary transient standby tasks & improve assignor logging (#14149)
95e1cdc4efb is described below

commit 95e1cdc4efbc720687cefad5bacd053565d03614
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Aug 30 13:29:38 2023 -0700

    HOTFIX: avoid placement of unnecessary transient standby tasks & improve assignor logging (#14149)
    
    Minor fix to avoid creating unnecessary standby tasks, especially when these may be surprising or unexpected as in the case of an application with num.standby.replicas = 0 and warmup replicas disabled.
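    
    For illustration, a minimal sketch of such a configuration (the application id,
    bootstrap servers, and class wrapper below are placeholders added to keep the
    snippet self-contained; the keys are the standard StreamsConfig constants).
    Warmup replicas are effectively disabled here by making every client count as
    caught-up via an unbounded acceptable recovery lag:
    
        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;
    
        public class NoStandbyNoWarmupConfig {
            public static Properties config() {
                final Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
                // No standby replicas requested
                props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
                // With an unbounded acceptable recovery lag, every client is considered
                // caught-up, so the assignor never needs to place warmup replicas
                props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.MAX_VALUE);
                return props;
            }
        }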
    
    The "bug" here was introduced during the fix for an issue with cooperative 
rebalancing and in-memory stores. The fundamental problem is that in-memory 
stores cannot be unassigned from a consumer for any period, however temporary, 
without being closed and losing all the accumulated state. This caused some 
grief when the new HA task assignor would assign an active task to a node based 
on the readiness of the standby version of that task, but would have to remove 
the active task from the [...]
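    
    For readers unfamiliar with that constraint, a minimal sketch of the kind of
    topology affected (topic, store, and class names are placeholders): a count
    backed by an in-memory store, whose contents survive only as long as the task
    stays assigned to the same consumer:
    
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.state.Stores;
    
        public class InMemoryCountTopology {
            public static StreamsBuilder build() {
                final StreamsBuilder builder = new StreamsBuilder();
                builder.stream("input-topic")  // placeholder topic
                       .groupByKey()
                       // In-memory store: closing the task discards this state entirely,
                       // unlike a persistent store that can be reopened from local disk
                       .count(Materialized.<Object, Long>as(Stores.inMemoryKeyValueStore("counts"))
                                  .withValueSerde(Serdes.Long()));
                return builder;
            }
        }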
    
    To fix this, we simply began to place standby tasks on the intended recipient of an active task awaiting revocation by another consumer. However, the fix was a bit of an overreach, as we assigned these temporary standby tasks in all cases, regardless of whether there had previously been a standby version of that task. We can narrow this down without sacrificing any of the intended functionality by only assigning this kind of standby task where the consumer had previously owned some ve [...]
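    
    In code terms, a sketch of the narrowed condition (the wrapper class and helper
    name below are illustrative only; the real check lives inline in
    StreamsPartitionAssignor, using the ClientState#previouslyOwnedStandby method
    added by this commit, as shown in the diff below):
    
        import java.util.Set;
        import org.apache.kafka.streams.processor.TaskId;
        import org.apache.kafka.streams.processor.internals.assignment.ClientState;
    
        public class TemporaryStandbyCheck {
            // Old check: allStatefulTasks.contains(task) alone, which placed a standby
            // even on a consumer that had no prior state for the task at all
            public static boolean shouldPlaceTemporaryStandby(final ClientState clientState,
                                                              final TaskId task,
                                                              final Set<TaskId> allStatefulTasks) {
                return clientState.previouslyOwnedStandby(task) && allStatefulTasks.contains(task);
            }
        }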
    
    Also breaks up some of the long log lines in the StreamsPartitionAssignor and expands the summary info while moving it all to the front of the line (following reports of missing info due to truncation of long log lines in larger applications)
---
 .../internals/StreamsPartitionAssignor.java        | 52 +++++++++++--------
 .../internals/assignment/ClientState.java          |  6 ++-
 .../internals/StreamsPartitionAssignorTest.java    | 60 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 21 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 330d46a1780..e232ef15df0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -625,18 +625,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final boolean lagComputationSuccessful =
             populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics);
 
-        log.info("{} members participating in this rebalance: \n{}.",
-                clientStates.size(),
-                clientStates.entrySet().stream()
-                        .sorted(comparingByKey())
-                        .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
-                        .collect(Collectors.joining(Utils.NL)));
+
+        log.info("{} client nodes and {} consumers participating in this 
rebalance: \n{}.",
+                 clientStates.size(),
+                 
clientStates.values().stream().map(ClientState::capacity).reduce(Integer::sum),
+                 clientStates.entrySet().stream()
+                     .sorted(comparingByKey())
+                     .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
+                     .collect(Collectors.joining(Utils.NL)));
 
         final Set<TaskId> allTasks = partitionsForTask.keySet();
         statefulTasks.addAll(changelogTopics.statefulTaskIds());
 
-        log.debug("Assigning tasks {} including stateful {} to clients {} with 
number of replicas {}",
-            allTasks, statefulTasks, clientStates, numStandbyReplicas());
+        log.info("Assigning stateful tasks: {}\n"
+                     + "and stateless tasks: {}",
+                 statefulTasks,
+                 allTasks.stream().filter(t -> !statefulTasks.contains(t)));
+        log.debug("Assigning tasks and {} standby replicas to client nodes {}",
+                  numStandbyReplicas(), clientStates);
 
         final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
 
@@ -656,15 +662,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                                                    rackAwareTaskAssignor,
                                                                    assignmentConfigs);
 
-        log.info("{} assigned tasks {} including stateful {} to {} clients as: 
\n{}.",
-                allTasks.size(),
-                allTasks,
-                statefulTasks,
-                clientStates.size(),
-                clientStates.entrySet().stream()
-                        .sorted(comparingByKey())
-                        .map(entry -> entry.getKey() + "=" + 
entry.getValue().currentAssignment())
-                        .collect(Collectors.joining(Utils.NL)));
+        // Break this up into multiple logs to make sure the summary info gets 
through, which helps avoid
+        // info loss for example due to long line truncation with large apps
+        log.info("Assigned {} total tasks including {} stateful tasks to {} 
client nodes.",
+                 allTasks.size(),
+                 statefulTasks.size(),
+                 clientStates.size());
+        log.info("Assignment of tasks to nodes: {}",
+                 clientStates.entrySet().stream()
+                     .sorted(comparingByKey())
+                     .map(entry -> entry.getKey() + "=" + 
entry.getValue().currentAssignment())
+                     .collect(Collectors.joining(Utils.NL)));
 
         return probingRebalanceNeeded;
     }
@@ -1081,9 +1089,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
 
         for (final TaskId task : revokedTasks) {
-            if (allStatefulTasks.contains(task)) {
-                log.info("Adding removed stateful active task {} as a standby for {} before it is revoked in followup rebalance",
-                        task, consumer);
+            // If this task is stateful and already owned by the consumer, but can't (yet) be assigned as an active
+            // task during this rebalance as it must be revoked from another consumer first, place a temporary
+            // standby task here until it can receive the active task to avoid closing the state store (and losing
+            // all of the accumulated state in the case of in-memory stores)
+            if (clientState.previouslyOwnedStandby(task) && allStatefulTasks.contains(task)) {
+                log.info("Adding removed stateful active task {} as a standby for {} until it is revoked and can "
+                             + "be transitioned to active in a followup rebalance", task, consumer);
 
                 // This has no effect on the assignment, as we'll never consult the ClientState again, but
                 // it does perform a useful assertion that it's legal to assign this task as a standby to this instance
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 71fc1c3ef00..a7fcd514cfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -124,7 +124,7 @@ public class ClientState {
         );
     }
 
-    int capacity() {
+    public int capacity() {
         return capacity;
     }
 
@@ -269,6 +269,10 @@ public class ClientState {
         );
     }
 
+    public boolean previouslyOwnedStandby(final TaskId task) {
+        return previousStandbyTasks.taskIds().contains(task);
+    }
+
     public int assignedTaskCount() {
         return activeTaskCount() + standbyTaskCount();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 36ec4bb3fe2..d4742cc6644 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -748,6 +748,66 @@ public class StreamsPartitionAssignorTest {
         assertEquals(expectedInfo11TaskIds, info11.activeTasks());
     }
 
+    @Test
+    public void shouldNotAssignTemporaryStandbyTask() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+
+        final List<PartitionInfo> localInfos = asList(
+            new PartitionInfo("topic1", 0, NODE_0, REPLICA_0, REPLICA_0),
+            new PartitionInfo("topic1", 1, NODE_1, REPLICA_1, REPLICA_1),
+            new PartitionInfo("topic1", 2, NODE_2, REPLICA_2, REPLICA_2),
+            new PartitionInfo("topic1", 3, NODE_0, REPLICA_1, REPLICA_2)
+        );
+
+        final Cluster localMetadata = new Cluster(
+            "cluster",
+            asList(NODE_0, NODE_1, NODE_2),
+            localInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final List<String> topics = singletonList("topic1");
+
+        createMockTaskManager(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3), emptySet());
+        configureDefaultPartitionAssignor();
+
+        subscriptions.put("consumer10",
+                          new Subscription(
+                              topics,
+                              getInfo(UUID_1, mkSet(TASK_0_0, TASK_0_2), emptySet()).encode(),
+                              asList(t1p0, t1p2),
+                              DEFAULT_GENERATION,
+                              Optional.of(RACK_2)
+                          ));
+        subscriptions.put("consumer11",
+                          new Subscription(
+                              topics,
+                              getInfo(UUID_1, mkSet(TASK_0_1, TASK_0_3), emptySet()).encode(),
+                              asList(t1p1, t1p3),
+                              DEFAULT_GENERATION,
+                              Optional.of(RACK_2)
+                          ));
+        subscriptions.put("consumer20",
+                          new Subscription(
+                              topics,
+                              getInfo(UUID_2, emptySet(), emptySet()).encode(),
+                              emptyList(),
+                              DEFAULT_GENERATION,
+                              Optional.of(RACK_2)
+                          ));
+
+        final Map<String, Assignment> assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // neither active nor standby tasks should be assigned to consumer20, which will have to wait until
+        // the followup cooperative rebalance to get the active task(s) it was assigned (and does not need
+        // a standby copy before that since it previously had no tasks at all)
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        assertTrue(info20.activeTasks().isEmpty());
+        assertTrue(info20.standbyTasks().isEmpty());
+
+    }
+
     @Test
     public void testAssignEmptyMetadata() {
         builder.addSource(null, "source1", null, null, null, "topic1");
