This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ec3694e83d5 KAFKA-19939 StreamsConfig should log WARN by default
(#21682)
ec3694e83d5 is described below
commit ec3694e83d512eea929ffac2cb5e40ccf6d4b110
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 9 06:39:58 2026 -0700
KAFKA-19939 StreamsConfig should log WARN by default (#21682)
For backward compatibility, we keep the processing exception handler
disabled on the global thread by default. StreamsConfig should WARN
about it, to encourage users to enable it.
Reviewers: Lucas Brutschy <[email protected]>, Arpit goyal
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 6 +++++
.../apache/kafka/streams/StreamsConfigTest.java | 28 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d4767aa184b..f2f402034c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1536,6 +1536,12 @@ public class StreamsConfig extends AbstractConfig {
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
verifyStreamsProtocolCompatibility(doLog);
+ if
(!getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG))
{
+ log.warn("Processing exception handler is not enabled for the
GlobalThread. " +
+ "It's recommended to set `" +
StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG + "` to true
to enable it. " +
+ "Enabling the processing exception handler for global
state/KTable processing now, ensures future backward compatibility. " +
+ "The processing exception handler will get enabled by default
with Apache Kafka 5.0 release.");
+ }
}
private void verifyStreamsProtocolCompatibility(final boolean doLog) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index d58909094cf..f33aa9bd767 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1786,11 +1786,39 @@ public class StreamsConfigTest {
"Please set group.protocol=classic or remove group.instance.id
from the configuration."));
}
+ @Test
public void shouldSetDefaultDeadLetterQueue() {
final StreamsConfig config = new StreamsConfig(props);
assertNull(config.getString(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
+ @Test
+ public void
shouldLogWarningWhenProcessingExceptionHandlerIsNotEnabledOnGlobalThread() {
+ try (LogCaptureAppender streamsConfigLogs =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN);
+ streamsConfig = new StreamsConfig(props);
+
+ assertEquals(1, streamsConfigLogs.getMessages().size());
+ assertTrue(streamsConfigLogs
+ .getMessages(Level.WARN.name())
+ .get(0)
+ .startsWith("Processing exception handler is not enabled for
the GlobalThread.")
+ );
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void
shouldNotLogWarningWhenProcessingExceptionHandlerIsEnabledOnGlobalThread() {
+
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
true);
+ try (LogCaptureAppender streamsConfigLogs =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN);
+ streamsConfig = new StreamsConfig(props);
+
+ assertEquals(0, streamsConfigLogs.getMessages().size());
+ }
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean
isKey) {