This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c552631801f MINOR: Improve streams groups log messages (#21311)
c552631801f is described below
commit c552631801f709ff7d1090b99dd5f0053d5c08ce
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jan 19 15:25:57 2026 +0100
MINOR: Improve streams groups log messages (#21311)
Minor logging improvements for streams groups.
- In InternalTopicManager, we'd not log the group ID or member ID,
making the logs difficult discover.
- Some logs could turn out very verbose
- We'd log the whole topology. This is difficult to parse from a log
message, and can be determined by using `StreamsGroupDescribe` instead.
- We'd log all missing internal topics. Instead, restrict it to the
first 5.
- The statusDetail returned by `StreamsGroupHeartbeat` (and logged on
the client) would also become fairly large, logging all topic creation
errors for all internal topics that we failed to create. Restrict it to
the first 3.
---
core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +--
.../coordinator/group/GroupMetadataManager.java | 7 ++--
.../group/streams/topics/InternalTopicManager.java | 39 +++++++++++++++++-----
.../group/GroupMetadataManagerTest.java | 2 +-
.../group/GroupMetadataManagerTestContext.java | 3 +-
.../streams/topics/InternalTopicManagerTest.java | 6 ++--
7 files changed, 51 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index dc7194074ed..f4c23c401ec 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2836,8 +2836,14 @@ class KafkaApis(val requestChannel: RequestChannel,
if (cachedErrors.nonEmpty) {
val missingInternalTopicStatus =
responseData.status().stream().filter(x => x.statusCode()
==
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
- val creationErrorDetails = cachedErrors.map { case (topic,
error) => s"$topic ($error)" }.mkString(", ")
if (missingInternalTopicStatus.isPresent) {
+ val maxErrorsToInclude = 3
+ val errorList = cachedErrors.take(maxErrorsToInclude).map
{ case (topic, error) => s"$topic ($error)" }.mkString(", ")
+ val creationErrorDetails = if (cachedErrors.size >
maxErrorsToInclude) {
+ s"$errorList and ${cachedErrors.size -
maxErrorsToInclude} more"
+ } else {
+ errorList
+ }
val existingDetail =
Option(missingInternalTopicStatus.get().statusDetail()).getOrElse("")
missingInternalTopicStatus.get().setStatusDetail(
existingDetail + s"; Creation failed:
$creationErrorDetails."
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 5ce46083341..b4a8259fcc1 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3851,7 +3851,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
.setStatusDetail("Assignment delayed due to the configured initial
rebalance delay."),
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
- .setStatusDetail("Internal topics are missing: [topic]; Unauthorized
to CREATE on topics topic.")),
+ .setStatusDetail("Internal topics are missing: topic; Unauthorized to
CREATE on topics topic.")),
response.data().status())
}
@@ -3885,7 +3885,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
.setStatusDetail("Assignment delayed due to the configured initial
rebalance delay."),
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
- .setStatusDetail("Internal topics are missing: [topic]")),
+ .setStatusDetail("Internal topics are missing: topic")),
response.data().status())
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5b9d715a9b4..205c9ee2d6e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1998,8 +1998,9 @@ public class GroupMetadataManager {
}
if (reconfigureTopology || group.configuredTopology().isEmpty()) {
- log.info("[GroupId {}][MemberId {}] Configuring the topology
{}", groupId, memberId, updatedTopology);
- updatedConfiguredTopology =
InternalTopicManager.configureTopics(logContext, metadataHash, updatedTopology,
metadataImage);
+ log.info("[GroupId {}][MemberId {}] Configuring the topology
{}", groupId, memberId, updatedTopology.topologyEpoch());
+ LogContext topicManagerLogContext = new
LogContext(String.format("%s[GroupId %s][MemberId %s] ",
logContext.logPrefix(), groupId, memberId));
+ updatedConfiguredTopology =
InternalTopicManager.configureTopics(topicManagerLogContext, metadataHash,
updatedTopology, metadataImage, time);
group.setConfiguredTopology(updatedConfiguredTopology);
} else {
updatedConfiguredTopology = group.configuredTopology().get();
@@ -3529,7 +3530,7 @@ public class GroupMetadataManager {
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
- log.info("[GroupId {}] Member {} updated its member metdata to
{}.",
+ log.info("[GroupId {}][MemberId {}] Member updated its member
metdata to {}.",
groupId, memberId, updatedMember);
return true;
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 5c89c622a24..cb92c14d720 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
@@ -45,6 +46,7 @@ import java.util.stream.Stream;
*/
public class InternalTopicManager {
+
/**
* Configures the internal topics for the given topology. Given a topology
and the metadata image, this method determines the number of
* partitions for all internal topics and returns a {@link
ConfiguredTopology} object.
@@ -58,8 +60,10 @@ public class InternalTopicManager {
public static ConfiguredTopology configureTopics(LogContext logContext,
long metadataHash,
StreamsTopology topology,
- CoordinatorMetadataImage
metadataImage) {
+ CoordinatorMetadataImage
metadataImage,
+ Time time) {
final Logger log = logContext.logger(InternalTopicManager.class);
+ final long startTimeMs = time.milliseconds();
final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies
= topology.subtopologies().values();
final Map<String, Collection<Set<String>>>
copartitionGroupsBySubtopology =
@@ -89,14 +93,16 @@ public class InternalTopicManager {
));
Map<String, CreatableTopic> internalTopicsToCreate =
missingInternalTopics(configuredSubtopologies, topology, metadataImage);
+ long elapsedMs = time.milliseconds() - startTimeMs;
if (!internalTopicsToCreate.isEmpty()) {
topicConfigurationException =
Optional.of(TopicConfigurationException.missingInternalTopics(
- "Internal topics are missing: " +
internalTopicsToCreate.keySet()
+ "Internal topics are missing: " +
summarizeTopics(internalTopicsToCreate.keySet())
));
- log.info("Valid topic configuration found, but internal topics
are missing for topology epoch {}: {}",
- topology.topologyEpoch(),
topicConfigurationException.get().toString());
+ log.info("Valid topic configuration found in {}ms, but
internal topics are missing for topology epoch {}: {}",
+ elapsedMs, topology.topologyEpoch(),
summarizeTopics(internalTopicsToCreate.keySet()));
} else {
- log.info("Valid topic configuration found, topology epoch {}
is now initialized.", topology.topologyEpoch());
+ log.info("Valid topic configuration found in {}ms, topology
epoch {} is now initialized.",
+ elapsedMs, topology.topologyEpoch());
}
return new ConfiguredTopology(
@@ -108,8 +114,9 @@ public class InternalTopicManager {
);
} catch (TopicConfigurationException e) {
- log.warn("Topic configuration failed for topology epoch {}: {} ",
- topology.topologyEpoch(), e.toString());
+ long elapsedMs = time.milliseconds() - startTimeMs;
+ log.warn("Topic configuration failed for topology epoch {} in
{}ms: {}",
+ topology.topologyEpoch(), elapsedMs, e.getMessage());
return new ConfiguredTopology(
topology.topologyEpoch(),
metadataHash,
@@ -132,7 +139,7 @@ public class InternalTopicManager {
}
if (!sortedMissingTopics.isEmpty()) {
throw TopicConfigurationException.missingSourceTopics(
- "Source topics " + String.join(", ", sortedMissingTopics) + "
are missing.");
+ "Source topics " + summarizeTopics(sortedMissingTopics) + "
are missing.");
}
}
@@ -322,4 +329,20 @@ public class InternalTopicManager {
).collect(Collectors.toSet())
).toList();
}
+
+ /**
+ * Formats a collection of topic names for log and exception messages.
+ * Includes up to 3 topic names, and if more are present, appends a
summary.
+ */
+ private static String summarizeTopics(Collection<String> topics) {
+ if (topics == null || topics.isEmpty()) {
+ return "<none>";
+ }
+ int maxToShow = 3;
+ int size = topics.size();
+ return topics.stream()
+ .limit(maxToShow)
+ .collect(Collectors.joining(", ")) +
+ (size > maxToShow ? " and " + (size - maxToShow) + " additional
topics" : "");
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6b25e42475f..5ebeb53e3d2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -17313,7 +17313,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
- .setStatusDetail("Internal topics are missing: [bar]"))),
+ .setStatusDetail("Internal topics are missing: bar"))),
result.response().data()
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index f1379e2404a..b52d3edb3fe 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -563,7 +563,8 @@ public class GroupMetadataManagerTestContext {
new LogContext(),
0,
group.topology().get(),
- metadataImage)
+ metadataImage,
+ time)
);
}
});
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index 8c7c389e562..ed13f0ef94a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -22,6 +22,7 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
@@ -43,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class InternalTopicManagerTest {
+ public static final MockTime TIME = new MockTime();
public static final String SOURCE_TOPIC_1 = "source_topic1";
public static final String SOURCE_TOPIC_2 = "source_topic2";
public static final String REPARTITION_TOPIC = "repartition_topic";
@@ -61,7 +63,7 @@ class InternalTopicManagerTest {
// SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology();
- final ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new
KRaftCoordinatorMetadataImage(metadataImage));
+ final ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new
KRaftCoordinatorMetadataImage(metadataImage), TIME);
assertEquals(Optional.empty(), configuredTopology.subtopologies());
assertTrue(configuredTopology.topicConfigurationException().isPresent());
@@ -78,7 +80,7 @@ class InternalTopicManagerTest {
.build();
StreamsTopology topology = makeTestTopology();
- ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new
KRaftCoordinatorMetadataImage(metadataImage));
+ ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new
KRaftCoordinatorMetadataImage(metadataImage), TIME);
final Map<String, CreatableTopic> internalTopicsToBeCreated =
configuredTopology.internalTopicsToBeCreated();
assertEquals(2, internalTopicsToBeCreated.size());