Hi All,
I am looking for more input and feedback on KIP-1270; it would help move this KIP forward.
Thanks and Regards
Arpit Goyal
8861094754

On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:

> Thanks for the response Matthias.
> I have updated the KIP to state that the handleError() method from
> KIP-1034 provides backward compatibility automatically. The DLQ part is
> already mentioned under the Limitation section. Let me know if it needs
> to be improved further.
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:
>
>> Thanks for the clarification. Makes sense to me.
>>
>> Might be good to add some of these details (no code reference to
>> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie, state
>> explicitly that the new handleError() will be used and that it provides
>> backward compatibility automatically based on its current implementation
>> from KIP-1034.
>>
>> And that DLQ records, if returned, would be ignored and dropped and a
>> warning is logged about it for this case.
>>
>>
>> -Matthias
>>
>>
>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
>> > Thank you for the detailed questions! Let me clarify the implementation
>> > approach:
>> >
>> > Which Method Will Be Called?
>> >
>> > GlobalThread will call the NEW handleError() method (not the deprecated
>> > handle()).
>> >
>> > Key Point: The exception handler is not called directly by GlobalThread.
>> > Instead, it's called by ProcessorNode.process(), which already invokes
>> > handleError() for regular processors.
>> >
>> > The implementation is straightforward:
>> >
>> > Current code (GlobalStateUpdateTask.initTopology - line 161):
>> >   node.init((InternalProcessorContext) this.processorContext); // No handler passed
>> >
>> > Proposed change:
>> >   node.init((InternalProcessorContext) this.processorContext,
>> >       processingExceptionHandler); // Pass handler
>> >
>> > Once the handler is passed to ProcessorNode, the same code path that
>> > handles exceptions for regular KStream/KTable processors
>> > (ProcessorNode.process() line 236) will automatically handle GlobalKTable
>> > exceptions:
>> >
>> >   Response response =
>> >       processingExceptionHandler.handleError(errorHandlerContext, record, exception);
>> >
>> > There's no separate code path for GlobalThread - it reuses the existing
>> > ProcessorNode exception handling mechanism.
>> >
>> > Backward Compatibility
>> >
>> > The handleError() method provides automatic backward compatibility via
>> > its default implementation:
>> >
>> >   default Response handleError(...) {
>> >       return new Response(Result.from(handle(...)), Collections.emptyList());
>> >   }
>> >
>> > - If users implement the old handle() method: handleError() delegates to
>> >   it automatically
>> > - If users implement the new handleError() method: it's used directly
>> > - No code changes required for existing applications
>> >
>> > Dead Letter Queue (DLQ) Support
>> >
>> > This is where GlobalKTable differs from regular processors:
>> >
>> > The Limitation: GlobalThread does not have a Producer, so it cannot send
>> > DLQ records to Kafka.
>> >
>> > Proposed Approach:
>> >
>> > 1. For KIP-1270: When ProcessorNode detects DLQ records but the context
>> >    doesn't support RecordCollector (i.e., GlobalThread), it will log a
>> >    warning instead of sending:
>> >
>> >   log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
>> >       "(no producer available). DLQ support for GlobalKTable will be " +
>> >       "addressed in a future KIP. " +
>> >       "Record details logged: topic={}, headers={}", ...);
>> >
>> > 2. Future KIP: Full DLQ support for GlobalKTable (requires adding
>> >    Producer infrastructure) will be proposed separately, as it's a larger
>> >    architectural change.
>> >
>> > How This Avoids User Confusion
>> >
>> > 1. Single handler for all processors: Users configure ONE
>> >    ProcessingExceptionHandler that works for both regular and global
>> >    processors
>> > 2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
>> >    for both
>> > 3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
>> >    with explicit warning message
>> > 4. Documentation: Config docs will clearly state DLQ sending limitation
>> >    for GlobalKTable
>> >
>> > Thanks and Regards
>> > Arpit Goyal
>> > 8861094754
>> >
>> >
>> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
>> >
>> >> Thanks for the KIP Arpit.
>> >>
>> >> Can you elaborate a little bit more on details? With the newly added DLQ
>> >> support for regular `Processor`, the existing
>> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
>> >> deprecated with upcoming 4.2 release.
>> >>
>> >> Thus, from AK 4.2+ going forward, users are expected to not implement
>> >> the old `handle()` (even if it's still supported, as long as the new
>> >> `handleError` is not overwritten).
>> >>
>> >> Are you proposing, for now, to only add support for the deprecated
>> >> `handle()` method, ie, the new `handleError()` method would not be
>> >> called by the global-thread code? If yes, this might be confusing for
>> >> users?
>> >>
>> >> If you do not propose this, would it imply that the global-thread code
>> >> would call the new `handleError()` method? For this case, the question
>> >> is what the runtime would do if users try to use the DLQ feature?
>> >>
>> >> Overall, it's unclear to me what you propose in detail and how we can
>> >> avoid confusing users, keep it backward compatible, and make it easy to
>> >> understand how the handler will work.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
>> >>> Hi Team
>> >>> Just reaching out again. I need your input to move it forward.
>> >>>
>> >>> Thanks and Regards
>> >>> Arpit Goyal
>> >>> 8861094754
>> >>>
>> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
>> >>>
>> >>>> Hi All,
>> >>>> I would like to start a discussion for KIP-1270. This KIP extends
>> >>>> ProcessingExceptionHandler support to GlobalKTable processors, enabling
>> >>>> consistent exception handling across all stream processing types.
>> >>>>
>> >>>> *Current Behavior*
>> >>>>
>> >>>> When a processing exception occurs in a GlobalKTable processor:
>> >>>> - The exception propagates to GlobalStreamThread
>> >>>> - The GlobalStreamThread terminates
>> >>>> - The entire Kafka Streams application shuts down
>> >>>> - No user-configurable exception handling is available
>> >>>>
>> >>>> *Proposed Behavior*
>> >>>>
>> >>>> After this KIP, when a processing exception occurs in a GlobalKTable
>> >>>> processor:
>> >>>> - The configured ProcessingExceptionHandler.handleError() will be
>> >>>>   invoked
>> >>>> - If the handler returns Result.RESUME, processing continues with the
>> >>>>   next record
>> >>>> - If the handler returns Result.FAIL, the exception propagates (same as
>> >>>>   current behavior)
>> >>>> - If no handler is configured, behavior remains unchanged (backward
>> >>>>   compatible)
>> >>>>
>> >>>> KIP:
>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
>> >>>>
>> >>>> Thanks and Regards
>> >>>> Arpit Goyal
>> >>>> 8861094754
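
For readers skimming the thread, here is a minimal, self-contained sketch of the two behaviours discussed above: handleError() delegating to the old handle() by default, and DLQ records being dropped with a warning on the global thread. It is only an illustration under stated assumptions. Result, Response, Handler, and onProcessingError are simplified, hypothetical stand-ins written for this example; they are not the real Kafka Streams classes or signatures, and records are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration; NOT the real Kafka Streams types.
class GlobalHandlerSketch {

    enum Result { RESUME, FAIL }

    // A handler response: the decision plus any dead letter queue (DLQ) records.
    static final class Response {
        final Result result;
        final List<String> dlqRecords;

        Response(Result result, List<String> dlqRecords) {
            this.result = result;
            this.dlqRecords = dlqRecords;
        }
    }

    interface Handler {
        // Old-style method; this sketch's default fails if it is never overridden.
        default Result handle(String record, Exception error) {
            return Result.FAIL;
        }

        // New-style method; by default it delegates to handle() and returns no DLQ records.
        default Response handleError(String record, Exception error) {
            return new Response(handle(record, error), Collections.emptyList());
        }
    }

    // Assumed global-thread behaviour: always call handleError(); if DLQ records come back,
    // record a warning and drop them, because no producer is available.
    // Returns true when processing should continue with the next record.
    static boolean onProcessingError(Handler handler, String record, Exception error,
                                     List<String> warnings) {
        Response response = handler.handleError(record, error);
        if (!response.dlqRecords.isEmpty()) {
            warnings.add("Dropping " + response.dlqRecords.size()
                    + " DLQ record(s): no producer available");
        }
        return response.result == Result.RESUME;
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();

        // Existing application: only the old handle() is implemented.
        Handler legacy = new Handler() {
            @Override
            public Result handle(String record, Exception error) {
                return Result.RESUME;
            }
        };

        // New application: handleError() is implemented and returns a DLQ record.
        Handler withDlq = new Handler() {
            @Override
            public Response handleError(String record, Exception error) {
                return new Response(Result.RESUME, Collections.singletonList("dlq:" + record));
            }
        };

        System.out.println(onProcessingError(legacy, "r1", new RuntimeException("boom"), warnings));
        System.out.println(onProcessingError(withDlq, "r2", new RuntimeException("boom"), warnings));
        System.out.println(warnings);
    }
}
```

In this sketch the caller never needs to know which method the user implemented: the default handleError() covers the legacy case, and the DLQ check is the only global-thread-specific step.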
