Hi,

Overall, this makes sense to me. Thanks for the KIP!

LB1: I was wondering precisely what we are logging in the DLQ case. Do you intend to log the full record content to make the record content recoverable, or only some metadata? I suppose it's the latter.

LB2: Maybe I missed it (I'm also not super fluent in that part of the code), but will implementers of the `ProcessingExceptionHandler` be able to tell whether the error originated from the global thread or a StreamThread? The implementer may want to specialize handling for each case. This is not a must, but it would be nice to have.

Cheers,
Lucas

On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]> wrote:
>
> Hi All,
> Looking for more inputs and feedback. This would help to move this KIP
> forward.
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:
>
> > Thanks for the response Matthias.
> > I have updated the KIP to include KIP-1034 handleError() automatic
> > backward compatibility. DLQ part I already mentioned under the Limitation
> > section. Let me know if it needs to be improved further.
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> > On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:
> >
> >> Thanks for the clarification. Makes sense to me.
> >>
> >> Might be good to add some of these details (no code reference to
> >> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie, state
> >> explicitly that the new handleError() will be used and that it provides
> >> backward compatibility automatically based on its current implementation
> >> from KIP-1034.
> >>
> >> And that DLQ records, if returned, would be ignored and dropped and a
> >> warning is logged about it for this case.
> >>
> >> -Matthias
> >>
> >> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> >> > Thank you for the detailed questions! Let me clarify the implementation
> >> > approach:
> >> >
> >> > Which Method Will Be Called?
> >> >
> >> > GlobalThread will call the NEW handleError() method (not the deprecated
> >> > handle()).
> >> >
> >> > Key Point: The exception handler is not called directly by GlobalThread.
> >> > Instead, it's called by ProcessorNode.process(), which already invokes
> >> > handleError() for regular processors.
> >> >
> >> > The implementation is straightforward:
> >> >
> >> > Current code (GlobalStateUpdateTask.initTopology - line 161):
> >> >     node.init((InternalProcessorContext) this.processorContext); // No handler passed
> >> >
> >> > Proposed change:
> >> >     node.init((InternalProcessorContext) this.processorContext,
> >> >         processingExceptionHandler); // Pass handler
> >> >
> >> > Once the handler is passed to ProcessorNode, the same code path that
> >> > handles exceptions for regular KStream/KTable processors
> >> > (ProcessorNode.process() line 236) will automatically handle GlobalKTable
> >> > exceptions:
> >> >
> >> >     Response response =
> >> >         processingExceptionHandler.handleError(errorHandlerContext, record, exception);
> >> >
> >> > There's no separate code path for GlobalThread - it reuses the existing
> >> > ProcessorNode exception handling mechanism.
> >> >
> >> > Backward Compatibility
> >> >
> >> > The handleError() method provides automatic backward compatibility via
> >> > its default implementation:
> >> >
> >> >     default Response handleError(...) {
> >> >         return new Response(Result.from(handle(...)), Collections.emptyList());
> >> >     }
> >> >
> >> > - If users implement the old handle() method: handleError() delegates to
> >> >   it automatically
> >> > - If users implement the new handleError() method: it's used directly
> >> > - No code changes required for existing applications
> >> >
> >> > Dead Letter Queue (DLQ) Support
> >> >
> >> > This is where GlobalKTable differs from regular processors:
> >> >
> >> > The Limitation: GlobalThread does not have a Producer, so it cannot send
> >> > DLQ records to Kafka.
> >> >
> >> > Proposed Approach:
> >> >
> >> > 1. For KIP-1270: When ProcessorNode detects DLQ records but the context
> >> >    doesn't support RecordCollector (i.e., GlobalThread), it will log a
> >> >    warning instead of sending:
> >> >
> >> >     log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
> >> >         "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
> >> >         "Record details logged: topic={}, headers={}", ...);
> >> >
> >> > 2. Future KIP: Full DLQ support for GlobalKTable (requires adding
> >> >    Producer infrastructure) will be proposed separately, as it's a larger
> >> >    architectural change.
> >> >
> >> > How This Avoids User Confusion
> >> >
> >> > 1. Single handler for all processors: Users configure ONE
> >> >    ProcessingExceptionHandler that works for both regular and global
> >> >    processors
> >> > 2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
> >> >    for both
> >> > 3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
> >> >    with explicit warning message
> >> > 4. Documentation: Config docs will clearly state DLQ sending limitation
> >> >    for GlobalKTable
> >> >
> >> > Thanks and Regards
> >> > Arpit Goyal
> >> > 8861094754
> >> >
> >> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
> >> >
> >> >> Thanks for the KIP Arpit.
> >> >>
> >> >> Can you elaborate a little bit more on details? With the newly added DLQ
> >> >> support for regular `Processor`, the existing
> >> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
> >> >> deprecated with the upcoming 4.2 release.
> >> >>
> >> >> Thus, from AK 4.2+ going forward, users are expected to not implement
> >> >> the old `handle()` (even if it's still supported, as long as the new
> >> >> `handleError` is not overridden).
> >> >>
> >> >> Are you proposing, for now, to only add support for the deprecated
> >> >> `handle()` method, ie, the new `handleError()` method would not be
> >> >> called by the global-thread code? If yes, this might be confusing for
> >> >> users?
> >> >>
> >> >> If you do not propose this, would it imply that the global-thread code
> >> >> would call the new `handleError()` method? For this case, the question
> >> >> is what the runtime would do if users try to use the DLQ feature?
> >> >>
> >> >> Overall, it's unclear to me what you propose in detail and how we can
> >> >> avoid confusing users, keep it backward compatible, and make it easy to
> >> >> understand how the handler will work.
> >> >>
> >> >> -Matthias
> >> >>
> >> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> >> >>> Hi Team
> >> >>> Just reaching out again. Need your inputs to move it forward
> >> >>>
> >> >>> Thanks and Regards
> >> >>> Arpit Goyal
> >> >>> 8861094754
> >> >>>
> >> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
> >> >>>
> >> >>>> Hi All,
> >> >>>> I would like to start a discussion for KIP-1270. This KIP extends
> >> >>>> ProcessingExceptionHandler support to GlobalKTable processors, enabling
> >> >>>> consistent exception handling across all stream processing types.
> >> >>>>
> >> >>>> *Current Behavior*
> >> >>>>
> >> >>>> When a processing exception occurs in a GlobalKTable processor:
> >> >>>> - The exception propagates to GlobalStreamThread
> >> >>>> - The GlobalStreamThread terminates
> >> >>>> - The entire Kafka Streams application shuts down
> >> >>>> - No user-configurable exception handling is available
> >> >>>>
> >> >>>> *Proposed Behavior*
> >> >>>>
> >> >>>> After this KIP, when a processing exception occurs in a GlobalKTable
> >> >>>> processor:
> >> >>>> - The configured ProcessingExceptionHandler.handleError() will be invoked
> >> >>>> - If the handler returns Result.RESUME, processing continues with the
> >> >>>>   next record
> >> >>>> - If the handler returns Result.FAIL, the exception propagates (same as
> >> >>>>   current behavior)
> >> >>>> - If no handler is configured, behavior remains unchanged (backward
> >> >>>>   compatible)
> >> >>>>
> >> >>>> KIP:
> >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >> >>>>
> >> >>>> Thanks and Regards
> >> >>>> Arpit Goyal
> >> >>>> 8861094754
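
To make the behaviour discussed in this thread easier to picture, here is a minimal, self-contained sketch in plain Java. It is only an illustration under stated assumptions: `GlobalHandlerSketch`, `Handler`, `Response`, `Result`, `onProcessingError` and the `warnings` list are all hypothetical stand-ins written for this note, not the real Kafka Streams classes or signatures. It shows the two points from the thread: a default `handleError()` that delegates to the old `handle()`, and a global-thread style caller that drops any returned DLQ records with a warning because it has no producer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins only; none of these are the real Kafka Streams types.
final class GlobalHandlerSketch {

    enum Result { RESUME, FAIL }

    // A handler response: the decision plus any DLQ records the handler wants sent.
    static final class Response {
        final Result result;
        final List<String> deadLetterRecords;

        Response(Result result, List<String> deadLetterRecords) {
            this.result = result;
            this.deadLetterRecords = deadLetterRecords;
        }
    }

    interface Handler {
        // Old-style method: returns only a decision. Fails by default.
        default Result handle(String value, Exception error) {
            return Result.FAIL;
        }

        // New-style method: by default it delegates to handle() and returns
        // no DLQ records, which keeps old implementations working unchanged.
        default Response handleError(String value, Exception error) {
            return new Response(handle(value, error), Collections.emptyList());
        }
    }

    // Warnings are collected in a list so the behaviour is easy to inspect.
    static final List<String> warnings = new ArrayList<>();

    // Mimics the proposed global-thread path: there is no producer, so DLQ
    // records are dropped and a warning is recorded instead.
    // Returns true if processing should continue with the next record.
    static boolean onProcessingError(Handler handler, String value, Exception error) {
        Response response = handler.handleError(value, error);
        if (!response.deadLetterRecords.isEmpty()) {
            warnings.add("DLQ records dropped (no producer available): "
                + response.deadLetterRecords.size());
        }
        return response.result == Result.RESUME;
    }

    public static void main(String[] args) {
        // Old-style handler: only handle() is overridden.
        Handler legacy = new Handler() {
            @Override
            public Result handle(String value, Exception error) {
                return Result.RESUME;
            }
        };
        // New-style handler that resumes and also returns one DLQ record.
        Handler withDlq = new Handler() {
            @Override
            public Response handleError(String value, Exception error) {
                return new Response(Result.RESUME, List.of("dlq:" + value));
            }
        };
        System.out.println(onProcessingError(legacy, "r1", new RuntimeException("boom")));
        System.out.println(onProcessingError(withDlq, "r2", new RuntimeException("boom")));
        System.out.println(warnings);
    }
}
```

In this sketch the caller never inspects which method the user overrode; it always calls `handleError()`, so the old `handle()` is reached only through the default delegation. Whether the real warning should carry full record content or only metadata is exactly the open question in LB1 and is not decided here.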
