Thanks for the response, Matthias.

I have updated the KIP to describe the automatic backward compatibility provided by the KIP-1034 handleError() method. The DLQ part is already mentioned under the Limitation section.

Let me know if it needs to be improved further.

Thanks and Regards
Arpit Goyal
8861094754
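To illustrate the backward-compatibility and DLQ behavior discussed in this thread, here is a minimal, self-contained sketch. It is a simplified model and not the actual Kafka Streams code: `Handler`, `Response`, `Result`, and `handleOnGlobalThread` are hypothetical stand-ins, records are plain strings, and the warning is printed rather than logged.

```java
import java.util.Collections;
import java.util.List;

// Simplified model of the handler contract discussed in this thread.
// All types here are hypothetical stand-ins, not the real Kafka Streams classes.
public class HandleErrorSketch {

    enum Result { RESUME, FAIL }

    // Stand-in for the handler response: a result plus optional DLQ records.
    static final class Response {
        final Result result;
        final List<String> dlqRecords;

        Response(Result result, List<String> dlqRecords) {
            this.result = result;
            this.dlqRecords = dlqRecords;
        }
    }

    interface Handler {
        // Old-style method; handlers written before the new method implement only this.
        default Result handle(String record, Exception error) {
            throw new UnsupportedOperationException("handle() not implemented");
        }

        // New-style method; the default delegates to handle() and returns no DLQ records.
        default Response handleError(String record, Exception error) {
            return new Response(handle(record, error), Collections.emptyList());
        }
    }

    // A handler that only implements the old method.
    static Handler legacyHandler() {
        return new Handler() {
            @Override
            public Result handle(String record, Exception error) {
                return Result.RESUME;
            }
        };
    }

    // A handler that implements the new method and returns one DLQ record.
    static Handler dlqHandler() {
        return new Handler() {
            @Override
            public Response handleError(String record, Exception error) {
                return new Response(Result.FAIL, Collections.singletonList("dlq:" + record));
            }
        };
    }

    // Models the global-thread case: there is no producer, so any DLQ records are
    // dropped with a warning and only the RESUME/FAIL result is acted upon.
    static Result handleOnGlobalThread(Handler handler, String record, Exception error) {
        Response response = handler.handleError(record, error);
        if (!response.dlqRecords.isEmpty()) {
            System.err.println("WARN: dropping " + response.dlqRecords.size()
                + " DLQ record(s); no producer available on the global thread");
        }
        return response.result;
    }

    public static void main(String[] args) {
        Exception error = new IllegalStateException("bad record");
        System.out.println(handleOnGlobalThread(legacyHandler(), "r1", error)); // prints RESUME
        System.out.println(handleOnGlobalThread(dlqHandler(), "r2", error));    // prints FAIL
    }
}
```

In this model the legacy handler is reached through the default `handleError()` without any change on the user's side, while a handler that returns DLQ records still has its RESUME/FAIL decision honored and only the DLQ records are discarded.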
On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the clarification. Makes sense to me.
>
> Might be good to add some of these details (no code reference to
> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie, state
> explicitly that the new handleError() will be used and that it provides
> backward compatibility automatically based on its current implementation
> from KIP-1034.
>
> And that DLQ records, if returned, would be ignored and dropped and a
> warning is logged about it for this case.
>
>
> -Matthias
>
>
> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> > Thank you for the detailed questions! Let me clarify the implementation
> > approach:
> >
> > Which Method Will Be Called?
> >
> > GlobalThread will call the NEW handleError() method (not the deprecated
> > handle()).
> >
> > Key Point: The exception handler is not called directly by GlobalThread.
> > Instead, it's called by ProcessorNode.process(), which already invokes
> > handleError() for regular processors.
> >
> > The implementation is straightforward:
> >
> > Current code (GlobalStateUpdateTask.initTopology - line 161):
> > node.init((InternalProcessorContext) this.processorContext); // No handler passed
> >
> > Proposed change:
> > node.init((InternalProcessorContext) this.processorContext,
> >     processingExceptionHandler); // Pass handler
> >
> > Once the handler is passed to ProcessorNode, the same code path that
> > handles exceptions for regular KStream/KTable processors
> > (ProcessorNode.process() line 236) will automatically handle GlobalKTable
> > exceptions:
> >
> > Response response =
> >     processingExceptionHandler.handleError(errorHandlerContext, record, exception);
> >
> > There's no separate code path for GlobalThread - it reuses the existing
> > ProcessorNode exception handling mechanism.
> >
> > Backward Compatibility
> >
> > The handleError() method provides automatic backward compatibility via
> > its default implementation:
> >
> > default Response handleError(...) {
> >     return new Response(Result.from(handle(...)), Collections.emptyList());
> > }
> >
> > - If users implement the old handle() method: handleError() delegates to
> >   it automatically
> > - If users implement the new handleError() method: it's used directly
> > - No code changes required for existing applications
> >
> > Dead Letter Queue (DLQ) Support
> >
> > This is where GlobalKTable differs from regular processors:
> >
> > The Limitation: GlobalThread does not have a Producer, so it cannot send
> > DLQ records to Kafka.
> >
> > Proposed Approach:
> >
> > 1. For KIP-1270: When ProcessorNode detects DLQ records but the context
> >    doesn't support RecordCollector (i.e., GlobalThread), it will log a
> >    warning instead of sending:
> >
> > log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
> >     "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
> >     "Record details logged: topic={}, headers={}", ...);
> >
> > 2. Future KIP: Full DLQ support for GlobalKTable (requires adding
> >    Producer infrastructure) will be proposed separately, as it's a larger
> >    architectural change.
> >
> > How This Avoids User Confusion
> >
> > 1. Single handler for all processors: Users configure ONE
> >    ProcessingExceptionHandler that works for both regular and global
> >    processors
> > 2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
> >    for both
> > 3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
> >    with explicit warning message
> > 4. Documentation: Config docs will clearly state DLQ sending limitation
> >    for GlobalKTable
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> >
> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J.
Sax <[email protected]> wrote:
> >
> >> Thanks for the KIP Arpit.
> >>
> >> Can you elaborate a little bit more on details? With the newly added DLQ
> >> support for regular `Processor`, the existing
> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
> >> deprecated with the upcoming 4.2 release.
> >>
> >> Thus, from AK 4.2+ going forward, users are expected to not implement
> >> the old `handle()` (even if it's still supported, as long as the new
> >> `handleError` is not overwritten).
> >>
> >> Are you proposing, for now, to only add support for the deprecated
> >> `handle()` method, ie, the new `handleError()` method would not be
> >> called by the global-thread code? If yes, this might be confusing for
> >> users?
> >>
> >> If you do not propose this, would it imply that the global-thread code
> >> would call the new `handleError()` method? For this case, the question
> >> is what the runtime would do if users try to use the DLQ feature?
> >>
> >> Overall, it's unclear to me what you propose in detail and how we can
> >> avoid confusing users, keep it backward compatible, and make it easy to
> >> understand how the handler will work.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> >>> Hi Team
> >>> Just reaching out again. Need your inputs to move it forward
> >>>
> >>> Thanks and Regards
> >>> Arpit Goyal
> >>> 8861094754
> >>>
> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
> >>>
> >>>> Hi All,
> >>>> I would like to start a discussion for KIP-1270. This KIP extends
> >>>> ProcessingExceptionHandler support to GlobalKTable processors, enabling
> >>>> consistent exception handling across all stream processing types.
> >>>>
> >>>> *Current Behavior*
> >>>>
> >>>> When a processing exception occurs in a GlobalKTable processor:
> >>>> - The exception propagates to GlobalStreamThread
> >>>> - The GlobalStreamThread terminates
> >>>> - The entire Kafka Streams application shuts down
> >>>> - No user-configurable exception handling is available
> >>>>
> >>>> *Proposed Behavior*
> >>>>
> >>>> After this KIP, when a processing exception occurs in a GlobalKTable
> >>>> processor:
> >>>> - The configured ProcessingExceptionHandler.handleError() will be invoked
> >>>> - If the handler returns Result.RESUME, processing continues with the
> >>>>   next record
> >>>> - If the handler returns Result.FAIL, the exception propagates (same as
> >>>>   current behavior)
> >>>> - If no handler is configured, behavior remains unchanged (backward
> >>>>   compatible)
> >>>>
> >>>> KIP:
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >>>>
> >>>> Thanks and Regards
> >>>> Arpit Goyal
> >>>> 8861094754
