This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8f360c5f53 KAFKA-17744: Improve state updater logs when restorating 
state (#17714)
c8f360c5f53 is described below

commit c8f360c5f53716267d80d84d20f24f7043dcc4e6
Author: Sebastien Viale <[email protected]>
AuthorDate: Tue Nov 12 17:20:44 2024 +0100

    KAFKA-17744: Improve state updater logs when restorating state (#17714)
    
    The logs for Kafka Streams local state restoration incorrectly refer to the 
StreamThread instead of the StateUpdater thread, which is responsible for 
decoupling the restoration process. The restore consumer also references 
StreamThread instead of StateUpdater.
    
    This commit corrects the log message for more clarity.
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../kafka/streams/processor/internals/StreamThread.java       | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 88e599a92cd..e223355f090 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -369,10 +369,15 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                                       final Runnable shutdownErrorHook,
                                       final BiConsumer<Throwable, Boolean> 
streamsUncaughtExceptionHandler) {
 
+        final boolean stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());
+
         final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
+        final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, 
STATE_UPDATER_ID_SUBSTRING);
+        final String restorationThreadId = stateUpdaterEnabled ? 
stateUpdaterId : threadId;
 
         final String logPrefix = String.format("stream-thread [%s] ", 
threadId);
         final LogContext logContext = new LogContext(logPrefix);
+        final LogContext restorationLogContext = stateUpdaterEnabled ? new 
LogContext(String.format("state-updater [%s] ", restorationThreadId)) : 
logContext;
         final Logger log = logContext.logger(StreamThread.class);
 
         final ReferenceContainer referenceContainer = new ReferenceContainer();
@@ -382,13 +387,13 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         referenceContainer.clientTags = config.getClientTags();
 
         log.info("Creating restore consumer client");
-        final Map<String, Object> restoreConsumerConfigs = 
config.getRestoreConsumerConfigs(restoreConsumerClientId(threadId));
+        final Map<String, Object> restoreConsumerConfigs = 
config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
             time,
             config,
-            logContext,
+            restorationLogContext,
             adminClient,
             restoreConsumer,
             userStateRestoreListener,
@@ -397,7 +402,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
-        final boolean stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
             topologyMetadata,
@@ -475,7 +479,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         taskManager.setMainConsumer(mainConsumer);
         referenceContainer.mainConsumer = mainConsumer;
 
-        final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, 
STATE_UPDATER_ID_SUBSTRING);
         final StreamsThreadMetricsDelegatingReporter reporter = new 
StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId);
         streamsMetrics.metricsRegistry().addReporter(reporter);
 

Reply via email to