Hi All,

Thanks for the KIP! Makes sense to me and helps make KS more robust.
I don't have any additional comments beyond what's been said so far.

-Bill

On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <[email protected]>
wrote:

> Hi,
>
> Overall, this makes sense to me. Thanks for the KIP!
>
> LB1: I was wondering precisely what we are logging in the DLQ case. Do
> you intend to log the full record content to make he record content
> recoverable, or only some metadata. I suppose it's the latter.
>
> LB2: Maybe I missed it (also not super fluent in that part of the
> code), but will the implementers of the `ProcessExceptionalHandler` be
> able to tell whether the error originated from a globalThread or a
> StreamsThread? The implementer may want to specialize handling for
> each case. This is not a must, but would be a nice to have for sure.
>
> Cheers,
> Lucas
>
> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]>
> wrote:
> >
> > Hi All,
> > Looking for more inputs and feedback. This would help to move this KIP
> > forward.
> >
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> > On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]>
> wrote:
> >
> > > Thanks for the response Matthias.
> > > I have updated the KIP to include KIP-1034 handleError() automatic
> > > backward compatibility. DLQ part I already mentioned under the
> Limitation
> > > section. Let me know if it needs to be improved further.
> > > Thanks and Regards
> > > Arpit Goyal
> > > 8861094754
> > >
> > >
> > > On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]>
> wrote:
> > >
> > >> Thanks for the clarification. Make sense to me.
> > >>
> > >> Might be good to add some of these details (no code reference to
> > >> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie,
> state
> > >> explicitly that the new handleError() will be used and that it
> provides
> > >> backward compatibility automatically based on it's current
> implementaion
> > >> from KIP-1034.
> > >>
> > >> And that DLQ records, if returned, would be ignored and dropped and a
> > >> warning is logged about it for this case.
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> > >> > Thank you for the detailed questions! Let me clarify the
> implementation
> > >> > approach:
> > >> >
> > >> >    Which Method Will Be Called?
> > >> >
> > >> >    GlobalThread will call the NEW handleError() method (not the
> > >> deprecated
> > >> > handle()).
> > >> >
> > >> >    Key Point: The exception handler is not called directly by
> > >> GlobalThread.
> > >> > Instead, it's called by ProcessorNode.process(), which already
> invokes
> > >> > handleError() for regular processors.
> > >> >
> > >> >    The implementation is straightforward:
> > >> >
> > >> >    Current code (GlobalStateUpdateTask.initTopology - line 161):
> > >> >    node.init((InternalProcessorContext) this.processorContext);  //
> No
> > >> > handler passed
> > >> >
> > >> >    Proposed change:
> > >> >    node.init((InternalProcessorContext) this.processorContext,
> > >> > processingExceptionHandler);  // Pass handler
> > >> >
> > >> >    Once the handler is passed to ProcessorNode, the same code path
> that
> > >> > handles exceptions for regular KStream/KTable processors
> > >> > (ProcessorNode.process() line 236) will automatically handle
> > >> GlobalKTable
> > >> > exceptions:
> > >> >
> > >> >    Response response =
> > >> > processingExceptionHandler.handleError(errorHandlerContext, record,
> > >> > exception);
> > >> >
> > >> >    There's no separate code path for GlobalThread - it reuses the
> > >> existing
> > >> > ProcessorNode exception handling mechanism.
> > >> >
> > >> >    Backward Compatibility
> > >> >
> > >> >    The handleError() method provides automatic backward
> compatibility
> > >> via
> > >> > its default implementation:
> > >> >
> > >> >    default Response handleError(...) {
> > >> >        return new Response(Result.from(handle(...)),
> > >> > Collections.emptyList());
> > >> >    }
> > >> >
> > >> >    - If users implement the old handle() method: handleError()
> > >> delegates to
> > >> > it automatically
> > >> >    - If users implement the new handleError() method: it's used
> directly
> > >> >    - No code changes required for existing applications
> > >> >
> > >> >    Dead Letter Queue (DLQ) Support
> > >> >
> > >> >    This is where GlobalKTable differs from regular processors:
> > >> >
> > >> >    The Limitation: GlobalThread does not have a Producer, so it
> cannot
> > >> send
> > >> > DLQ records to Kafka.
> > >> >
> > >> >    Proposed Approach:
> > >> >
> > >> >    1. For KIP-1270: When ProcessorNode detects DLQ records but the
> > >> context
> > >> > doesn't support RecordCollector (i.e., GlobalThread), it will log a
> > >> warning
> > >> > instead of sending:
> > >> >
> > >> >    log.warn("Dead letter queue records cannot be sent for
> GlobalKTable
> > >> > processors " +
> > >> >             "(no producer available). DLQ support for GlobalKTable
> will
> > >> be
> > >> > addressed in a future KIP. " +
> > >> >             "Record details logged: topic={}, headers={}", ...);
> > >> >
> > >> >    2. Future KIP: Full DLQ support for GlobalKTable (requires adding
> > >> > Producer infrastructure) will be proposed separately, as it's a
> larger
> > >> > architectural change.
> > >> >
> > >> >    How This Avoids User Confusion
> > >> >
> > >> >    1. Single handler for all processors: Users configure ONE
> > >> > ProcessingExceptionHandler that works for both regular and global
> > >> processors
> > >> >    2. Consistent behavior: Result.RESUME continues, Result.FAIL
> stops -
> > >> same
> > >> > for both
> > >> >    3. Clear limitation: DLQ records are logged (not sent) for
> > >> GlobalKTable,
> > >> > with explicit warning message
> > >> >    4. Documentation: Config docs will clearly state DLQ sending
> > >> limitation
> > >> > for GlobalKTable
> > >> > Thanks and Regards
> > >> > Arpit Goyal
> > >> > 8861094754
> > >> >
> > >> >
> > >> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]>
> > >> wrote:
> > >> >
> > >> >> Thanks for the KIP Arpit.
> > >> >>
> > >> >> Can you elaborate a little bit more on details? With the newly
> added
> > >> DLQ
> > >> >> support for regular `Processor`, the existing
> > >> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
> > >> >> deprecated with upcoming 4.2 release.
> > >> >>
> > >> >> Thus, from AK 4.2+ going forward, users are expected to not
> implement
> > >> >> the old `handle()` (even if it's still supported, as long as the
> new
> > >> >> `handleError` is not overwritten).
> > >> >>
> > >> >> Are you proposing, for now, to only add support for the deprecated
> > >> >> `handle()` method, ie, the new `handleError()` method would not be
> > >> >> called by the global-thread code? If yes, this might be confusing
> for
> > >> >> users?
> > >> >>
> > >> >> If you do not propose this, would it imply that the global-thread
> code
> > >> >> would call the new `handlerError()` method? For this case, the
> question
> > >> >> is what the runtime would do if users try to use the DLQ feature?
> > >> >>
> > >> >> Overall, it's unclear to me what you propose in detail and how we
> can
> > >> >> avoid to confuse users, keep it backward compatible, and make it
> easy
> > >> to
> > >> >> understanding how the handler will work.
> > >> >>
> > >> >>
> > >> >> -Matthias
> > >> >>
> > >> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> > >> >>> Hi Team
> > >> >>> Just reaching out again.Need your inputs to move it forward
> > >> >>>
> > >> >>> Thanks and Regards
> > >> >>> Arpit Goyal
> > >> >>> 8861094754
> > >> >>>
> > >> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <
> [email protected]>
> > >> >> wrote:
> > >> >>>
> > >> >>>> Hi All,
> > >> >>>> I would like to start a discussion for KIP-1270.  This KIP
> extends
> > >> >>>> ProcessingExceptionHandler support to GlobalKTable processors,
> > >> enabling
> > >> >>>> consistent exception handling across all stream processing types.
> > >> >>>>
> > >> >>>> * Current Behavior*
> > >> >>>>
> > >> >>>>     When a processing exception occurs in a GlobalKTable
> processor:
> > >> >>>>     - The exception propagates to GlobalStreamThread
> > >> >>>>     - The GlobalStreamThread terminates
> > >> >>>>     - The entire Kafka Streams application shuts down
> > >> >>>>     - No user-configurable exception handling is available
> > >> >>>>
> > >> >>>> *  Proposed Behavior*
> > >> >>>>
> > >> >>>>     After this KIP, when a processing exception occurs in a
> > >> GlobalKTable
> > >> >>>> processor:
> > >> >>>>     - The configured ProcessingExceptionHandler.handleError()
> will be
> > >> >> invoked
> > >> >>>>     - If the handler returns Result.RESUME, processing continues
> > >> with the
> > >> >>>> next record
> > >> >>>>     - If the handler returns Result.FAIL, the exception
> propagates
> > >> (same
> > >> >> as
> > >> >>>> current behavior)
> > >> >>>>     - If no handler is configured, behavior remains unchanged
> > >> (backward
> > >> >>>> compatible)
> > >> >>>>
> > >> >>>> KIP:
> > >> >>>>
> > >> >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> > >> >>>>
> > >> >>>> Thanks and Regards
> > >> >>>> Arpit Goyal
> > >> >>>> 8861094754
> > >> >>>>
> > >> >>>
> > >> >>
> > >> >>
> > >> >
> > >>
> > >>
>

Reply via email to