Hi All,
Looking for more inputs and feedback. This would help to move this KIP
forward.


Thanks and Regards
Arpit Goyal
8861094754

On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:

> Thanks for the response Matthias.
> I have updated the KIP to state that the KIP-1034 handleError() default
> implementation provides backward compatibility automatically. The DLQ part
> is already covered under the Limitation section. Let me know if it needs to
> be improved further.
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:
>
>> Thanks for the clarification. Makes sense to me.
>>
>> Might be good to add some of these details (no code reference to
>> `ProcessorNode` etc. necessary, as it's an impl detail) to the KIP. I.e.,
>> state explicitly that the new handleError() will be used and that it
>> provides backward compatibility automatically based on its current
>> implementation from KIP-1034.
>>
>> And that DLQ records, if returned, would be ignored and dropped and a
>> warning is logged about it for this case.
>>
>>
>>
>> -Matthias
>>
>>
>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
>> > Thank you for the detailed questions! Let me clarify the implementation
>> > approach:
>> >
>> >    Which Method Will Be Called?
>> >
>> >    GlobalThread will call the NEW handleError() method (not the
>> > deprecated handle()).
>> >
>> >    Key Point: The exception handler is not called directly by
>> > GlobalThread. Instead, it's called by ProcessorNode.process(), which
>> > already invokes handleError() for regular processors.
>> >
>> >    The implementation is straightforward:
>> >
>> >    Current code (GlobalStateUpdateTask.initTopology - line 161):
>> >    // No handler passed
>> >    node.init((InternalProcessorContext) this.processorContext);
>> >
>> >    Proposed change:
>> >    // Pass handler
>> >    node.init((InternalProcessorContext) this.processorContext,
>> >              processingExceptionHandler);
>> >
>> >    Once the handler is passed to ProcessorNode, the same code path that
>> > handles exceptions for regular KStream/KTable processors
>> > (ProcessorNode.process() line 236) will automatically handle
>> > GlobalKTable exceptions:
>> >
>> >    Response response = processingExceptionHandler.handleError(
>> >        errorHandlerContext, record, exception);
>> >
>> >    There's no separate code path for GlobalThread - it reuses the
>> > existing ProcessorNode exception handling mechanism.
>> >
>> >    Backward Compatibility
>> >
>> >    The handleError() method provides automatic backward compatibility
>> > via its default implementation:
>> >
>> >    default Response handleError(...) {
>> >        return new Response(Result.from(handle(...)),
>> >                            Collections.emptyList());
>> >    }
>> >
>> >    - If users implement the old handle() method: handleError()
>> >      delegates to it automatically
>> >    - If users implement the new handleError() method: it's used directly
>> >    - No code changes required for existing applications
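
The delegation described above can be modeled in isolation. The sketch below is only an illustration: `Handler`, `Response`, and `Result` are simplified stand-ins invented for this example (the real ProcessingExceptionHandler takes a context, a record, and an exception, and converts the old response type via Result.from()), so none of these signatures should be read as the Kafka Streams API.

```java
import java.util.Collections;
import java.util.List;

public class HandlerCompatSketch {
    enum Result { RESUME, FAIL }

    // Simplified stand-in for the handler response: a result plus optional DLQ records.
    static final class Response {
        final Result result;
        final List<String> deadLetterRecords;

        Response(Result result, List<String> deadLetterRecords) {
            this.result = result;
            this.deadLetterRecords = deadLetterRecords;
        }
    }

    // Hypothetical, reduced handler interface (assumption: one Exception parameter only).
    interface Handler {
        // Old-style method; deprecated in the real API.
        default Result handle(Exception e) {
            throw new UnsupportedOperationException("handle() not implemented");
        }

        // New-style method: by default it delegates to handle() and returns no DLQ records.
        default Response handleError(Exception e) {
            return new Response(handle(e), Collections.emptyList());
        }
    }

    // Existing user code that only implements the old method.
    static class LegacyHandler implements Handler {
        @Override
        public Result handle(Exception e) {
            return Result.RESUME;
        }
    }

    // New user code that overrides handleError() directly.
    static class NewHandler implements Handler {
        @Override
        public Response handleError(Exception e) {
            return new Response(Result.FAIL, List.of("dlq-record"));
        }
    }

    public static void main(String[] args) {
        Exception e = new RuntimeException("boom");
        // The caller always invokes handleError(), whichever method the user implemented.
        System.out.println(new LegacyHandler().handleError(e).result);
        System.out.println(new NewHandler().handleError(e).result);
    }
}
```

In this model the caller only ever invokes handleError(), which is why a handler written against the old method keeps working without changes.
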
>> >
>> >    Dead Letter Queue (DLQ) Support
>> >
>> >    This is where GlobalKTable differs from regular processors:
>> >
>> >    The Limitation: GlobalThread does not have a Producer, so it cannot
>> > send DLQ records to Kafka.
>> >
>> >    Proposed Approach:
>> >
>> >    1. For KIP-1270: When ProcessorNode detects DLQ records but the
>> > context doesn't support RecordCollector (i.e., GlobalThread), it will
>> > log a warning instead of sending:
>> >
>> >    log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
>> >             "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
>> >             "Record details logged: topic={}, headers={}", ...);
>> >
>> >    2. Future KIP: Full DLQ support for GlobalKTable (requires adding
>> > Producer infrastructure) will be proposed separately, as it's a larger
>> > architectural change.
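
The warn-and-drop behavior in step 1 could look roughly like the sketch below. Everything here is hypothetical: `Collector` and `forwardDeadLetters` are names made up for the example, a null collector stands in for "the context has no RecordCollector", and java.util.logging replaces the logger Kafka actually uses. It shows the branching only, not the proposed ProcessorNode code.

```java
import java.util.List;
import java.util.logging.Logger;

public class DlqDropSketch {
    private static final Logger LOG = Logger.getLogger(DlqDropSketch.class.getName());

    // Hypothetical stand-in for the component that would produce DLQ records to Kafka.
    interface Collector {
        void send(String dlqRecord);
    }

    // Forwards DLQ records when a collector exists; otherwise logs a warning and drops them.
    // Returns the number of records actually forwarded.
    static int forwardDeadLetters(List<String> dlqRecords, Collector collector) {
        if (dlqRecords.isEmpty()) {
            return 0;
        }
        if (collector == null) {
            // Models the global thread: no producer, so the records cannot be sent.
            LOG.warning("Dropping " + dlqRecords.size()
                + " dead letter queue record(s): no producer available for GlobalKTable processors");
            return 0;
        }
        for (String dlqRecord : dlqRecords) {
            collector.send(dlqRecord);
        }
        return dlqRecords.size();
    }

    public static void main(String[] args) {
        // Regular processor: records are forwarded to the collector.
        System.out.println(forwardDeadLetters(List.of("a", "b"), r -> { }));
        // Global processor: records are dropped after a warning.
        System.out.println(forwardDeadLetters(List.of("a", "b"), null));
    }
}
```

The handler result (RESUME or FAIL) would still be honored in both branches; only the DLQ side effect differs.
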
>> >
>> >    How This Avoids User Confusion
>> >
>> >    1. Single handler for all processors: Users configure ONE
>> > ProcessingExceptionHandler that works for both regular and global
>> > processors
>> >    2. Consistent behavior: Result.RESUME continues, Result.FAIL stops -
>> > same for both
>> >    3. Clear limitation: DLQ records are logged (not sent) for
>> > GlobalKTable, with explicit warning message
>> >    4. Documentation: Config docs will clearly state DLQ sending
>> > limitation for GlobalKTable
>> > Thanks and Regards
>> > Arpit Goyal
>> > 8861094754
>> >
>> >
>> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
>> >
>> >> Thanks for the KIP Arpit.
>> >>
>> >> Can you elaborate a little bit more on details? With the newly added
>> >> DLQ support for regular `Processor`, the existing
>> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
>> >> deprecated with the upcoming 4.2 release.
>> >>
>> >> Thus, from AK 4.2+ going forward, users are expected to not implement
>> >> the old `handle()` (even if it's still supported, as long as the new
>> >> `handleError()` is not overridden).
>> >>
>> >> Are you proposing, for now, to only add support for the deprecated
>> >> `handle()` method, ie, the new `handleError()` method would not be
>> >> called by the global-thread code? If yes, this might be confusing for
>> >> users?
>> >>
>> >> If you do not propose this, would it imply that the global-thread code
>> >> would call the new `handleError()` method? For this case, the question
>> >> is what the runtime would do if users try to use the DLQ feature?
>> >>
>> >> Overall, it's unclear to me what you propose in detail and how we can
>> >> avoid confusing users, keep it backward compatible, and make it easy
>> >> to understand how the handler will work.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
>> >>> Hi Team
>> >>> Just reaching out again. Need your inputs to move it forward.
>> >>>
>> >>> Thanks and Regards
>> >>> Arpit Goyal
>> >>> 8861094754
>> >>>
>> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
>> >>>
>> >>>> Hi All,
>> >>>> I would like to start a discussion for KIP-1270.  This KIP extends
>> >>>> ProcessingExceptionHandler support to GlobalKTable processors,
>> >>>> enabling consistent exception handling across all stream processing
>> >>>> types.
>> >>>>
>> >>>> *Current Behavior*
>> >>>>
>> >>>>     When a processing exception occurs in a GlobalKTable processor:
>> >>>>     - The exception propagates to GlobalStreamThread
>> >>>>     - The GlobalStreamThread terminates
>> >>>>     - The entire Kafka Streams application shuts down
>> >>>>     - No user-configurable exception handling is available
>> >>>>
>> >>>> *Proposed Behavior*
>> >>>>
>> >>>>     After this KIP, when a processing exception occurs in a
>> >>>> GlobalKTable processor:
>> >>>>     - The configured ProcessingExceptionHandler.handleError() will be
>> >>>> invoked
>> >>>>     - If the handler returns Result.RESUME, processing continues
>> >>>> with the next record
>> >>>>     - If the handler returns Result.FAIL, the exception propagates
>> >>>> (same as current behavior)
>> >>>>     - If no handler is configured, behavior remains unchanged
>> >>>> (backward compatible)
>> >>>>
>> >>>> KIP:
>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
>> >>>>
>> >>>> Thanks and Regards
>> >>>> Arpit Goyal
>> >>>> 8861094754
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>
