This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e9f41ae  MINOR: StreamsPartitionAssignor should log the individual 
members of each client (#10996)
e9f41ae is described below

commit e9f41ae5905d11e9820d7146423e04c9e639a280
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jul 8 11:11:39 2021 -0700

    MINOR: StreamsPartitionAssignor should log the individual members of each 
client (#10996)
    
    Log the specific StreamThreads participating in the rebalance for each 
client in the Streams application
    
    Reviewers: Walker Carlson <[email protected]>, John Roesler 
<[email protected]>
---
 .../src/main/java/org/apache/kafka/streams/KafkaStreams.java   |  2 +-
 .../src/main/java/org/apache/kafka/streams/ThreadMetadata.java |  4 +++-
 .../streams/processor/internals/StreamsPartitionAssignor.java  | 10 ++++++++--
 .../streams/processor/internals/assignment/ClientState.java    |  4 ++++
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 451f036..557a2b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -105,7 +105,7 @@ import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndO
  * sends output to zero, one, or more output topics.
  * <p>
  * The computational logic can be specified either by using the {@link 
Topology} to define a DAG topology of
- * {@link Processor}s or by using the {@link StreamsBuilder} which provides 
the high-level DSL to define
+ * {@link org.apache.kafka.streams.processor.api.Processor}s or by using the 
{@link StreamsBuilder} which provides the high-level DSL to define
  * transformations.
  * <p>
  * One {@code KafkaStreams} instance can contain one or more threads specified 
in the configs for the processing work.
diff --git a/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java 
b/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
index 4b84070..f611fe7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
@@ -103,7 +103,9 @@ public interface ThreadMetadata {
      *             mainConsumerClientId,
      *             restoreConsumerClientId,
      *             producerClientIds,
-     *             adminClientId);
+     *             adminClientId
+     *             );
+     * }
      * </pre>
      *
      * @return a hash code value for this object.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 8345142..2767429 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -578,11 +578,16 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         final boolean lagComputationSuccessful =
             populateClientStatesMap(clientStates, clientMetadataMap, 
taskForPartition, changelogTopics);
 
+        log.info("All members participating in this rebalance: \n{}.",
+                 clientStates.entrySet().stream()
+                     .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
+                     .collect(Collectors.joining(Utils.NL)));
+
         final Set<TaskId> allTasks = partitionsForTask.keySet();
         statefulTasks.addAll(changelogTopics.statefulTaskIds());
 
-        log.debug("Assigning tasks {} to clients {} with number of replicas 
{}",
-            allTasks, clientStates, numStandbyReplicas());
+        log.debug("Assigning tasks {} including stateful {} to clients {} with 
number of replicas {}",
+            allTasks, statefulTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor = 
createTaskAssignor(lagComputationSuccessful);
 
@@ -661,6 +666,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             state.computeTaskLags(uuid, allTaskEndOffsetSums);
             clientStates.put(uuid, state);
         }
+
         return fetchEndOffsetsSuccessful;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 0029e73..c28d779 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -360,6 +360,10 @@ public class ClientState {
         }
     }
 
+    public String consumers() {
+        return consumerToPreviousStatefulTaskIds.keySet().toString();
+    }
+
     public String currentAssignment() {
         return "[activeTasks: (" + assignedActiveTasks.taskIds() +
                ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";

Reply via email to