Thank you for the detailed questions! Let me clarify the implementation
approach:
Which Method Will Be Called?
GlobalThread will call the NEW handleError() method (not the deprecated
handle()).
Key Point: The exception handler is not called directly by GlobalThread.
Instead, it's called by ProcessorNode.process(), which already invokes
handleError() for regular processors.
The implementation is straightforward:
Current code (GlobalStateUpdateTask.initTopology - line 161):
node.init((InternalProcessorContext) this.processorContext); // No
handler passed
Proposed change:
node.init((InternalProcessorContext) this.processorContext,
processingExceptionHandler); // Pass handler
Once the handler is passed to ProcessorNode, the same code path that
handles exceptions for regular KStream/KTable processors
(ProcessorNode.process() line 236) will automatically handle GlobalKTable
exceptions:
Response response =
processingExceptionHandler.handleError(errorHandlerContext, record,
exception);
There's no separate code path for GlobalThread - it reuses the existing
ProcessorNode exception handling mechanism.
Backward Compatibility
The handleError() method provides automatic backward compatibility via
its default implementation:
default Response handleError(...) {
return new Response(Result.from(handle(...)),
Collections.emptyList());
}
- If users implement the old handle() method: handleError() delegates to
it automatically
- If users implement the new handleError() method: it's used directly
- No code changes required for existing applications
Dead Letter Queue (DLQ) Support
This is where GlobalKTable differs from regular processors:
The Limitation: GlobalThread does not have a Producer, so it cannot send
DLQ records to Kafka.
Proposed Approach:
1. For KIP-1270: When ProcessorNode detects DLQ records but the context
doesn't support RecordCollector (i.e., GlobalThread), it will log a warning
instead of sending:
log.warn("Dead letter queue records cannot be sent for GlobalKTable
processors " +
"(no producer available). DLQ support for GlobalKTable will be
addressed in a future KIP. " +
"Record details logged: topic={}, headers={}", ...);
2. Future KIP: Full DLQ support for GlobalKTable (requires adding
Producer infrastructure) will be proposed separately, as it's a larger
architectural change.
How This Avoids User Confusion
1. Single handler for all processors: Users configure ONE
ProcessingExceptionHandler that works for both regular and global processors
2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
for both
3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
with explicit warning message
4. Documentation: Config docs will clearly state DLQ sending limitation
for GlobalKTable
Thanks and Regards
Arpit Goyal
8861094754
On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
> Thanks for the KIP Arpit.
>
> Can you elaborate a little bit more on details? With the newly added DLQ
> support for regular `Processor`, the existing
> `ProcessingHandlerResponse` and corresponding method `handle()` are
> deprecated with upcoming 4.2 release.
>
> Thus, from AK 4.2+ going forward, users are expected to not implement
> the old `handle()` (even if it's still supported, as long as the new
> `handleError` is not overwritten).
>
> Are you proposing, for now, to only add support for the deprecated
> `handle()` method, ie, the new `handleError()` method would not be
> called by the global-thread code? If yes, this might be confusing for
> users?
>
> If you do not propose this, would it imply that the global-thread code
> would call the new `handlerError()` method? For this case, the question
> is what the runtime would do if users try to use the DLQ feature?
>
> Overall, it's unclear to me what you propose in detail and how we can
> avoid to confuse users, keep it backward compatible, and make it easy to
> understanding how the handler will work.
>
>
> -Matthias
>
> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> > Hi Team
> > Just reaching out again.Need your inputs to move it forward
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> > On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]>
> wrote:
> >
> >> Hi All,
> >> I would like to start a discussion for KIP-1270. This KIP extends
> >> ProcessingExceptionHandler support to GlobalKTable processors, enabling
> >> consistent exception handling across all stream processing types.
> >>
> >> * Current Behavior*
> >>
> >> When a processing exception occurs in a GlobalKTable processor:
> >> - The exception propagates to GlobalStreamThread
> >> - The GlobalStreamThread terminates
> >> - The entire Kafka Streams application shuts down
> >> - No user-configurable exception handling is available
> >>
> >> * Proposed Behavior*
> >>
> >> After this KIP, when a processing exception occurs in a GlobalKTable
> >> processor:
> >> - The configured ProcessingExceptionHandler.handleError() will be
> invoked
> >> - If the handler returns Result.RESUME, processing continues with the
> >> next record
> >> - If the handler returns Result.FAIL, the exception propagates (same
> as
> >> current behavior)
> >> - If no handler is configured, behavior remains unchanged (backward
> >> compatible)
> >>
> >> KIP:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >>
> >> Thanks and Regards
> >> Arpit Goyal
> >> 8861094754
> >>
> >
>
>