This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 367eca0 KAFKA-12503: inform threads to resize their cache instead of
doing so for them (#10356)
367eca0 is described below
commit 367eca083b44261d4e5fa8aa61b7990a8b35f8b0
Author: Walker Carlson <[email protected]>
AuthorDate: Thu Mar 18 19:34:39 2021 -0700
KAFKA-12503: inform threads to resize their cache instead of doing so for
them (#10356)
Make it so threads do not directly resize other thread's caches
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../kafka/streams/processor/internals/GlobalStreamThread.java | 9 ++++++++-
.../apache/kafka/streams/processor/internals/StreamThread.java | 9 ++++++++-
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index ac8ce54..4d416ad 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -41,6 +41,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
import static
org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
@@ -62,6 +63,7 @@ public class GlobalStreamThread extends Thread {
private final ThreadCache cache;
private final StreamsMetricsImpl streamsMetrics;
private final ProcessorTopology topology;
+ private final AtomicLong cacheSize;
private volatile StreamsException startupException;
private java.util.function.Consumer<Throwable>
streamsUncaughtExceptionHandler;
@@ -215,6 +217,7 @@ public class GlobalStreamThread extends Thread {
this.cache = new ThreadCache(logContext, cacheSizeBytes,
this.streamsMetrics);
this.stateRestoreListener = stateRestoreListener;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
+ this.cacheSize = new AtomicLong(-1L);
}
static class StateConsumer {
@@ -302,6 +305,10 @@ public class GlobalStreamThread extends Thread {
boolean wipeStateStore = false;
try {
while (stillRunning()) {
+ final long size = cacheSize.getAndSet(-1L);
+ if (size != -1L) {
+ cache.resize(size);
+ }
stateConsumer.pollAndUpdate();
}
} catch (final InvalidOffsetException recoverableException) {
@@ -344,7 +351,7 @@ public class GlobalStreamThread extends Thread {
}
public void resize(final long cacheSize) {
- cache.resize(cacheSize);
+ this.cacheSize.set(cacheSize);
}
private StateConsumer initialize() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 84ed4fa..39bf1a5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -303,9 +303,11 @@ public class StreamThread extends Thread {
private java.util.function.Consumer<Throwable>
streamsUncaughtExceptionHandler;
private Runnable shutdownErrorHook;
private AtomicInteger assignmentErrorCode;
+ private AtomicLong cacheResizeSize;
private final ProcessingMode processingMode;
private AtomicBoolean leaveGroupRequested;
+
public static StreamThread create(final InternalTopologyBuilder builder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
@@ -490,6 +492,7 @@ public class StreamThread extends Thread {
this.commitRatioSensor = ThreadMetrics.commitRatioSensor(threadId,
streamsMetrics);
this.failedStreamThreadSensor =
ClientMetrics.failedStreamThreadSensor(streamsMetrics);
this.assignmentErrorCode = assignmentErrorCode;
+ this.cacheResizeSize = new AtomicLong(-1L);
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
@@ -575,6 +578,10 @@ public class StreamThread extends Thread {
"All clients in this app will now begin to
shutdown");
mainConsumer.enforceRebalance();
}
+ final Long size = cacheResizeSize.getAndSet(-1L);
+ if (size != -1L) {
+ cacheResizer.accept(size);
+ }
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for
{} ms.", nextProbingRebalanceMs.get());
@@ -686,7 +693,7 @@ public class StreamThread extends Thread {
}
public void resizeCache(final long size) {
- cacheResizer.accept(size);
+ cacheResizeSize.set(size);
}
/**