Hi,

Overall, this makes sense to me. Thanks for the KIP!

LB1: I was wondering precisely what we are logging in the DLQ case. Do
you intend to log the full record content to make he record content
recoverable, or only some metadata. I suppose it's the latter.

LB2: Maybe I missed it (also not super fluent in that part of the
code), but will the implementers of the `ProcessExceptionalHandler` be
able to tell whether the error originated from a globalThread or a
StreamsThread? The implementer may want to specialize handling for
each case. This is not a must, but would be a nice to have for sure.

Cheers,
Lucas

On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]> wrote:
>
> Hi All,
> Looking for more inputs and feedback. This would help to move this KIP
> forward.
>
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:
>
> > Thanks for the response Matthias.
> > I have updated the KIP to include KIP-1034 handleError() automatic
> > backward compatibility. DLQ part I already mentioned under the Limitation
> > section. Let me know if it needs to be improved further.
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> >
> > On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:
> >
> >> Thanks for the clarification. Make sense to me.
> >>
> >> Might be good to add some of these details (no code reference to
> >> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie, state
> >> explicitly that the new handleError() will be used and that it provides
> >> backward compatibility automatically based on it's current implementaion
> >> from KIP-1034.
> >>
> >> And that DLQ records, if returned, would be ignored and dropped and a
> >> warning is logged about it for this case.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> >> > Thank you for the detailed questions! Let me clarify the implementation
> >> > approach:
> >> >
> >> >    Which Method Will Be Called?
> >> >
> >> >    GlobalThread will call the NEW handleError() method (not the
> >> deprecated
> >> > handle()).
> >> >
> >> >    Key Point: The exception handler is not called directly by
> >> GlobalThread.
> >> > Instead, it's called by ProcessorNode.process(), which already invokes
> >> > handleError() for regular processors.
> >> >
> >> >    The implementation is straightforward:
> >> >
> >> >    Current code (GlobalStateUpdateTask.initTopology - line 161):
> >> >    node.init((InternalProcessorContext) this.processorContext);  // No
> >> > handler passed
> >> >
> >> >    Proposed change:
> >> >    node.init((InternalProcessorContext) this.processorContext,
> >> > processingExceptionHandler);  // Pass handler
> >> >
> >> >    Once the handler is passed to ProcessorNode, the same code path that
> >> > handles exceptions for regular KStream/KTable processors
> >> > (ProcessorNode.process() line 236) will automatically handle
> >> GlobalKTable
> >> > exceptions:
> >> >
> >> >    Response response =
> >> > processingExceptionHandler.handleError(errorHandlerContext, record,
> >> > exception);
> >> >
> >> >    There's no separate code path for GlobalThread - it reuses the
> >> existing
> >> > ProcessorNode exception handling mechanism.
> >> >
> >> >    Backward Compatibility
> >> >
> >> >    The handleError() method provides automatic backward compatibility
> >> via
> >> > its default implementation:
> >> >
> >> >    default Response handleError(...) {
> >> >        return new Response(Result.from(handle(...)),
> >> > Collections.emptyList());
> >> >    }
> >> >
> >> >    - If users implement the old handle() method: handleError()
> >> delegates to
> >> > it automatically
> >> >    - If users implement the new handleError() method: it's used directly
> >> >    - No code changes required for existing applications
> >> >
> >> >    Dead Letter Queue (DLQ) Support
> >> >
> >> >    This is where GlobalKTable differs from regular processors:
> >> >
> >> >    The Limitation: GlobalThread does not have a Producer, so it cannot
> >> send
> >> > DLQ records to Kafka.
> >> >
> >> >    Proposed Approach:
> >> >
> >> >    1. For KIP-1270: When ProcessorNode detects DLQ records but the
> >> context
> >> > doesn't support RecordCollector (i.e., GlobalThread), it will log a
> >> warning
> >> > instead of sending:
> >> >
> >> >    log.warn("Dead letter queue records cannot be sent for GlobalKTable
> >> > processors " +
> >> >             "(no producer available). DLQ support for GlobalKTable will
> >> be
> >> > addressed in a future KIP. " +
> >> >             "Record details logged: topic={}, headers={}", ...);
> >> >
> >> >    2. Future KIP: Full DLQ support for GlobalKTable (requires adding
> >> > Producer infrastructure) will be proposed separately, as it's a larger
> >> > architectural change.
> >> >
> >> >    How This Avoids User Confusion
> >> >
> >> >    1. Single handler for all processors: Users configure ONE
> >> > ProcessingExceptionHandler that works for both regular and global
> >> processors
> >> >    2. Consistent behavior: Result.RESUME continues, Result.FAIL stops -
> >> same
> >> > for both
> >> >    3. Clear limitation: DLQ records are logged (not sent) for
> >> GlobalKTable,
> >> > with explicit warning message
> >> >    4. Documentation: Config docs will clearly state DLQ sending
> >> limitation
> >> > for GlobalKTable
> >> > Thanks and Regards
> >> > Arpit Goyal
> >> > 8861094754
> >> >
> >> >
> >> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]>
> >> wrote:
> >> >
> >> >> Thanks for the KIP Arpit.
> >> >>
> >> >> Can you elaborate a little bit more on details? With the newly added
> >> DLQ
> >> >> support for regular `Processor`, the existing
> >> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
> >> >> deprecated with upcoming 4.2 release.
> >> >>
> >> >> Thus, from AK 4.2+ going forward, users are expected to not implement
> >> >> the old `handle()` (even if it's still supported, as long as the new
> >> >> `handleError` is not overwritten).
> >> >>
> >> >> Are you proposing, for now, to only add support for the deprecated
> >> >> `handle()` method, ie, the new `handleError()` method would not be
> >> >> called by the global-thread code? If yes, this might be confusing for
> >> >> users?
> >> >>
> >> >> If you do not propose this, would it imply that the global-thread code
> >> >> would call the new `handlerError()` method? For this case, the question
> >> >> is what the runtime would do if users try to use the DLQ feature?
> >> >>
> >> >> Overall, it's unclear to me what you propose in detail and how we can
> >> >> avoid to confuse users, keep it backward compatible, and make it easy
> >> to
> >> >> understanding how the handler will work.
> >> >>
> >> >>
> >> >> -Matthias
> >> >>
> >> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> >> >>> Hi Team
> >> >>> Just reaching out again.Need your inputs to move it forward
> >> >>>
> >> >>> Thanks and Regards
> >> >>> Arpit Goyal
> >> >>> 8861094754
> >> >>>
> >> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]>
> >> >> wrote:
> >> >>>
> >> >>>> Hi All,
> >> >>>> I would like to start a discussion for KIP-1270.  This KIP extends
> >> >>>> ProcessingExceptionHandler support to GlobalKTable processors,
> >> enabling
> >> >>>> consistent exception handling across all stream processing types.
> >> >>>>
> >> >>>> * Current Behavior*
> >> >>>>
> >> >>>>     When a processing exception occurs in a GlobalKTable processor:
> >> >>>>     - The exception propagates to GlobalStreamThread
> >> >>>>     - The GlobalStreamThread terminates
> >> >>>>     - The entire Kafka Streams application shuts down
> >> >>>>     - No user-configurable exception handling is available
> >> >>>>
> >> >>>> *  Proposed Behavior*
> >> >>>>
> >> >>>>     After this KIP, when a processing exception occurs in a
> >> GlobalKTable
> >> >>>> processor:
> >> >>>>     - The configured ProcessingExceptionHandler.handleError() will be
> >> >> invoked
> >> >>>>     - If the handler returns Result.RESUME, processing continues
> >> with the
> >> >>>> next record
> >> >>>>     - If the handler returns Result.FAIL, the exception propagates
> >> (same
> >> >> as
> >> >>>> current behavior)
> >> >>>>     - If no handler is configured, behavior remains unchanged
> >> (backward
> >> >>>> compatible)
> >> >>>>
> >> >>>> KIP:
> >> >>>>
> >> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >> >>>>
> >> >>>> Thanks and Regards
> >> >>>> Arpit Goyal
> >> >>>> 8861094754
> >> >>>>
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >>

Reply via email to