Hi everyone,
I have started the voting thread for this KIP.

Thanks and Regards
Arpit Goyal
8861094754

On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <[email protected]>
wrote:

> Thanks, everyone, for the input. Should I start the vote on it?
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
> On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]>
> wrote:
>
>> Thanks Matthias.
>> That makes sense. Clients can use a single handler implementation to
>> support error handling for both the stream threads and the global thread.
>> There is no need to introduce a ThreadType parameter or another
>> configuration for the same.
>> @Lucas Brutschy <[email protected]> Does this answer your question?
>>
>> Thanks and Regards
>> Arpit Goyal
>> 8861094754
>>
>>
>> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]>
>> wrote:
>>
>>> I don't think we would need multiple handlers. The handler is invoked
>>> passing in `ErrorHandlerContext` parameter, which provides enough
>>> information to distinguish the case (eg, topic(), processorNodeId(), and
>>> taskId()), so users can implement different logic inside the same
>>> handler for the different cases if necessary.
>>>
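A minimal sketch of the single-handler idea described above, assuming the application knows which source topics feed its global tables. `ContextSketch`, `ResultSketch`, and `SingleHandlerSketch` are hypothetical stand-ins written only for illustration so the example compiles without Kafka on the classpath; a real handler would implement `ProcessingExceptionHandler` and receive an `ErrorHandlerContext`. The topic name used below is made up.

```java
import java.util.Set;

// Hypothetical stand-in for the subset of ErrorHandlerContext used here
// (topic(), processorNodeId(), taskId()); not the real Kafka Streams type.
record ContextSketch(String topic, String processorNodeId, String taskId) {}

// Hypothetical stand-in for the handler result.
enum ResultSketch { RESUME, FAIL }

final class SingleHandlerSketch {
    // Assumption: the application lists the topics backing its global tables.
    private final Set<String> globalTopics;

    SingleHandlerSketch(Set<String> globalTopics) {
        this.globalTopics = Set.copyOf(globalTopics);
    }

    // One handler, two policies: the branch is chosen from the context alone,
    // so no ThreadType parameter or second configuration is needed.
    ResultSketch handleError(ContextSketch context, Exception exception) {
        if (globalTopics.contains(context.topic())) {
            // Global-table record: skip the bad record and keep going.
            return ResultSketch.RESUME;
        }
        // Regular stream-thread record: fail fast.
        return ResultSketch.FAIL;
    }
}
```

The policy per branch is arbitrary here; the point is only that the context carries enough information to pick one.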
>>>
>>> -Matthias
>>>
>>>
>>> On 1/23/26 10:05 AM, Arpit Goyal wrote:
>>> > Thanks Bill and Lucas for the feedback.
>>> >
>>> > LB1: I was wondering precisely what we are logging in the DLQ case. Do
>>> >      you intend to log the full record content to make the record
>>> >      content recoverable, or only some metadata? I suppose it's the
>>> >      latter.
>>> >
>>> >    > Since GlobalKTable lacks producer infrastructure, DLQ records will
>>> >      be logged with full metadata but NOT sent to a Kafka topic.
>>> >
>>> > LB2: Maybe I missed it (also not super fluent in that part of the
>>> >      code), but will the implementers of the
>>> >      `ProcessingExceptionHandler` be able to tell whether the error
>>> >      originated from a globalThread or a StreamsThread? The implementer
>>> >      may want to specialize handling for each case. This is not a must,
>>> >      but would be a nice to have for sure.
>>> >
>>> >    > Great question! We have two options here.
>>> >
>>> >      Option 1: Single Handler Configuration
>>> >
>>> >      Users define one implementation of ProcessingExceptionHandler that
>>> >      handles errors for all stream types (KStream, KTable, and
>>> >      GlobalKTable). This maintains consistency with the existing
>>> >      DeserializationExceptionHandler pattern.
>>> >
>>> >      Limitation: This enforces the same handling behavior for global
>>> >      exception handling that we defined for KStream processing
>>> >      exception handling. This keeps things simple but is not flexible
>>> >      enough for users who may want different behavior for GlobalKTable.
>>> >
>>> >      Option 2: Separate Optional Configuration
>>> >
>>> >      Introduce a new optional configuration:
>>> >      global.processing.exception.handler. If configured, it applies
>>> >      specifically to GlobalKTable processing errors; if not configured,
>>> >      exceptions bubble up to the uncaught exception handler
>>> >      (maintaining current behavior and backward compatibility).
>>> >
>>> >      Limitation: Requires two configuration properties if users want
>>> >      exception handling for both regular streams and GlobalKTable.
>>> >
>>> >      With Option 1, ProcessingExceptionHandler does not currently have
>>> >      a way to identify which thread is invoking it. We may need to add
>>> >      a ThreadType (Stream or Global) to ErrorHandlerContext.
>>> >
>>> >      With Option 2, the client would always know which class it has
>>> >      implemented for GlobalKTables.
>>> >
>>> > Thanks and Regards
>>> > Arpit Goyal
>>> > 8861094754
>>> >
>>> >
>>> > On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> wrote:
>>> >
>>> >> Hi All,
>>> >>
>>> >> Thanks for the KIP! Makes sense to me and helps make KS more robust.
>>> >> I don't have any additional comments beyond what's been said so far.
>>> >>
>>> >> -Bill
>>> >>
>>> >> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <[email protected]> wrote:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>> Overall, this makes sense to me. Thanks for the KIP!
>>> >>>
>>> >>> LB1: I was wondering precisely what we are logging in the DLQ case.
>>> >>> Do you intend to log the full record content to make the record
>>> >>> content recoverable, or only some metadata? I suppose it's the latter.
>>> >>>
>>> >>> LB2: Maybe I missed it (also not super fluent in that part of the
>>> >>> code), but will the implementers of the `ProcessingExceptionHandler`
>>> >>> be able to tell whether the error originated from a globalThread or a
>>> >>> StreamsThread? The implementer may want to specialize handling for
>>> >>> each case. This is not a must, but would be a nice to have for sure.
>>> >>>
>>> >>> Cheers,
>>> >>> Lucas
>>> >>>
>>> >>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]> wrote:
>>> >>>>
>>> >>>> Hi All,
>>> >>>> Looking for more input and feedback. This would help move this KIP
>>> >>>> forward.
>>> >>>>
>>> >>>>
>>> >>>> Thanks and Regards
>>> >>>> Arpit Goyal
>>> >>>> 8861094754
>>> >>>>
>>> >>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:
>>> >>>>
>>> >>>>> Thanks for the response Matthias.
>>> >>>>> I have updated the KIP to include KIP-1034 handleError() automatic
>>> >>>>> backward compatibility. The DLQ part is already mentioned under the
>>> >>>>> Limitation section. Let me know if it needs to be improved further.
>>> >>>>> Thanks and Regards
>>> >>>>> Arpit Goyal
>>> >>>>> 8861094754
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:
>>> >>>>>
>>> >>>>>> Thanks for the clarification. Makes sense to me.
>>> >>>>>>
>>> >>>>>> Might be good to add some of these details (no code reference to
>>> >>>>>> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie,
>>> >>>>>> state explicitly that the new handleError() will be used and that
>>> >>>>>> it provides backward compatibility automatically based on its
>>> >>>>>> current implementation from KIP-1034.
>>> >>>>>>
>>> >>>>>> And that DLQ records, if returned, would be ignored and dropped,
>>> >>>>>> and a warning is logged about it for this case.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
>>> >>>>>>> Thank you for the detailed questions! Let me clarify the
>>> >>>>>>> implementation approach:
>>> >>>>>>>
>>> >>>>>>>     Which Method Will Be Called?
>>> >>>>>>>
>>> >>>>>>>     GlobalThread will call the NEW handleError() method (not the
>>> >>>>>>>     deprecated handle()).
>>> >>>>>>>
>>> >>>>>>>     Key Point: The exception handler is not called directly by
>>> >>>>>>>     GlobalThread. Instead, it's called by ProcessorNode.process(),
>>> >>>>>>>     which already invokes handleError() for regular processors.
>>> >>>>>>>
>>> >>>>>>>     The implementation is straightforward:
>>> >>>>>>>
>>> >>>>>>>     Current code (GlobalStateUpdateTask.initTopology - line 161):
>>> >>>>>>>     // No handler passed
>>> >>>>>>>     node.init((InternalProcessorContext) this.processorContext);
>>> >>>>>>>
>>> >>>>>>>     Proposed change:
>>> >>>>>>>     // Pass handler
>>> >>>>>>>     node.init((InternalProcessorContext) this.processorContext,
>>> >>>>>>>         processingExceptionHandler);
>>> >>>>>>>
>>> >>>>>>>     Once the handler is passed to ProcessorNode, the same code
>>> >>>>>>>     path that handles exceptions for regular KStream/KTable
>>> >>>>>>>     processors (ProcessorNode.process() line 236) will
>>> >>>>>>>     automatically handle GlobalKTable exceptions:
>>> >>>>>>>
>>> >>>>>>>     Response response =
>>> >>>>>>>         processingExceptionHandler.handleError(errorHandlerContext,
>>> >>>>>>>             record, exception);
>>> >>>>>>>
>>> >>>>>>>     There's no separate code path for GlobalThread - it reuses the
>>> >>>>>>>     existing ProcessorNode exception handling mechanism.
>>> >>>>>>>
>>> >>>>>>>     Backward Compatibility
>>> >>>>>>>
>>> >>>>>>>     The handleError() method provides automatic backward
>>> >>>>>>>     compatibility via its default implementation:
>>> >>>>>>>
>>> >>>>>>>     default Response handleError(...) {
>>> >>>>>>>         return new Response(Result.from(handle(...)),
>>> >>>>>>>             Collections.emptyList());
>>> >>>>>>>     }
>>> >>>>>>>
>>> >>>>>>>     - If users implement the old handle() method: handleError()
>>> >>>>>>>       delegates to it automatically
>>> >>>>>>>     - If users implement the new handleError() method: it's used
>>> >>>>>>>       directly
>>> >>>>>>>     - No code changes required for existing applications
>>> >>>>>>>
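The delegation described above can be sketched in a self-contained form. All type names here (`LegacyResult`, `NewResult`, `ResponseSketch`, `HandlerSketch`) are hypothetical simplifications written for illustration, with a plain `String` standing in for the record and for DLQ records; the real interface, its signatures, and its result mapping live in Kafka Streams and may differ in detail.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the old and new result types.
enum LegacyResult { CONTINUE, FAIL }
enum NewResult { RESUME, FAIL }

// Hypothetical stand-in for the new response: a result plus DLQ records.
record ResponseSketch(NewResult result, List<String> deadLetterRecords) {}

interface HandlerSketch {
    // Old-style method: existing implementations override only this one.
    default LegacyResult handle(String record, Exception exception) {
        throw new UnsupportedOperationException("handle() not implemented");
    }

    // New-style method: by default it delegates to the old method and maps
    // the old result onto the new one, with no DLQ records attached.
    default ResponseSketch handleError(String record, Exception exception) {
        LegacyResult legacy = handle(record, exception);
        NewResult mapped =
            legacy == LegacyResult.CONTINUE ? NewResult.RESUME : NewResult.FAIL;
        return new ResponseSketch(mapped, Collections.emptyList());
    }
}
```

A caller that only ever invokes `handleError()` therefore works with both kinds of implementation, which is why routing the global thread through it needs no change in existing applications.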
>>> >>>>>>>     Dead Letter Queue (DLQ) Support
>>> >>>>>>>
>>> >>>>>>>     This is where GlobalKTable differs from regular processors:
>>> >>>>>>>
>>> >>>>>>>     The Limitation: GlobalThread does not have a Producer, so it
>>> >>>>>>>     cannot send DLQ records to Kafka.
>>> >>>>>>>
>>> >>>>>>>     Proposed Approach:
>>> >>>>>>>
>>> >>>>>>>     1. For KIP-1270: When ProcessorNode detects DLQ records but
>>> >>>>>>>        the context doesn't support RecordCollector (i.e.,
>>> >>>>>>>        GlobalThread), it will log a warning instead of sending:
>>> >>>>>>>
>>> >>>>>>>        log.warn("Dead letter queue records cannot be sent for " +
>>> >>>>>>>                 "GlobalKTable processors (no producer available). " +
>>> >>>>>>>                 "DLQ support for GlobalKTable will be addressed in " +
>>> >>>>>>>                 "a future KIP. " +
>>> >>>>>>>                 "Record details logged: topic={}, headers={}", ...);
>>> >>>>>>>
>>> >>>>>>>     2. Future KIP: Full DLQ support for GlobalKTable (requires
>>> >>>>>>>        adding Producer infrastructure) will be proposed separately,
>>> >>>>>>>        as it's a larger architectural change.
>>> >>>>>>>
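A rough sketch of the "warn and drop" behavior for DLQ records when no producer is available. `DlqDispatchSketch` and its nested `Sender` interface are hypothetical names invented for this example, with `String` standing in for a DLQ record and `java.util.logging` standing in for the real logger; the actual check inside ProcessorNode is not shown in this thread and may look different.

```java
import java.util.List;
import java.util.logging.Logger;

final class DlqDispatchSketch {
    private static final Logger LOG =
        Logger.getLogger(DlqDispatchSketch.class.getName());

    // Hypothetical stand-in for "this context can produce to Kafka".
    interface Sender {
        void send(String dlqRecord);
    }

    // Returns how many DLQ records were actually sent.
    // A null sender models the global thread, which has no producer.
    static int dispatch(List<String> dlqRecords, Sender senderOrNull) {
        if (dlqRecords.isEmpty()) {
            return 0; // nothing to do, nothing to warn about
        }
        if (senderOrNull == null) {
            // No producer: log a warning and drop the records.
            LOG.warning("Dead letter queue records cannot be sent "
                + "(no producer available); dropping "
                + dlqRecords.size() + " record(s)");
            return 0;
        }
        dlqRecords.forEach(senderOrNull::send);
        return dlqRecords.size();
    }
}
```

Returning the sent count keeps the drop observable to the caller; whether the real code exposes anything beyond the log line is an open assumption.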
>>> >>>>>>>     How This Avoids User Confusion
>>> >>>>>>>
>>> >>>>>>>     1. Single handler for all processors: Users configure ONE
>>> >>>>>>>        ProcessingExceptionHandler that works for both regular and
>>> >>>>>>>        global processors
>>> >>>>>>>     2. Consistent behavior: Result.RESUME continues, Result.FAIL
>>> >>>>>>>        stops - same for both
>>> >>>>>>>     3. Clear limitation: DLQ records are logged (not sent) for
>>> >>>>>>>        GlobalKTable, with explicit warning message
>>> >>>>>>>     4. Documentation: Config docs will clearly state DLQ sending
>>> >>>>>>>        limitation for GlobalKTable
>>> >>>>>>>
>>> >>>>>>> Thanks and Regards
>>> >>>>>>> Arpit Goyal
>>> >>>>>>> 8861094754
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
>>> >>>>>>>
>>> >>>>>>>> Thanks for the KIP Arpit.
>>> >>>>>>>>
>>> >>>>>>>> Can you elaborate a little bit more on details? With the newly
>>> >>>>>>>> added DLQ support for regular `Processor`, the existing
>>> >>>>>>>> `ProcessingHandlerResponse` and corresponding method `handle()`
>>> >>>>>>>> are deprecated with upcoming 4.2 release.
>>> >>>>>>>>
>>> >>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not
>>> >>>>>>>> implement the old `handle()` (even if it's still supported, as
>>> >>>>>>>> long as the new `handleError` is not overridden).
>>> >>>>>>>>
>>> >>>>>>>> Are you proposing, for now, to only add support for the
>>> >>>>>>>> deprecated `handle()` method, ie, the new `handleError()` method
>>> >>>>>>>> would not be called by the global-thread code? If yes, this
>>> >>>>>>>> might be confusing for users?
>>> >>>>>>>>
>>> >>>>>>>> If you do not propose this, would it imply that the
>>> >>>>>>>> global-thread code would call the new `handleError()` method?
>>> >>>>>>>> For this case, the question is what the runtime would do if
>>> >>>>>>>> users try to use the DLQ feature?
>>> >>>>>>>>
>>> >>>>>>>> Overall, it's unclear to me what you propose in detail and how
>>> >>>>>>>> we can avoid confusing users, keep it backward compatible, and
>>> >>>>>>>> make it easy to understand how the handler will work.
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote:
>>> >>>>>>>>> Hi Team
>>> >>>>>>>>> Just reaching out again. Need your input to move it forward.
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks and Regards
>>> >>>>>>>>> Arpit Goyal
>>> >>>>>>>>> 8861094754
>>> >>>>>>>>>
>>> >>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi All,
>>> >>>>>>>>>> I would like to start a discussion for KIP-1270. This KIP
>>> >>>>>>>>>> extends ProcessingExceptionHandler support to GlobalKTable
>>> >>>>>>>>>> processors, enabling consistent exception handling across all
>>> >>>>>>>>>> stream processing types.
>>> >>>>>>>>>>
>>> >>>>>>>>>> *Current Behavior*
>>> >>>>>>>>>>
>>> >>>>>>>>>>      When a processing exception occurs in a GlobalKTable
>>> >>>>>>>>>>      processor:
>>> >>>>>>>>>>      - The exception propagates to GlobalStreamThread
>>> >>>>>>>>>>      - The GlobalStreamThread terminates
>>> >>>>>>>>>>      - The entire Kafka Streams application shuts down
>>> >>>>>>>>>>      - No user-configurable exception handling is available
>>> >>>>>>>>>>
>>> >>>>>>>>>> *Proposed Behavior*
>>> >>>>>>>>>>
>>> >>>>>>>>>>      After this KIP, when a processing exception occurs in a
>>> >>>>>>>>>>      GlobalKTable processor:
>>> >>>>>>>>>>      - The configured ProcessingExceptionHandler.handleError()
>>> >>>>>>>>>>        will be invoked
>>> >>>>>>>>>>      - If the handler returns Result.RESUME, processing
>>> >>>>>>>>>>        continues with the next record
>>> >>>>>>>>>>      - If the handler returns Result.FAIL, the exception
>>> >>>>>>>>>>        propagates (same as current behavior)
>>> >>>>>>>>>>      - If no handler is configured, behavior remains unchanged
>>> >>>>>>>>>>        (backward compatible)
>>> >>>>>>>>>>
>>> >>>>>>>>>> KIP:
>>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
>>> >>>>>>>>>>
>>> >>>>>>>>>> Thanks and Regards
>>> >>>>>>>>>> Arpit Goyal
>>> >>>>>>>>>> 8861094754
>>> >>>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>
>>> >>
>>> >
>>>
>>>
