This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 367eca0  KAFKA-12503: inform threads to resize their cache instead of 
doing so for them (#10356)
367eca0 is described below

commit 367eca083b44261d4e5fa8aa61b7990a8b35f8b0
Author: Walker Carlson <[email protected]>
AuthorDate: Thu Mar 18 19:34:39 2021 -0700

    KAFKA-12503: inform threads to resize their cache instead of doing so for 
them (#10356)
    
    Make it so threads do not directly resize other thread's caches
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../kafka/streams/processor/internals/GlobalStreamThread.java    | 9 ++++++++-
 .../apache/kafka/streams/processor/internals/StreamThread.java   | 9 ++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index ac8ce54..4d416ad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
 import static 
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
@@ -62,6 +63,7 @@ public class GlobalStreamThread extends Thread {
     private final ThreadCache cache;
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology topology;
+    private final AtomicLong cacheSize;
     private volatile StreamsException startupException;
     private java.util.function.Consumer<Throwable> 
streamsUncaughtExceptionHandler;
 
@@ -215,6 +217,7 @@ public class GlobalStreamThread extends Thread {
         this.cache = new ThreadCache(logContext, cacheSizeBytes, 
this.streamsMetrics);
         this.stateRestoreListener = stateRestoreListener;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
+        this.cacheSize = new AtomicLong(-1L);
     }
 
     static class StateConsumer {
@@ -302,6 +305,10 @@ public class GlobalStreamThread extends Thread {
         boolean wipeStateStore = false;
         try {
             while (stillRunning()) {
+                final long size = cacheSize.getAndSet(-1L);
+                if (size != -1L) {
+                    cache.resize(size);
+                }
                 stateConsumer.pollAndUpdate();
             }
         } catch (final InvalidOffsetException recoverableException) {
@@ -344,7 +351,7 @@ public class GlobalStreamThread extends Thread {
     }
 
     public void resize(final long cacheSize) {
-        cache.resize(cacheSize);
+        this.cacheSize.set(cacheSize);
     }
 
     private StateConsumer initialize() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 84ed4fa..39bf1a5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -303,9 +303,11 @@ public class StreamThread extends Thread {
     private java.util.function.Consumer<Throwable> 
streamsUncaughtExceptionHandler;
     private Runnable shutdownErrorHook;
     private AtomicInteger assignmentErrorCode;
+    private AtomicLong cacheResizeSize;
     private final ProcessingMode processingMode;
     private AtomicBoolean leaveGroupRequested;
 
+
     public static StreamThread create(final InternalTopologyBuilder builder,
                                       final StreamsConfig config,
                                       final KafkaClientSupplier clientSupplier,
@@ -490,6 +492,7 @@ public class StreamThread extends Thread {
         this.commitRatioSensor = ThreadMetrics.commitRatioSensor(threadId, 
streamsMetrics);
         this.failedStreamThreadSensor = 
ClientMetrics.failedStreamThreadSensor(streamsMetrics);
         this.assignmentErrorCode = assignmentErrorCode;
+        this.cacheResizeSize = new AtomicLong(-1L);
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
         this.cacheResizer = cacheResizer;
@@ -575,6 +578,10 @@ public class StreamThread extends Thread {
                             "All clients in this app will now begin to 
shutdown");
                     mainConsumer.enforceRebalance();
                 }
+                final Long size = cacheResizeSize.getAndSet(-1L);
+                if (size != -1L) {
+                    cacheResizer.accept(size);
+                }
                 runOnce();
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for 
{} ms.", nextProbingRebalanceMs.get());
@@ -686,7 +693,7 @@ public class StreamThread extends Thread {
     }
 
     public void resizeCache(final long size) {
-        cacheResizer.accept(size);
+        cacheResizeSize.set(size);
     }
 
     /**

Reply via email to