This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0eb621959e7c4826c74d96d0604fe7c8f4d52b32 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Jan 19 15:25:57 2026 +0100 MINOR: Improve streams groups log messages (#21311) Minor logging improvements for streams groups. - In InternalTopicManager, we'd not log the group ID or member ID, making the logs difficult discover. - Some logs could turn out very verbose - We'd log the whole topology. This is difficult to parse from a log message, and can be determined by using `StreamsGroupDescribe` instead. - We'd log all missing internal topics. Instead, restrict it to the first 5. - The statusDetail returned by `StreamsGroupHeartbeat` (and logged on the client) would also become fairly large, logging all topic creation errors for all internal topics that we failed to create. Restrict it to the first 3. --- core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++- .../kafka/api/AuthorizerIntegrationTest.scala | 4 +-- .../coordinator/group/GroupMetadataManager.java | 7 ++-- .../group/streams/topics/InternalTopicManager.java | 39 +++++++++++++++++----- .../group/GroupMetadataManagerTest.java | 2 +- .../group/GroupMetadataManagerTestContext.java | 3 +- .../streams/topics/InternalTopicManagerTest.java | 6 ++-- 7 files changed, 51 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c3667d00154..2a0604dc301 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2836,8 +2836,14 @@ class KafkaApis(val requestChannel: RequestChannel, if (cachedErrors.nonEmpty) { val missingInternalTopicStatus = responseData.status().stream().filter(x => x.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst() - val creationErrorDetails = cachedErrors.map { case (topic, error) => s"$topic ($error)" }.mkString(", ") if (missingInternalTopicStatus.isPresent) { + val maxErrorsToInclude = 3 + val errorList = cachedErrors.take(maxErrorsToInclude).map { case (topic, error) => s"$topic ($error)" }.mkString(", ") + val creationErrorDetails = if (cachedErrors.size > maxErrorsToInclude) { + s"$errorList and ${cachedErrors.size - maxErrorsToInclude} more" + } else { + errorList + } val existingDetail = Option(missingInternalTopicStatus.get().statusDetail()).getOrElse("") missingInternalTopicStatus.get().setStatusDetail( existingDetail + s"; Creation failed: $creationErrorDetails." diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0c6310b5e1d..d7cbceaf48d 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -3849,7 +3849,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { .setStatusDetail("Assignment delayed due to the configured initial rebalance delay."), new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) - .setStatusDetail("Internal topics are missing: [topic]; Unauthorized to CREATE on topics topic.")), + .setStatusDetail("Internal topics are missing: topic; Unauthorized to CREATE on topics topic.")), response.data().status()) } @@ -3883,7 +3883,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { .setStatusDetail("Assignment delayed due to the configured initial rebalance delay."), new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) - .setStatusDetail("Internal topics are missing: [topic]")), + .setStatusDetail("Internal topics are missing: topic")), response.data().status()) } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 022c1991360..2e0651c199f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2006,8 +2006,9 @@ public class GroupMetadataManager { } if (reconfigureTopology || group.configuredTopology().isEmpty()) { - log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology); - updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, metadataHash, updatedTopology, metadataImage); + log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology.topologyEpoch()); + LogContext topicManagerLogContext = new LogContext(String.format("%s[GroupId %s][MemberId %s] ", logContext.logPrefix(), groupId, memberId)); + updatedConfiguredTopology = InternalTopicManager.configureTopics(topicManagerLogContext, metadataHash, updatedTopology, metadataImage, time); group.setConfiguredTopology(updatedConfiguredTopology); } else { updatedConfiguredTopology = group.configuredTopology().get(); @@ -3649,7 +3650,7 @@ public class GroupMetadataManager { String memberId = updatedMember.memberId(); if (!updatedMember.equals(member)) { records.add(newStreamsGroupMemberRecord(groupId, updatedMember)); - log.info("[GroupId {}] Member {} updated its member metdata to {}.", + log.info("[GroupId {}][MemberId {}] Member updated its member metdata to {}.", groupId, memberId, updatedMember); return true; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java index 5c89c622a24..cb92c14d720 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.streams.StreamsTopology; @@ -45,6 +46,7 @@ import java.util.stream.Stream; */ public class InternalTopicManager { + /** * Configures the internal topics for the given topology. Given a topology and the metadata image, this method determines the number of * partitions for all internal topics and returns a {@link ConfiguredTopology} object. @@ -58,8 +60,10 @@ public class InternalTopicManager { public static ConfiguredTopology configureTopics(LogContext logContext, long metadataHash, StreamsTopology topology, - CoordinatorMetadataImage metadataImage) { + CoordinatorMetadataImage metadataImage, + Time time) { final Logger log = logContext.logger(InternalTopicManager.class); + final long startTimeMs = time.milliseconds(); final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies = topology.subtopologies().values(); final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology = @@ -89,14 +93,16 @@ public class InternalTopicManager { )); Map<String, CreatableTopic> internalTopicsToCreate = missingInternalTopics(configuredSubtopologies, topology, metadataImage); + long elapsedMs = time.milliseconds() - startTimeMs; if (!internalTopicsToCreate.isEmpty()) { topicConfigurationException = Optional.of(TopicConfigurationException.missingInternalTopics( - "Internal topics are missing: " + internalTopicsToCreate.keySet() + "Internal topics are missing: " + summarizeTopics(internalTopicsToCreate.keySet()) )); - log.info("Valid topic configuration found, but internal topics are missing for topology epoch {}: {}", - topology.topologyEpoch(), topicConfigurationException.get().toString()); + log.info("Valid topic configuration found in {}ms, but internal topics are missing for topology epoch {}: {}", + elapsedMs, topology.topologyEpoch(), summarizeTopics(internalTopicsToCreate.keySet())); } else { - log.info("Valid topic configuration found, topology epoch {} is now initialized.", topology.topologyEpoch()); + log.info("Valid topic configuration found in {}ms, topology epoch {} is now initialized.", + elapsedMs, topology.topologyEpoch()); } return new ConfiguredTopology( @@ -108,8 +114,9 @@ public class InternalTopicManager { ); } catch (TopicConfigurationException e) { - log.warn("Topic configuration failed for topology epoch {}: {} ", - topology.topologyEpoch(), e.toString()); + long elapsedMs = time.milliseconds() - startTimeMs; + log.warn("Topic configuration failed for topology epoch {} in {}ms: {}", + topology.topologyEpoch(), elapsedMs, e.getMessage()); return new ConfiguredTopology( topology.topologyEpoch(), metadataHash, @@ -132,7 +139,7 @@ public class InternalTopicManager { } if (!sortedMissingTopics.isEmpty()) { throw TopicConfigurationException.missingSourceTopics( - "Source topics " + String.join(", ", sortedMissingTopics) + " are missing."); + "Source topics " + summarizeTopics(sortedMissingTopics) + " are missing."); } } @@ -322,4 +329,20 @@ public class InternalTopicManager { ).collect(Collectors.toSet()) ).toList(); } + + /** + * Formats a collection of topic names for log and exception messages. + * Includes up to 3 topic names, and if more are present, appends a summary. + */ + private static String summarizeTopics(Collection<String> topics) { + if (topics == null || topics.isEmpty()) { + return "<none>"; + } + int maxToShow = 3; + int size = topics.size(); + return topics.stream() + .limit(maxToShow) + .collect(Collectors.joining(", ")) + + (size > maxToShow ? " and " + (size - maxToShow) + " additional topics" : ""); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index da84f0b397d..03a109817ec 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -17315,7 +17315,7 @@ public class GroupMetadataManagerTest { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code()) - .setStatusDetail("Internal topics are missing: [bar]"))), + .setStatusDetail("Internal topics are missing: bar"))), result.response().data() ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index f1379e2404a..b52d3edb3fe 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -563,7 +563,8 @@ public class GroupMetadataManagerTestContext { new LogContext(), 0, group.topology().get(), - metadataImage) + metadataImage, + time) ); } }); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java index 8c7c389e562..ed13f0ef94a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage; import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; @@ -43,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class InternalTopicManagerTest { + public static final MockTime TIME = new MockTime(); public static final String SOURCE_TOPIC_1 = "source_topic1"; public static final String SOURCE_TOPIC_2 = "source_topic2"; public static final String REPARTITION_TOPIC = "repartition_topic"; @@ -61,7 +63,7 @@ class InternalTopicManagerTest { // SOURCE_TOPIC_2 is missing from topicMetadata StreamsTopology topology = makeTestTopology(); - final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, new KRaftCoordinatorMetadataImage(metadataImage)); + final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, new KRaftCoordinatorMetadataImage(metadataImage), TIME); assertEquals(Optional.empty(), configuredTopology.subtopologies()); assertTrue(configuredTopology.topicConfigurationException().isPresent()); @@ -78,7 +80,7 @@ class InternalTopicManagerTest { .build(); StreamsTopology topology = makeTestTopology(); - ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, new KRaftCoordinatorMetadataImage(metadataImage)); + ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, new KRaftCoordinatorMetadataImage(metadataImage), TIME); final Map<String, CreatableTopic> internalTopicsToBeCreated = configuredTopology.internalTopicsToBeCreated(); assertEquals(2, internalTopicsToBeCreated.size());
