This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e9f41ae MINOR: StreamsPartitionAssignor should log the individual
members of each client (#10996)
e9f41ae is described below
commit e9f41ae5905d11e9820d7146423e04c9e639a280
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jul 8 11:11:39 2021 -0700
MINOR: StreamsPartitionAssignor should log the individual members of each
client (#10996)
Log the specific StreamThreads participating in the rebalance for each
client in the Streams application
Reviewers: Walker Carlson <[email protected]>, John Roesler
<[email protected]>
---
.../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../src/main/java/org/apache/kafka/streams/ThreadMetadata.java | 4 +++-
.../streams/processor/internals/StreamsPartitionAssignor.java | 10 ++++++++--
.../streams/processor/internals/assignment/ClientState.java | 4 ++++
4 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 451f036..557a2b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -105,7 +105,7 @@ import static
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndO
* sends output to zero, one, or more output topics.
* <p>
* The computational logic can be specified either by using the {@link
Topology} to define a DAG topology of
- * {@link Processor}s or by using the {@link StreamsBuilder} which provides
the high-level DSL to define
+ * {@link org.apache.kafka.streams.processor.api.Processor}s or by using the
{@link StreamsBuilder} which provides the high-level DSL to define
* transformations.
* <p>
* One {@code KafkaStreams} instance can contain one or more threads specified
in the configs for the processing work.
diff --git a/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
index 4b84070..f611fe7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
@@ -103,7 +103,9 @@ public interface ThreadMetadata {
* mainConsumerClientId,
* restoreConsumerClientId,
* producerClientIds,
- * adminClientId);
+ * adminClientId
+ * );
+ * }
* </pre>
*
* @return a hash code value for this object.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 8345142..2767429 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -578,11 +578,16 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final boolean lagComputationSuccessful =
populateClientStatesMap(clientStates, clientMetadataMap,
taskForPartition, changelogTopics);
+ log.info("All members participating in this rebalance: \n{}.",
+ clientStates.entrySet().stream()
+ .map(entry -> entry.getKey() + ": " +
entry.getValue().consumers())
+ .collect(Collectors.joining(Utils.NL)));
+
final Set<TaskId> allTasks = partitionsForTask.keySet();
statefulTasks.addAll(changelogTopics.statefulTaskIds());
- log.debug("Assigning tasks {} to clients {} with number of replicas
{}",
- allTasks, clientStates, numStandbyReplicas());
+ log.debug("Assigning tasks {} including stateful {} to clients {} with
number of replicas {}",
+ allTasks, statefulTasks, clientStates, numStandbyReplicas());
final TaskAssignor taskAssignor =
createTaskAssignor(lagComputationSuccessful);
@@ -661,6 +666,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
state.computeTaskLags(uuid, allTaskEndOffsetSums);
clientStates.put(uuid, state);
}
+
return fetchEndOffsetsSuccessful;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 0029e73..c28d779 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -360,6 +360,10 @@ public class ClientState {
}
}
+ public String consumers() {
+ return consumerToPreviousStatefulTaskIds.keySet().toString();
+ }
+
public String currentAssignment() {
return "[activeTasks: (" + assignedActiveTasks.taskIds() +
") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";