Thanks for the response Matthias.
I have updated the KIP to state that the new handleError() method from
KIP-1034 is used and that it provides backward compatibility automatically.
The DLQ behavior was already covered under the Limitation section.
Let me know if it needs to be improved further.
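
For reference, here is a minimal sketch of the two behaviors described in the KIP: the default handleError() delegating to the old handle(), and DLQ records being dropped with a warning on the global thread. It uses simplified stand-in types, not the real Kafka Streams interfaces. The names GlobalHandlerSketch, Handler, onProcessingError and the String-based record and DLQ types are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class GlobalHandlerSketch {

    enum Result { RESUME, FAIL }

    // Simplified stand-in for the handler response: a result plus optional DLQ records.
    static final class Response {
        final Result result;
        final List<String> dlqRecords;

        Response(Result result, List<String> dlqRecords) {
            this.result = result;
            this.dlqRecords = dlqRecords;
        }
    }

    // Simplified stand-in for ProcessingExceptionHandler (hypothetical signatures).
    interface Handler {
        // Old-style method; handlers written before KIP-1034 override only this.
        default Result handle(String record, Exception error) {
            throw new UnsupportedOperationException("handle() not implemented");
        }

        // New-style method; by default it delegates to handle() and returns no DLQ records.
        default Response handleError(String record, Exception error) {
            return new Response(handle(record, error), Collections.emptyList());
        }
    }

    // Collected warnings, standing in for log.warn(...) so the sketch stays self-contained.
    static final List<String> warnings = new ArrayList<>();

    // Mimics the proposed global-thread behavior: always call handleError(),
    // and drop any DLQ records with a warning because no producer is available.
    static Result onProcessingError(Handler handler, String record, Exception error) {
        Response response = handler.handleError(record, error);
        if (!response.dlqRecords.isEmpty()) {
            warnings.add("Dropping " + response.dlqRecords.size()
                + " DLQ record(s): no producer available");
        }
        return response.result;
    }

    public static void main(String[] args) {
        // Legacy handler: only the old handle() is implemented.
        Handler legacy = new Handler() {
            @Override
            public Result handle(String record, Exception error) {
                return Result.RESUME;
            }
        };
        // New-style handler that also returns a DLQ record.
        Handler withDlq = new Handler() {
            @Override
            public Response handleError(String record, Exception error) {
                return new Response(Result.FAIL, Collections.singletonList("dlq:" + record));
            }
        };
        System.out.println(onProcessingError(legacy, "r1", new RuntimeException("boom")));
        System.out.println(onProcessingError(withDlq, "r2", new RuntimeException("boom")));
        System.out.println(warnings);
    }
}
```

In this sketch the legacy handler is reached through handleError() without any change on the user's side, and the handler that returns a DLQ record still has its Result honored while the record itself is only reported in a warning.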
Thanks and Regards
Arpit Goyal
8861094754


On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the clarification. Makes sense to me.
>
> Might be good to add some of these details (no code reference to
> `ProcessorNode` etc. necessary as it's an impl detail) to the KIP. I.e., state
> explicitly that the new handleError() will be used and that it provides
> backward compatibility automatically based on its current implementation
> from KIP-1034.
>
> And that DLQ records, if returned, would be ignored and dropped and a
> warning is logged about it for this case.
>
>
>
> -Matthias
>
>
> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> > Thank you for the detailed questions! Let me clarify the implementation
> > approach:
> >
> >    Which Method Will Be Called?
> >
> >    GlobalThread will call the NEW handleError() method (not the
> > deprecated handle()).
> >
> >    Key Point: The exception handler is not called directly by
> > GlobalThread. Instead, it's called by ProcessorNode.process(), which
> > already invokes handleError() for regular processors.
> >
> >    The implementation is straightforward:
> >
> >    Current code (GlobalStateUpdateTask.initTopology - line 161):
> >    // No handler passed
> >    node.init((InternalProcessorContext) this.processorContext);
> >
> >    Proposed change:
> >    // Pass handler
> >    node.init((InternalProcessorContext) this.processorContext,
> >              processingExceptionHandler);
> >
> >    Once the handler is passed to ProcessorNode, the same code path that
> > handles exceptions for regular KStream/KTable processors
> > (ProcessorNode.process() line 236) will automatically handle GlobalKTable
> > exceptions:
> >
> >    Response response = processingExceptionHandler.handleError(
> >        errorHandlerContext, record, exception);
> >
> >    There's no separate code path for GlobalThread - it reuses the
> > existing ProcessorNode exception handling mechanism.
> >
> >    Backward Compatibility
> >
> >    The handleError() method provides automatic backward compatibility via
> > its default implementation:
> >
> >    default Response handleError(...) {
> >        return new Response(Result.from(handle(...)),
> >                            Collections.emptyList());
> >    }
> >
> >    - If users implement the old handle() method: handleError() delegates
> >      to it automatically
> >    - If users implement the new handleError() method: it's used directly
> >    - No code changes required for existing applications
> >
> >    Dead Letter Queue (DLQ) Support
> >
> >    This is where GlobalKTable differs from regular processors:
> >
> >    The Limitation: GlobalThread does not have a Producer, so it cannot
> > send DLQ records to Kafka.
> >
> >    Proposed Approach:
> >
> >    1. For KIP-1270: When ProcessorNode detects DLQ records but the
> > context doesn't support RecordCollector (i.e., GlobalThread), it will
> > log a warning instead of sending:
> >
> >    log.warn("Dead letter queue records cannot be sent for GlobalKTable " +
> >             "processors (no producer available). DLQ support for " +
> >             "GlobalKTable will be addressed in a future KIP. " +
> >             "Record details logged: topic={}, headers={}", ...);
> >
> >    2. Future KIP: Full DLQ support for GlobalKTable (requires adding
> > Producer infrastructure) will be proposed separately, as it's a larger
> > architectural change.
> >
> >    How This Avoids User Confusion
> >
> >    1. Single handler for all processors: Users configure ONE
> > ProcessingExceptionHandler that works for both regular and global
> > processors
> >    2. Consistent behavior: Result.RESUME continues, Result.FAIL stops -
> > same for both
> >    3. Clear limitation: DLQ records are logged (not sent) for
> > GlobalKTable, with explicit warning message
> >    4. Documentation: Config docs will clearly state DLQ sending
> > limitation for GlobalKTable
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> >
> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
> >
> >> Thanks for the KIP Arpit.
> >>
> >> Can you elaborate a little bit more on details? With the newly added DLQ
> >> support for regular `Processor`, the existing
> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
> >> deprecated with upcoming 4.2 release.
> >>
> >> Thus, from AK 4.2+ going forward, users are expected to not implement
> >> the old `handle()` (even if it's still supported, as long as the new
> >> `handleError()` is not overridden).
> >>
> >> Are you proposing, for now, to only add support for the deprecated
> >> `handle()` method, ie, the new `handleError()` method would not be
> >> called by the global-thread code? If yes, this might be confusing for
> >> users?
> >>
> >> If you do not propose this, would it imply that the global-thread code
> >> would call the new `handleError()` method? For this case, the question
> >> is what the runtime would do if users try to use the DLQ feature?
> >>
> >> Overall, it's unclear to me what you propose in detail and how we can
> >> avoid confusing users, keep it backward compatible, and make it easy to
> >> understand how the handler will work.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> >>> Hi Team
> >>> Just reaching out again. I need your input to move this forward.
> >>>
> >>> Thanks and Regards
> >>> Arpit Goyal
> >>> 8861094754
> >>>
> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
> >>>
> >>>> Hi All,
> >>>> I would like to start a discussion for KIP-1270.  This KIP extends
> >>>> ProcessingExceptionHandler support to GlobalKTable processors,
> >>>> enabling consistent exception handling across all stream processing
> >>>> types.
> >>>>
> >>>> *Current Behavior*
> >>>>
> >>>>     When a processing exception occurs in a GlobalKTable processor:
> >>>>     - The exception propagates to GlobalStreamThread
> >>>>     - The GlobalStreamThread terminates
> >>>>     - The entire Kafka Streams application shuts down
> >>>>     - No user-configurable exception handling is available
> >>>>
> >>>> *Proposed Behavior*
> >>>>
> >>>>     After this KIP, when a processing exception occurs in a
> >>>> GlobalKTable processor:
> >>>>     - The configured ProcessingExceptionHandler.handleError() will be
> >>>>       invoked
> >>>>     - If the handler returns Result.RESUME, processing continues with
> >>>>       the next record
> >>>>     - If the handler returns Result.FAIL, the exception propagates
> >>>>       (same as current behavior)
> >>>>     - If no handler is configured, behavior remains unchanged
> >>>>       (backward compatible)
> >>>>
> >>>> KIP:
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >>>>
> >>>> Thanks and Regards
> >>>> Arpit Goyal
> >>>> 8861094754
> >>>>
> >>>
> >>
> >>
> >
>
>
