This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new cdf10200d6f KAFKA-19930 Updating doc for process exceptional handler
in case of global threads (#21109)
cdf10200d6f is described below
commit cdf10200d6fc5804d4ad7bf8ed6dc5ce58b21bea
Author: Arpit Goyal <[email protected]>
AuthorDate: Thu Dec 11 10:12:26 2025 +0530
KAFKA-19930 Updating doc for process exceptional handler in case of global
threads (#21109)
Cherry-picked from https://github.com/apache/kafka/pull/21016
Reviewers: Matthias J. Sax <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 5 ++++-
.../java/org/apache/kafka/streams/StreamsConfig.java | 4 +++-
.../streams/processor/internals/ProcessorNode.java | 9 +++++++++
.../streams/processor/internals/ProcessorNodeTest.java | 18 ++++++++++++++++++
4 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index b14bd04272b..562bc93ebf3 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -990,7 +990,10 @@ rack.aware.assignment.tags: zone,cluster |
rack.aware.assignment.tags: zone,cl
<div><p>The processing exception handler allows you to manage
exceptions triggered during the processing of a record. The implemented
exception
handler needs to return a <code>FAIL</code> or
<code>CONTINUE</code> depending on the record and the exception thrown.
Returning
<code>FAIL</code> will signal that Streams should shut down and
<code>CONTINUE</code> will signal that Streams should ignore the issue
- and continue processing. The following library built-in
exception handlers are available:</p>
+ and continue processing.</p>
+ <p><strong>Note:</strong> This handler applies only to regular
stream processing tasks. It does not apply to global state store updates
+ (global threads). Exceptions occurring in global threads will
bubble up to the configured uncaught exception handler.</p>
+ <p>The following library built-in exception handlers are
available:</p>
<ul class="simple">
<li><a class="reference external"
href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html">LogAndContinueProcessingExceptionHandler</a>:
This handler logs the processing exception and then signals
the processing pipeline to continue processing more records.
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2e156d43d94..fe4c3c943e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -668,7 +668,9 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG =
"processing.exception.handler";
@Deprecated
- public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC =
"Exception handling class that implements the
<code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code>
interface.";
+ public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC =
"Exception handling class that implements the
<code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code>
interface. " +
+ "Note: This handler applies only to regular stream processing
tasks. It does not apply to global state store updates (global threads). " +
+ "Exceptions occurring in global threads will bubble up to the
configured uncaught exception handler.";
/** {@code processing.guarantee} */
@SuppressWarnings("WeakerAccess")
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2bb58eb6b82..3e953d955fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -207,6 +207,15 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
// while Java distinguishes checked vs unchecked exceptions, other
languages
// like Scala or Kotlin do not, and thus we need to catch
`Exception`
// (instead of `RuntimeException`) to work well with those
languages
+
+ // If the processing exception handler is not set (e.g., for
global threads),
+ // rethrow the exception to let it bubble up to the uncaught
exception handler.
+ // The processing exception handler is only set for regular stream
tasks, not for
+ // global state update tasks which use a different error handling
mechanism.
+ if (processingExceptionHandler == null) {
+ throw processingException;
+ }
+
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null, // only required to pass for
DeserializationExceptionHandler
internalProcessorContext.topic(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 5a786b7174a..300c213b1d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -123,6 +123,24 @@ public class ProcessorNodeTest {
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE,
TIMESTAMP)));
}
+ @Test
+ public void shouldRethrowExceptionWhenProcessingExceptionHandlerIsNull() {
+ // This simulates the global thread case where no
ProcessingExceptionHandler is set
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
+ // Initialize without a ProcessingExceptionHandler (simulates global
thread initialization)
+ node.init(internalProcessorContext);
+
+ // The exception should be rethrown since there's no handler to
process it
+ final RuntimeException exception = assertThrows(RuntimeException.class,
+ () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+ assertEquals("Processing exception should be caught and handled by the
processing exception handler.",
+ exception.getMessage());
+ }
+
@ParameterizedTest
@CsvSource({
"FailedProcessingException,java.lang.RuntimeException,Fail processing",