This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0eb621959e7c4826c74d96d0604fe7c8f4d52b32
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jan 19 15:25:57 2026 +0100

    MINOR: Improve streams groups log messages (#21311)
    
    Minor logging improvements for streams groups.
    
    - In InternalTopicManager, we'd not log the group ID or member ID,
    making the logs difficult discover.
    - Some logs could turn out very verbose
      - We'd log the whole topology. This is difficult to parse from a log
    message, and can be determined by using `StreamsGroupDescribe` instead.
      - We'd log all missing internal topics. Instead, restrict it to the
    first 5.
    - The statusDetail returned by `StreamsGroupHeartbeat` (and logged on
    the client) would also become fairly large, logging all topic creation
    errors for all internal topics that we failed to create. Restrict it to
    the first 3.
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  8 ++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  4 +--
 .../coordinator/group/GroupMetadataManager.java    |  7 ++--
 .../group/streams/topics/InternalTopicManager.java | 39 +++++++++++++++++-----
 .../group/GroupMetadataManagerTest.java            |  2 +-
 .../group/GroupMetadataManagerTestContext.java     |  3 +-
 .../streams/topics/InternalTopicManagerTest.java   |  6 ++--
 7 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c3667d00154..2a0604dc301 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2836,8 +2836,14 @@ class KafkaApis(val requestChannel: RequestChannel,
                 if (cachedErrors.nonEmpty) {
                   val missingInternalTopicStatus =
                     responseData.status().stream().filter(x => x.statusCode() 
== 
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
-                  val creationErrorDetails = cachedErrors.map { case (topic, 
error) => s"$topic ($error)" }.mkString(", ")
                   if (missingInternalTopicStatus.isPresent) {
+                    val maxErrorsToInclude = 3
+                    val errorList = cachedErrors.take(maxErrorsToInclude).map 
{ case (topic, error) => s"$topic ($error)" }.mkString(", ")
+                    val creationErrorDetails = if (cachedErrors.size > 
maxErrorsToInclude) {
+                      s"$errorList and ${cachedErrors.size - 
maxErrorsToInclude} more"
+                    } else {
+                      errorList
+                    }
                     val existingDetail = 
Option(missingInternalTopicStatus.get().statusDetail()).getOrElse("")
                     missingInternalTopicStatus.get().setStatusDetail(
                       existingDetail + s"; Creation failed: 
$creationErrorDetails."
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0c6310b5e1d..d7cbceaf48d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3849,7 +3849,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
         .setStatusDetail("Assignment delayed due to the configured initial 
rebalance delay."),
         new StreamsGroupHeartbeatResponseData.Status()
         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
-        .setStatusDetail("Internal topics are missing: [topic]; Unauthorized 
to CREATE on topics topic.")),
+        .setStatusDetail("Internal topics are missing: topic; Unauthorized to 
CREATE on topics topic.")),
     response.data().status())
   }
 
@@ -3883,7 +3883,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
         .setStatusDetail("Assignment delayed due to the configured initial 
rebalance delay."),
         new StreamsGroupHeartbeatResponseData.Status()
         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
-        .setStatusDetail("Internal topics are missing: [topic]")),
+        .setStatusDetail("Internal topics are missing: topic")),
       response.data().status())
   }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 022c1991360..2e0651c199f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2006,8 +2006,9 @@ public class GroupMetadataManager {
             }
 
             if (reconfigureTopology || group.configuredTopology().isEmpty()) {
-                log.info("[GroupId {}][MemberId {}] Configuring the topology 
{}", groupId, memberId, updatedTopology);
-                updatedConfiguredTopology = 
InternalTopicManager.configureTopics(logContext, metadataHash, updatedTopology, 
metadataImage);
+                log.info("[GroupId {}][MemberId {}] Configuring the topology 
{}", groupId, memberId, updatedTopology.topologyEpoch());
+                LogContext topicManagerLogContext = new 
LogContext(String.format("%s[GroupId %s][MemberId %s] ", 
logContext.logPrefix(), groupId, memberId));
+                updatedConfiguredTopology = 
InternalTopicManager.configureTopics(topicManagerLogContext, metadataHash, 
updatedTopology, metadataImage, time);
                 group.setConfiguredTopology(updatedConfiguredTopology);
             } else {
                 updatedConfiguredTopology = group.configuredTopology().get();
@@ -3649,7 +3650,7 @@ public class GroupMetadataManager {
         String memberId = updatedMember.memberId();
         if (!updatedMember.equals(member)) {
             records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
-            log.info("[GroupId {}] Member {} updated its member metdata to 
{}.",
+            log.info("[GroupId {}][MemberId {}] Member updated its member 
metdata to {}.",
                 groupId, memberId, updatedMember);
 
             return true;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 5c89c622a24..cb92c14d720 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
@@ -45,6 +46,7 @@ import java.util.stream.Stream;
  */
 public class InternalTopicManager {
 
+
     /**
      * Configures the internal topics for the given topology. Given a topology 
and the metadata image, this method determines the number of
      * partitions for all internal topics and returns a {@link 
ConfiguredTopology} object.
@@ -58,8 +60,10 @@ public class InternalTopicManager {
     public static ConfiguredTopology configureTopics(LogContext logContext,
                                                      long metadataHash,
                                                      StreamsTopology topology,
-                                                     CoordinatorMetadataImage 
metadataImage) {
+                                                     CoordinatorMetadataImage 
metadataImage,
+                                                     Time time) {
         final Logger log = logContext.logger(InternalTopicManager.class);
+        final long startTimeMs = time.milliseconds();
         final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies 
= topology.subtopologies().values();
 
         final Map<String, Collection<Set<String>>> 
copartitionGroupsBySubtopology =
@@ -89,14 +93,16 @@ public class InternalTopicManager {
                     ));
 
             Map<String, CreatableTopic> internalTopicsToCreate = 
missingInternalTopics(configuredSubtopologies, topology, metadataImage);
+            long elapsedMs = time.milliseconds() - startTimeMs;
             if (!internalTopicsToCreate.isEmpty()) {
                 topicConfigurationException = 
Optional.of(TopicConfigurationException.missingInternalTopics(
-                    "Internal topics are missing: " + 
internalTopicsToCreate.keySet()
+                    "Internal topics are missing: " + 
summarizeTopics(internalTopicsToCreate.keySet())
                 ));
-                log.info("Valid topic configuration found, but internal topics 
are missing for topology epoch {}: {}",
-                    topology.topologyEpoch(), 
topicConfigurationException.get().toString());
+                log.info("Valid topic configuration found in {}ms, but 
internal topics are missing for topology epoch {}: {}",
+                    elapsedMs, topology.topologyEpoch(), 
summarizeTopics(internalTopicsToCreate.keySet()));
             } else {
-                log.info("Valid topic configuration found, topology epoch {} 
is now initialized.", topology.topologyEpoch());
+                log.info("Valid topic configuration found in {}ms, topology 
epoch {} is now initialized.",
+                    elapsedMs, topology.topologyEpoch());
             }
 
             return new ConfiguredTopology(
@@ -108,8 +114,9 @@ public class InternalTopicManager {
             );
 
         } catch (TopicConfigurationException e) {
-            log.warn("Topic configuration failed for topology epoch {}: {} ",
-                topology.topologyEpoch(), e.toString());
+            long elapsedMs = time.milliseconds() - startTimeMs;
+            log.warn("Topic configuration failed for topology epoch {} in 
{}ms: {}",
+                topology.topologyEpoch(), elapsedMs, e.getMessage());
             return new ConfiguredTopology(
                 topology.topologyEpoch(),
                 metadataHash,
@@ -132,7 +139,7 @@ public class InternalTopicManager {
         }
         if (!sortedMissingTopics.isEmpty()) {
             throw TopicConfigurationException.missingSourceTopics(
-                "Source topics " + String.join(", ", sortedMissingTopics) + " 
are missing.");
+                "Source topics " + summarizeTopics(sortedMissingTopics) + " 
are missing.");
         }
     }
 
@@ -322,4 +329,20 @@ public class InternalTopicManager {
             ).collect(Collectors.toSet())
         ).toList();
     }
+
+    /**
+     * Formats a collection of topic names for log and exception messages.
+     * Includes up to 3 topic names, and if more are present, appends a 
summary.
+     */
+    private static String summarizeTopics(Collection<String> topics) {
+        if (topics == null || topics.isEmpty()) {
+            return "<none>";
+        }
+        int maxToShow = 3;
+        int size = topics.size();
+        return topics.stream()
+            .limit(maxToShow)
+            .collect(Collectors.joining(", ")) +
+            (size > maxToShow ? " and " + (size - maxToShow) + " additional 
topics" : "");
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index da84f0b397d..03a109817ec 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -17315,7 +17315,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
-                    .setStatusDetail("Internal topics are missing: [bar]"))),
+                    .setStatusDetail("Internal topics are missing: bar"))),
             result.response().data()
         );
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index f1379e2404a..b52d3edb3fe 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -563,7 +563,8 @@ public class GroupMetadataManagerTestContext {
                         new LogContext(),
                         0,
                         group.topology().get(),
-                        metadataImage)
+                        metadataImage,
+                        time)
                     );
                 }
             });
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index 8c7c389e562..ed13f0ef94a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
@@ -43,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class InternalTopicManagerTest {
 
+    public static final MockTime TIME = new MockTime();
     public static final String SOURCE_TOPIC_1 = "source_topic1";
     public static final String SOURCE_TOPIC_2 = "source_topic2";
     public static final String REPARTITION_TOPIC = "repartition_topic";
@@ -61,7 +63,7 @@ class InternalTopicManagerTest {
         // SOURCE_TOPIC_2 is missing from topicMetadata
         StreamsTopology topology = makeTestTopology();
 
-        final ConfiguredTopology configuredTopology = 
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new 
KRaftCoordinatorMetadataImage(metadataImage));
+        final ConfiguredTopology configuredTopology = 
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new 
KRaftCoordinatorMetadataImage(metadataImage), TIME);
 
         assertEquals(Optional.empty(), configuredTopology.subtopologies());
         
assertTrue(configuredTopology.topicConfigurationException().isPresent());
@@ -78,7 +80,7 @@ class InternalTopicManagerTest {
             .build();
         StreamsTopology topology = makeTestTopology();
 
-        ConfiguredTopology configuredTopology = 
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new 
KRaftCoordinatorMetadataImage(metadataImage));
+        ConfiguredTopology configuredTopology = 
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new 
KRaftCoordinatorMetadataImage(metadataImage), TIME);
         final Map<String, CreatableTopic> internalTopicsToBeCreated = 
configuredTopology.internalTopicsToBeCreated();
 
         assertEquals(2, internalTopicsToBeCreated.size());

Reply via email to