This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2651a51aae1 MINOR: Avoid repeated logger creation in
InternalTopicManager (#21494)
2651a51aae1 is described below
commit 2651a51aae1f4c00e2a2b3054c5824efd10b333a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 18 10:17:50 2026 +0100
MINOR: Avoid repeated logger creation in InternalTopicManager (#21494)
As reported in https://github.com/apache/kafka/pull/19138,
`LoggerFactory.getLogger()` is expensive with log4j2.
`InternalTopicManager.configureTopics` and the helper classes it
instantiates (`RepartitionTopics`, `CopartitionedTopicsEnforcer`,
`ChangelogTopics`) each called `logContext.logger()` on every
invocation, creating 4 new loggers per call. Since `configureTopics` is
called on the streams group heartbeat path, this adds unnecessary CPU
overhead. ConfigureTopics is not called very frequently (only when
groups are created or topic metadata changes), but it's still good to
avoid the overhead.
This changes all four classes to accept a pre-created `Logger` instead
of a `LogContext`, so that the caller's existing logger is reused rather
than creating new loggers on every `configureTopics` call.
Reviewers: Matthias J. Sax <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 3 +-
.../group/streams/topics/ChangelogTopics.java | 7 ++---
.../topics/CopartitionedTopicsEnforcer.java | 7 ++---
.../group/streams/topics/InternalTopicManager.java | 32 ++++++++++++----------
.../group/streams/topics/RepartitionTopics.java | 7 ++---
.../group/GroupMetadataManagerTestContext.java | 4 ++-
.../group/streams/topics/ChangelogTopicsTest.java | 19 +++++++------
.../topics/CopartitionedTopicsEnforcerTest.java | 23 ++++++++--------
.../streams/topics/InternalTopicManagerTest.java | 8 ++++--
.../streams/topics/RepartitionTopicsTest.java | 19 +++++++------
10 files changed, 67 insertions(+), 62 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e440ee57da2..a79c3a6a8d9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1997,8 +1997,7 @@ public class GroupMetadataManager {
if (reconfigureTopology || group.configuredTopology().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Configuring the topology
{}", groupId, memberId, updatedTopology.topologyEpoch());
- LogContext topicManagerLogContext = new
LogContext(String.format("%s[GroupId %s][MemberId %s] ",
logContext.logPrefix(), groupId, memberId));
- updatedConfiguredTopology =
InternalTopicManager.configureTopics(topicManagerLogContext, metadataHash,
updatedTopology, metadataImage, time);
+ updatedConfiguredTopology =
InternalTopicManager.configureTopics(log, groupId, memberId, metadataHash,
updatedTopology, metadataImage, time);
group.setConfiguredTopology(updatedConfiguredTopology);
} else {
updatedConfiguredTopology = group.configuredTopology().get();
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
index b69858cd0ed..1e3c1f9aa81 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
-import org.apache.kafka.common.utils.LogContext;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@@ -44,7 +43,7 @@ public class ChangelogTopics {
/**
* Constructor for ChangelogTopics.
*
- * @param logContext The context for emitting log
messages.
+ * @param log The logger.
* @param subtopologies The subtopologies for the
requested topology.
* @param topicPartitionCountProvider Returns the number of partitions
for a given topic, representing the current state of the broker
* as well as any partition number
decisions that have already been made. In particular, we expect
@@ -52,11 +51,11 @@ public class ChangelogTopics {
* broker yet.
*/
public ChangelogTopics(
- final LogContext logContext,
+ final Logger log,
final Collection<Subtopology> subtopologies,
final Function<String, OptionalInt> topicPartitionCountProvider
) {
- this.log = logContext.logger(getClass());
+ this.log = log;
this.subtopologies = subtopologies;
this.topicPartitionCountProvider = topicPartitionCountProvider;
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
index d7955126a5c..3de0d5fc135 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
-import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -44,14 +43,14 @@ public class CopartitionedTopicsEnforcer {
/**
* The constructor for the class.
*
- * @param logContext The context for emitting log
messages.
+ * @param log The logger.
* @param topicPartitionCountProvider Returns the number of partitions for
a given topic, representing the current state of the broker
* as well as any partition number
decisions that have already been made. In particular, we expect
* the number of partitions for all
topics in all co-partitions groups to be defined.
*/
- public CopartitionedTopicsEnforcer(final LogContext logContext,
+ public CopartitionedTopicsEnforcer(final Logger log,
final Function<String, OptionalInt>
topicPartitionCountProvider) {
- this.log = logContext.logger(getClass());
+ this.log = log;
this.topicPartitionCountProvider = topicPartitionCountProvider;
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 6cd5e75462e..5befc16a878 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
@@ -51,18 +50,21 @@ public class InternalTopicManager {
* Configures the internal topics for the given topology. Given a topology
and the metadata image, this method determines the number of
* partitions for all internal topics and returns a {@link
ConfiguredTopology} object.
*
- * @param logContext The log context.
+ * @param log The logger.
+ * @param groupId The group id.
+ * @param memberId The member id.
* @param metadataHash The metadata hash of the group.
* @param topology The topology.
* @param metadataImage The metadata image.
* @return The configured topology.
*/
- public static ConfiguredTopology configureTopics(LogContext logContext,
+ public static ConfiguredTopology configureTopics(Logger log,
+ String groupId,
+ String memberId,
long metadataHash,
StreamsTopology topology,
CoordinatorMetadataImage
metadataImage,
Time time) {
- final Logger log = logContext.logger(InternalTopicManager.class);
final long startTimeMs = time.milliseconds();
final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies
= topology.subtopologies().values();
@@ -79,7 +81,7 @@ public class InternalTopicManager {
throwOnMissingSourceTopics(topology, metadataImage);
Map<String, Integer> decidedPartitionCountsForInternalTopics =
- decidePartitionCounts(logContext, topology, metadataImage,
copartitionGroupsBySubtopology);
+ decidePartitionCounts(log, topology, metadataImage,
copartitionGroupsBySubtopology);
final SortedMap<String, ConfiguredSubtopology>
configuredSubtopologies =
subtopologies.stream()
@@ -98,11 +100,11 @@ public class InternalTopicManager {
topicConfigurationException =
Optional.of(TopicConfigurationException.missingInternalTopics(
"Internal topics are missing: " +
summarizeTopics(internalTopicsToCreate.keySet())
));
- log.info("Valid topic configuration found in {}ms, but
internal topics are missing for topology epoch {}: {}",
- elapsedMs, topology.topologyEpoch(),
summarizeTopics(internalTopicsToCreate.keySet()));
+ log.info("[GroupId {}][MemberId {}] Valid topic configuration
found in {}ms, but internal topics are missing for topology epoch {}: {}",
+ groupId, memberId, elapsedMs, topology.topologyEpoch(),
summarizeTopics(internalTopicsToCreate.keySet()));
} else {
- log.info("Valid topic configuration found in {}ms, topology
epoch {} is now initialized.",
- elapsedMs, topology.topologyEpoch());
+ log.info("[GroupId {}][MemberId {}] Valid topic configuration
found in {}ms, topology epoch {} is now initialized.",
+ groupId, memberId, elapsedMs, topology.topologyEpoch());
}
return new ConfiguredTopology(
@@ -115,8 +117,8 @@ public class InternalTopicManager {
} catch (TopicConfigurationException e) {
long elapsedMs = time.milliseconds() - startTimeMs;
- log.warn("Topic configuration failed for topology epoch {} in
{}ms: {}",
- topology.topologyEpoch(), elapsedMs, e.getMessage());
+ log.warn("[GroupId {}][MemberId {}] Topic configuration failed for
topology epoch {} in {}ms: {}",
+ groupId, memberId, topology.topologyEpoch(), elapsedMs,
e.getMessage());
return new ConfiguredTopology(
topology.topologyEpoch(),
metadataHash,
@@ -143,7 +145,7 @@ public class InternalTopicManager {
}
}
- private static Map<String, Integer> decidePartitionCounts(final LogContext
logContext,
+ private static Map<String, Integer> decidePartitionCounts(final Logger log,
final
StreamsTopology topology,
final
CoordinatorMetadataImage metadataImage,
final
Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology) {
@@ -151,13 +153,13 @@ public class InternalTopicManager {
final Function<String, OptionalInt> topicPartitionCountProvider =
topic -> getPartitionCount(metadataImage, topic,
decidedPartitionCountsForInternalTopics);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- logContext,
+ log,
topology.subtopologies().values(),
topicPartitionCountProvider);
final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = new
CopartitionedTopicsEnforcer(
- logContext,
+ log,
topicPartitionCountProvider);
- final ChangelogTopics changelogTopics = new ChangelogTopics(logContext,
+ final ChangelogTopics changelogTopics = new ChangelogTopics(log,
topology.subtopologies().values(),
topicPartitionCountProvider);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
index c37902b1f17..4dac9fb77fa 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java
@@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
-import org.apache.kafka.common.utils.LogContext;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@@ -45,15 +44,15 @@ public class RepartitionTopics {
/**
* The constructor for the class.
*
- * @param logContext The context for emitting log
messages.
+ * @param log The logger.
* @param subtopologies The subtopologies for the requested
topology.
* @param topicPartitionCountProvider Returns the number of partitions
for a given topic, representing the current state of the
* broker. This class requires the
number of partition for all source topics to be defined.
*/
- public RepartitionTopics(final LogContext logContext,
+ public RepartitionTopics(final Logger log,
final Collection<Subtopology> subtopologies,
final Function<String, OptionalInt>
topicPartitionCountProvider) {
- this.log = logContext.logger(getClass());
+ this.log = log;
this.subtopologies = subtopologies;
this.topicPartitionCountProvider = topicPartitionCountProvider;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 1bbfd86f96a..cbaa5770541 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -560,7 +560,9 @@ public class GroupMetadataManagerTestContext {
StreamsGroup group =
context.groupMetadataManager.getStreamsGroupOrThrow(builder.groupId());
if (group.topology().isPresent()) {
group.setConfiguredTopology(InternalTopicManager.configureTopics(
- new LogContext(),
+ logContext.logger(InternalTopicManager.class),
+ builder.groupId(),
+ "",
0,
group.topology().get(),
metadataImage,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
index b8f2f1c00e6..9c744d84c56 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java
@@ -17,12 +17,13 @@
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
-import org.apache.kafka.common.utils.LogContext;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
@@ -35,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ChangelogTopicsTest {
- private static final LogContext LOG_CONTEXT = new LogContext();
+ private static final Logger LOG =
LoggerFactory.getLogger(ChangelogTopicsTest.class);
private static final String SOURCE_TOPIC_NAME = "source";
private static final String SINK_TOPIC_NAME = "sink";
private static final String REPARTITION_TOPIC_NAME = "repartition";
@@ -102,7 +103,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies =
List.of(SUBTOPOLOGY_NO_SOURCE_NO_REPARTITION_SOURCE);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
StreamsInvalidTopologyException e =
assertThrows(StreamsInvalidTopologyException.class, changelogTopics::setup);
assertTrue(e.getMessage().contains("No source topics found for
subtopology"));
@@ -113,7 +114,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_NO_SOURCE);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
assertDoesNotThrow(changelogTopics::setup);
}
@@ -122,7 +123,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies =
List.of(SUBTOPOLOGY_NO_REPARTITION_SOURCE);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
assertDoesNotThrow(changelogTopics::setup);
}
@@ -131,7 +132,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_STATELESS);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();
assertEquals(Map.of(), setup);
@@ -142,7 +143,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_STATEFUL);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();
assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3), setup);
@@ -153,7 +154,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies =
List.of(SUBTOPOLOGY_SOURCE_CHANGELOG);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();
assertEquals(Map.of(SOURCE_TOPIC_NAME, 3), setup);
@@ -164,7 +165,7 @@ public class ChangelogTopicsTest {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_BOTH);
final ChangelogTopics changelogTopics =
- new ChangelogTopics(LOG_CONTEXT, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
+ new ChangelogTopics(LOG, subtopologies,
ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();
assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3,
SOURCE_TOPIC_NAME, 3), setup);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
index 1f294f60bb8..bed66df9a56 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java
@@ -17,9 +17,10 @@
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
-import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.OptionalInt;
@@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class CopartitionedTopicsEnforcerTest {
- private static final LogContext LOG_CONTEXT = new LogContext();
+ private static final Logger LOG =
LoggerFactory.getLogger(CopartitionedTopicsEnforcerTest.class);
private static final String REPARTITION_TOPIC_1 = "repartitioned-1";
private static final String REPARTITION_TOPIC_2 = "repartitioned-2";
private static final String REPARTITION_TOPIC_3 = "repartitioned-3";
@@ -50,7 +51,7 @@ public class CopartitionedTopicsEnforcerTest {
public void
shouldThrowIllegalStateExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
final Map<String, Integer> topicPartitionCounts = Map.of();
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final IllegalStateException ex =
assertThrows(IllegalStateException.class, () ->
enforcer.enforce(
@@ -65,7 +66,7 @@ public class CopartitionedTopicsEnforcerTest {
public void
shouldThrowTopicConfigurationExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch()
{
final Map<String, Integer> topicPartitionCounts =
Map.of(SOURCE_TOPIC_1, 2, SOURCE_TOPIC_2, 1);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final TopicConfigurationException ex =
assertThrows(TopicConfigurationException.class, () ->
enforcer.enforce(
@@ -87,7 +88,7 @@ public class CopartitionedTopicsEnforcerTest {
REPARTITION_TOPIC_1, 10
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final Map<String, Integer> result =
enforcer.enforce(
@@ -107,7 +108,7 @@ public class CopartitionedTopicsEnforcerTest {
REPARTITION_TOPIC_3, 5
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final Map<String, Integer> result = enforcer.enforce(
Set.of(REPARTITION_TOPIC_1, REPARTITION_TOPIC_2,
REPARTITION_TOPIC_3),
@@ -129,7 +130,7 @@ public class CopartitionedTopicsEnforcerTest {
REPARTITION_TOPIC_2, 5
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final TopicConfigurationException ex = assertThrows(
TopicConfigurationException.class,
@@ -156,7 +157,7 @@ public class CopartitionedTopicsEnforcerTest {
REPARTITION_TOPIC_2, 10
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final Map<String, Integer> enforced = enforcer.enforce(
Set.of(REPARTITION_TOPIC_1, REPARTITION_TOPIC_2),
@@ -177,7 +178,7 @@ public class CopartitionedTopicsEnforcerTest {
SOURCE_TOPIC_1, 2
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final TopicConfigurationException ex = assertThrows(
TopicConfigurationException.class,
@@ -201,7 +202,7 @@ public class CopartitionedTopicsEnforcerTest {
SOURCE_TOPIC_1, 2
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final Map<String, Integer> enforced = enforcer.enforce(
Set.of(REPARTITION_TOPIC_1, SOURCE_TOPIC_1),
@@ -222,7 +223,7 @@ public class CopartitionedTopicsEnforcerTest {
REPARTITION_TOPIC_3, 2
);
final CopartitionedTopicsEnforcer enforcer =
- new CopartitionedTopicsEnforcer(LOG_CONTEXT,
topicPartitionProvider(topicPartitionCounts));
+ new CopartitionedTopicsEnforcer(LOG,
topicPartitionProvider(topicPartitionCounts));
final Map<String, Integer> enforced = enforcer.enforce(
Set.of(REPARTITION_TOPIC_1, REPARTITION_TOPIC_2,
REPARTITION_TOPIC_3),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index ed13f0ef94a..4cc97fb3e53 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -21,7 +21,6 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
@@ -31,6 +30,8 @@ import
org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class InternalTopicManagerTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(InternalTopicManagerTest.class);
public static final MockTime TIME = new MockTime();
public static final String SOURCE_TOPIC_1 = "source_topic1";
public static final String SOURCE_TOPIC_2 = "source_topic2";
@@ -63,7 +65,7 @@ class InternalTopicManagerTest {
// SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology();
- final ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new
KRaftCoordinatorMetadataImage(metadataImage), TIME);
+ final ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(LOG, "test-group", "test-member", 0,
topology, new KRaftCoordinatorMetadataImage(metadataImage), TIME);
assertEquals(Optional.empty(), configuredTopology.subtopologies());
assertTrue(configuredTopology.topicConfigurationException().isPresent());
@@ -80,7 +82,7 @@ class InternalTopicManagerTest {
.build();
StreamsTopology topology = makeTestTopology();
- ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(new LogContext(), 0, topology, new
KRaftCoordinatorMetadataImage(metadataImage), TIME);
+ ConfiguredTopology configuredTopology =
InternalTopicManager.configureTopics(LOG, "test-group", "test-member", 0,
topology, new KRaftCoordinatorMetadataImage(metadataImage), TIME);
final Map<String, CreatableTopic> internalTopicsToBeCreated =
configuredTopology.internalTopicsToBeCreated();
assertEquals(2, internalTopicsToBeCreated.size());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
index 8d1adb69e20..c7ef62b355e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java
@@ -17,11 +17,12 @@
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
-import org.apache.kafka.common.utils.LogContext;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class RepartitionTopicsTest {
- private static final LogContext LOG_CONTEXT = new LogContext();
+ private static final Logger LOG =
LoggerFactory.getLogger(RepartitionTopicsTest.class);
private static final String SOURCE_TOPIC_NAME1 = "source1";
private static final String SOURCE_TOPIC_NAME2 = "source2";
private static final TopicInfo REPARTITION_TOPIC1 = new
TopicInfo().setName("repartition1").setPartitions(4);
@@ -56,7 +57,7 @@ public class RepartitionTopicsTest {
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
final List<Subtopology> subtopologies = List.of(subtopology1,
subtopology2);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
subtopologies,
RepartitionTopicsTest::sourceTopicPartitionCounts
);
@@ -81,7 +82,7 @@ public class RepartitionTopicsTest {
final Function<String, OptionalInt> topicPartitionCountProvider =
s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
sourceTopicPartitionCounts(s);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
List.of(subtopology1, subtopology2),
topicPartitionCountProvider
);
@@ -99,7 +100,7 @@ public class RepartitionTopicsTest {
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
List.of(subtopology1),
RepartitionTopicsTest::sourceTopicPartitionCounts
);
@@ -118,7 +119,7 @@ public class RepartitionTopicsTest {
.setSubtopologyId("subtopology1")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
List.of(subtopology1),
RepartitionTopicsTest::sourceTopicPartitionCounts
);
@@ -142,7 +143,7 @@ public class RepartitionTopicsTest {
.setSubtopologyId("subtopologyWithoutPartitionCount")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1,
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
List.of(subtopology, subtopologyWithoutPartitionCount),
RepartitionTopicsTest::sourceTopicPartitionCounts
);
@@ -168,7 +169,7 @@ public class RepartitionTopicsTest {
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
List.of(subtopology, subtopologyWithoutPartitionCount),
RepartitionTopicsTest::sourceTopicPartitionCounts
);
@@ -190,7 +191,7 @@ public class RepartitionTopicsTest {
.setSubtopologyId("subtopology0")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
- LOG_CONTEXT,
+ LOG,
List.of(subtopology),
RepartitionTopicsTest::sourceTopicPartitionCounts
);