Thanks Bill and Lucas for the feedback.
LB1: I was wondering precisely what we are logging in the DLQ case. Do
you intend to log the full record content to make the record content
recoverable, or only some metadata? I suppose it's the latter.
> Since GlobalKTable lacks producer infrastructure, DLQ records will be
logged with full metadata but NOT sent to a Kafka topic.
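To make the intended fallback concrete, here is a minimal sketch of "log instead of send". Everything in it (DlqFallbackSketch, DlqRecord, dispatch, the message wording) is a hypothetical stand-in for illustration, not Kafka Streams code, and the exact fields that end up in the log are still open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: stand-in types only, not Kafka Streams classes.
final class DlqFallbackSketch {

    /** Stand-in for a DLQ record: destination topic plus header keys. */
    static final class DlqRecord {
        final String topic;
        final List<String> headerKeys;

        DlqRecord(String topic, List<String> headerKeys) {
            this.topic = topic;
            this.headerKeys = headerKeys;
        }
    }

    /**
     * Sends each DLQ record when a sender is available. When the sender is
     * null (the "no producer" case), nothing is sent and the warning lines
     * that would be logged are returned instead.
     */
    static List<String> dispatch(List<DlqRecord> records, Consumer<DlqRecord> senderOrNull) {
        List<String> warnings = new ArrayList<>();
        for (DlqRecord record : records) {
            if (senderOrNull != null) {
                senderOrNull.accept(record);
            } else {
                warnings.add("DLQ record not sent (no producer available): topic="
                        + record.topic + ", headers=" + record.headerKeys);
            }
        }
        return warnings;
    }
}
```

With a null sender the records are only described in the returned warnings; with a sender they are handed over and no warning is produced.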
LB2: Maybe I missed it (also not super fluent in that part of the
code), but will the implementers of the `ProcessExceptionalHandler` be
able to tell whether the error originated from a globalThread or a
StreamsThread? The implementer may want to specialize handling for
each case. This is not a must, but would be a nice to have for sure.
> Great question! We have two options here:
Option 1: Single Handler Configuration
Users define one implementation of ProcessingExceptionHandler that
handles errors for all stream types (KStream, KTable, and GlobalKTable).
This maintains consistency with the existing
DeserializationExceptionHandler pattern.
Limitation: GlobalKTable errors get the same handling behavior that we
defined for KStream processing exceptions. This keeps things simple but
is not flexible enough for users who may want different behavior for
GlobalKTable.
Option 2: Separate Optional Configuration
Introduce a new optional configuration,
global.processing.exception.handler. If configured, it applies
specifically to GlobalKTable processing errors; if not configured,
exceptions bubble up to the uncaught exception handler (maintaining
current behavior and backward compatibility).
Limitation: Users need two configuration properties if they want
exception handling for both regular streams and GlobalKTable.
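As a rough illustration of how Option 2 could resolve the handler, here is a small sketch. The class and method names are hypothetical, the config key is only the one proposed above (it does not exist in Kafka Streams today), and plain string properties stand in for the real config machinery.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of Option 2; not Kafka Streams code.
final class GlobalHandlerResolution {

    // Proposed key from Option 2 (an assumption, not an existing config).
    static final String GLOBAL_HANDLER_CONFIG = "global.processing.exception.handler";

    /** Returns the handler class name configured for GlobalKTable errors, if any. */
    static Optional<String> resolveGlobalHandler(Map<String, String> props) {
        String value = props.get(GLOBAL_HANDLER_CONFIG);
        if (value == null || value.trim().isEmpty()) {
            // Not configured: no handler is installed for the global thread.
            return Optional.empty();
        }
        return Optional.of(value.trim());
    }

    /** Describes what happens to a GlobalKTable processing exception. */
    static String describe(Map<String, String> props) {
        return resolveGlobalHandler(props)
                .map(handler -> "handled by " + handler)
                .orElse("bubbles up to the uncaught exception handler");
    }
}
```

Note that in this sketch the handler configured for regular streams is deliberately not used as a fallback for GlobalKTable, which is what keeps the current behavior when the new property is absent.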
With Option 1, ProcessingExceptionHandler currently has no way to
identify which thread is invoking it. We may need to add a ThreadType
(Stream or Global) to ErrorHandlerContext to expose that information.
With Option 2, the user would always know which handler class they
implemented for GlobalKTables.
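For Option 1, a single handler could specialize on the thread type roughly as sketched below. ThreadType, Context, and Result here are hypothetical stand-ins defined only for this example; the real handler context does not expose a thread type today, and the FAIL/RESUME policy shown is just one possible choice.

```java
// Hypothetical sketch of Option 1; stand-in types, not the Kafka Streams API.
final class ThreadTypeSketch {

    /** Proposed discriminator; an assumption, not an existing type. */
    enum ThreadType { STREAM, GLOBAL }

    /** Mirrors the RESUME/FAIL results discussed in this thread. */
    enum Result { RESUME, FAIL }

    /** Stand-in for an error handler context that carries the thread type. */
    interface Context {
        ThreadType threadType();
    }

    /** One handler for all processors that branches on the invoking thread. */
    static Result handleError(Context context, Exception exception) {
        if (context.threadType() == ThreadType.GLOBAL) {
            // Example policy only: fail so global state is not silently incomplete.
            return Result.FAIL;
        }
        // Example policy only: skip the bad record on regular stream threads.
        return Result.RESUME;
    }
}
```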
Thanks and Regards
Arpit Goyal
8861094754
On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> wrote:
> Hi All,
>
> Thanks for the KIP! Makes sense to me and helps make KS more robust.
> I don't have any additional comments beyond what's been said so far.
>
> -Bill
>
> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
> [email protected]>
> wrote:
>
> > Hi,
> >
> > Overall, this makes sense to me. Thanks for the KIP!
> >
> > LB1: I was wondering precisely what we are logging in the DLQ case. Do
> > you intend to log the full record content to make the record content
> > recoverable, or only some metadata? I suppose it's the latter.
> >
> > LB2: Maybe I missed it (also not super fluent in that part of the
> > code), but will the implementers of the `ProcessExceptionalHandler` be
> > able to tell whether the error originated from a globalThread or a
> > StreamsThread? The implementer may want to specialize handling for
> > each case. This is not a must, but would be a nice to have for sure.
> >
> > Cheers,
> > Lucas
> >
> > On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]>
> > wrote:
> > >
> > > Hi All,
> > > Looking for more inputs and feedback. This would help to move this KIP
> > > forward.
> > >
> > >
> > > Thanks and Regards
> > > Arpit Goyal
> > > 8861094754
> > >
> > > On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]>
> > wrote:
> > >
> > > > Thanks for the response Matthias.
> > > > I have updated the KIP to include KIP-1034 handleError() automatic
> > > > backward compatibility. DLQ part I already mentioned under the
> > Limitation
> > > > section. Let me know if it needs to be improved further.
> > > > Thanks and Regards
> > > > Arpit Goyal
> > > > 8861094754
> > > >
> > > >
> > > > On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]>
> > wrote:
> > > >
> > > > >> Thanks for the clarification. Makes sense to me.
> > > >>
> > > > >> Might be good to add some of these details (no code reference to
> > > > >> `ProcessorNode` etc. necessary, as it's an impl detail) to the KIP.
> > > > >> I.e., state explicitly that the new handleError() will be used and
> > > > >> that it provides backward compatibility automatically based on its
> > > > >> current implementation from KIP-1034.
> > > >>
> > > > >> And that DLQ records, if returned, would be ignored and dropped and
> > > > >> a warning is logged about it for this case.
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> > > >> > Thank you for the detailed questions! Let me clarify the
> > implementation
> > > >> > approach:
> > > >> >
> > > >> > Which Method Will Be Called?
> > > >> >
> > > >> > GlobalThread will call the NEW handleError() method (not the
> > > >> deprecated
> > > >> > handle()).
> > > >> >
> > > >> > Key Point: The exception handler is not called directly by
> > > >> GlobalThread.
> > > >> > Instead, it's called by ProcessorNode.process(), which already
> > invokes
> > > >> > handleError() for regular processors.
> > > >> >
> > > >> > The implementation is straightforward:
> > > >> >
> > > >> > Current code (GlobalStateUpdateTask.initTopology - line 161):
> > > > >> > node.init((InternalProcessorContext) this.processorContext); // No handler passed
> > > >> >
> > > >> > Proposed change:
> > > >> > node.init((InternalProcessorContext) this.processorContext,
> > > >> > processingExceptionHandler); // Pass handler
> > > >> >
> > > >> > Once the handler is passed to ProcessorNode, the same code path
> > that
> > > >> > handles exceptions for regular KStream/KTable processors
> > > >> > (ProcessorNode.process() line 236) will automatically handle
> > > >> GlobalKTable
> > > >> > exceptions:
> > > >> >
> > > > >> > Response response =
> > > > >> >     processingExceptionHandler.handleError(errorHandlerContext, record, exception);
> > > >> >
> > > >> > There's no separate code path for GlobalThread - it reuses the
> > > >> existing
> > > >> > ProcessorNode exception handling mechanism.
> > > >> >
> > > >> > Backward Compatibility
> > > >> >
> > > >> > The handleError() method provides automatic backward
> > compatibility
> > > >> via
> > > >> > its default implementation:
> > > >> >
> > > >> > default Response handleError(...) {
> > > >> > return new Response(Result.from(handle(...)),
> > > >> > Collections.emptyList());
> > > >> > }
> > > >> >
> > > >> > - If users implement the old handle() method: handleError()
> > > >> delegates to
> > > >> > it automatically
> > > >> > - If users implement the new handleError() method: it's used
> > directly
> > > >> > - No code changes required for existing applications
> > > >> >
> > > >> > Dead Letter Queue (DLQ) Support
> > > >> >
> > > >> > This is where GlobalKTable differs from regular processors:
> > > >> >
> > > >> > The Limitation: GlobalThread does not have a Producer, so it
> > cannot
> > > >> send
> > > >> > DLQ records to Kafka.
> > > >> >
> > > >> > Proposed Approach:
> > > >> >
> > > >> > 1. For KIP-1270: When ProcessorNode detects DLQ records but the
> > > >> context
> > > >> > doesn't support RecordCollector (i.e., GlobalThread), it will log
> a
> > > >> warning
> > > >> > instead of sending:
> > > >> >
> > > > >> > log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
> > > > >> >     "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
> > > > >> >     "Record details logged: topic={}, headers={}", ...);
> > > >> >
> > > >> > 2. Future KIP: Full DLQ support for GlobalKTable (requires
> adding
> > > >> > Producer infrastructure) will be proposed separately, as it's a
> > larger
> > > >> > architectural change.
> > > >> >
> > > >> > How This Avoids User Confusion
> > > >> >
> > > >> > 1. Single handler for all processors: Users configure ONE
> > > >> > ProcessingExceptionHandler that works for both regular and global
> > > >> processors
> > > >> > 2. Consistent behavior: Result.RESUME continues, Result.FAIL
> > stops -
> > > >> same
> > > >> > for both
> > > >> > 3. Clear limitation: DLQ records are logged (not sent) for
> > > >> GlobalKTable,
> > > >> > with explicit warning message
> > > >> > 4. Documentation: Config docs will clearly state DLQ sending
> > > >> limitation
> > > >> > for GlobalKTable
> > > >> > Thanks and Regards
> > > >> > Arpit Goyal
> > > >> > 8861094754
> > > >> >
> > > >> >
> > > >> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]
> >
> > > >> wrote:
> > > >> >
> > > >> >> Thanks for the KIP Arpit.
> > > >> >>
> > > >> >> Can you elaborate a little bit more on details? With the newly
> > added
> > > >> DLQ
> > > >> >> support for regular `Processor`, the existing
> > > >> >> `ProcessingHandlerResponse` and corresponding method `handle()`
> are
> > > >> >> deprecated with upcoming 4.2 release.
> > > >> >>
> > > >> >> Thus, from AK 4.2+ going forward, users are expected to not
> > implement
> > > >> >> the old `handle()` (even if it's still supported, as long as the
> > new
> > > >> >> `handleError` is not overwritten).
> > > >> >>
> > > >> >> Are you proposing, for now, to only add support for the
> deprecated
> > > >> >> `handle()` method, ie, the new `handleError()` method would not
> be
> > > >> >> called by the global-thread code? If yes, this might be confusing
> > for
> > > >> >> users?
> > > >> >>
> > > > >> >> If you do not propose this, would it imply that the global-thread
> > > > >> >> code would call the new `handleError()` method? In this case, the
> > > > >> >> question is what the runtime would do if users try to use the DLQ
> > > > >> >> feature.
> > > > >> >>
> > > > >> >> Overall, it's unclear to me what you propose in detail and how we
> > > > >> >> can avoid confusing users, keep it backward compatible, and make
> > > > >> >> it easy to understand how the handler will work.
> > > >> >>
> > > >> >>
> > > >> >> -Matthias
> > > >> >>
> > > >> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> > > >> >>> Hi Team
> > > > >> >>> Just reaching out again. I need your input to move this forward.
> > > >> >>>
> > > >> >>> Thanks and Regards
> > > >> >>> Arpit Goyal
> > > >> >>> 8861094754
> > > >> >>>
> > > >> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <
> > [email protected]>
> > > >> >> wrote:
> > > >> >>>
> > > >> >>>> Hi All,
> > > >> >>>> I would like to start a discussion for KIP-1270. This KIP
> > extends
> > > >> >>>> ProcessingExceptionHandler support to GlobalKTable processors,
> > > >> enabling
> > > >> >>>> consistent exception handling across all stream processing
> types.
> > > >> >>>>
> > > >> >>>> * Current Behavior*
> > > >> >>>>
> > > >> >>>> When a processing exception occurs in a GlobalKTable
> > processor:
> > > >> >>>> - The exception propagates to GlobalStreamThread
> > > >> >>>> - The GlobalStreamThread terminates
> > > >> >>>> - The entire Kafka Streams application shuts down
> > > >> >>>> - No user-configurable exception handling is available
> > > >> >>>>
> > > >> >>>> * Proposed Behavior*
> > > >> >>>>
> > > >> >>>> After this KIP, when a processing exception occurs in a
> > > >> GlobalKTable
> > > >> >>>> processor:
> > > >> >>>> - The configured ProcessingExceptionHandler.handleError()
> > will be
> > > >> >> invoked
> > > >> >>>> - If the handler returns Result.RESUME, processing
> continues
> > > >> with the
> > > >> >>>> next record
> > > >> >>>> - If the handler returns Result.FAIL, the exception
> > propagates
> > > >> (same
> > > >> >> as
> > > >> >>>> current behavior)
> > > >> >>>> - If no handler is configured, behavior remains unchanged
> > > >> (backward
> > > >> >>>> compatible)
> > > >> >>>>
> > > >> >>>> KIP:
> > > >> >>>>
> > > >> >>
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> > > >> >>>>
> > > >> >>>> Thanks and Regards
> > > >> >>>> Arpit Goyal
> > > >> >>>> 8861094754
> > > >> >>>>
> > > >> >>>
> > > >> >>
> > > >> >>
> > > >> >
> > > >>
> > > >>
> >
>