Thanks everyone for the input. Should I start the vote on it?

Thanks and Regards
Arpit Goyal
8861094754

On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]> wrote:

> Thanks Matthias.
> That makes sense. Clients can use a single handler implementation to
> support error handling for both the stream thread and the global thread.
> There is no need to introduce a ThreadType parameter or another
> configuration for the same.
> @Lucas Brutschy <[email protected]> Does this answer your query?
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]> wrote:
>
>> I don't think we would need multiple handlers. The handler is invoked
>> passing in `ErrorHandlerContext` parameter, which provides enough
>> information to distinguish the case (eg, topic(), processorNodeId(), and
>> taskId()), so users can implement different logic inside the same
>> handler for the different cases if necessary.
>>
>>
>> -Matthias
>>
>>
>> On 1/23/26 10:05 AM, Arpit Goyal wrote:
>> > Thanks Bill and Lucas for the feedback.
>> >
>> > LB1: I was wondering precisely what we are logging in the DLQ case. Do
>> > you intend to log the full record content to make the record content
>> > recoverable, or only some metadata. I suppose it's the latter.
>> >
>> > > Since GlobalKTable lacks producer infrastructure, DLQ records will be
>> > > logged with full metadata but NOT sent to a Kafka topic.
>> >
>> > LB2: Maybe I missed it (also not super fluent in that part of the
>> > code), but will the implementers of the `ProcessingExceptionHandler` be
>> > able to tell whether the error originated from a globalThread or a
>> > StreamsThread? The implementer may want to specialize handling for
>> > each case. This is not a must, but would be a nice to have for sure.
>> >
>> > > Great question! We have two options here:
>> >
>> > Option 1: Single Handler Configuration
>> >
>> > Users define one implementation of ProcessingExceptionHandler that
>> > handles errors for all stream types (KStream, KTable, and
>> > GlobalKTable). This maintains consistency with the existing
>> > DeserializationExceptionHandler pattern.
>> > Limitation: This will enforce the same handling behavior for global
>> > exception handling that we defined for KStream processing exception
>> > handling. This keeps things simple but is not flexible enough for
>> > users who may want different behavior for GlobalKTable.
>> >
>> > Option 2: Separate Optional Configuration
>> >
>> > Introduce a new optional configuration:
>> > global.processing.exception.handler. If configured, it applies
>> > specifically to GlobalKTable processing errors; if not configured,
>> > exceptions bubble up to the uncaught exception handler (maintaining
>> > current behavior and backward compatibility).
>> > Limitation: Requires two configuration properties if users want
>> > exception handling for both regular streams and GlobalKTable.
>> >
>> > With Option 1 - ProcessingExceptionHandler does not have a way to
>> > identify which thread is invoking it as of now. We may need to
>> > introduce a ThreadType (Stream or Global) in ErrorHandlerContext.
>> > With Option 2 - Client would always be aware of the class it has
>> > implemented for GlobalKTables.
>> >
>> > Thanks and Regards
>> > Arpit Goyal
>> > 8861094754
>> >
>> >
>> > On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> wrote:
>> >
>> >> Hi All,
>> >>
>> >> Thanks for the KIP! Makes sense to me and helps make KS more robust.
>> >> I don't have any additional comments beyond what's been said so far.
>> >>
>> >> -Bill
>> >>
>> >> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev
>> >> <[email protected]> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> Overall, this makes sense to me. Thanks for the KIP!
>> >>>
>> >>> LB1: I was wondering precisely what we are logging in the DLQ case. Do
>> >>> you intend to log the full record content to make the record content
>> >>> recoverable, or only some metadata. I suppose it's the latter.
>> >>>
>> >>> LB2: Maybe I missed it (also not super fluent in that part of the
>> >>> code), but will the implementers of the `ProcessingExceptionHandler` be
>> >>> able to tell whether the error originated from a globalThread or a
>> >>> StreamsThread? The implementer may want to specialize handling for
>> >>> each case. This is not a must, but would be a nice to have for sure.
>> >>>
>> >>> Cheers,
>> >>> Lucas
>> >>>
>> >>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]>
>> >>> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>> Looking for more inputs and feedback. This would help to move this
>> >>>> KIP forward.
>> >>>>
>> >>>>
>> >>>> Thanks and Regards
>> >>>> Arpit Goyal
>> >>>> 8861094754
>> >>>>
>> >>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]>
>> >>>> wrote:
>> >>>>
>> >>>>> Thanks for the response Matthias.
>> >>>>> I have updated the KIP to include KIP-1034 handleError() automatic
>> >>>>> backward compatibility. The DLQ part I already mentioned under the
>> >>>>> Limitation section. Let me know if it needs to be improved further.
>> >>>>> Thanks and Regards
>> >>>>> Arpit Goyal
>> >>>>> 8861094754
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Thanks for the clarification. Makes sense to me.
>> >>>>>>
>> >>>>>> Might be good to add some of these details (no code reference to
>> >>>>>> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie,
>> >>>>>> state explicitly that the new handleError() will be used and that it
>> >>>>>> provides backward compatibility automatically based on its current
>> >>>>>> implementation from KIP-1034.
>> >>>>>>
>> >>>>>> And that DLQ records, if returned, would be ignored and dropped and
>> >>>>>> a warning is logged about it for this case.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
>> >>>>>>> Thank you for the detailed questions! Let me clarify the
>> >>>>>>> implementation approach:
>> >>>>>>>
>> >>>>>>> Which Method Will Be Called?
>> >>>>>>>
>> >>>>>>> GlobalThread will call the NEW handleError() method (not the
>> >>>>>>> deprecated handle()).
>> >>>>>>>
>> >>>>>>> Key Point: The exception handler is not called directly by
>> >>>>>>> GlobalThread. Instead, it's called by ProcessorNode.process(),
>> >>>>>>> which already invokes handleError() for regular processors.
>> >>>>>>>
>> >>>>>>> The implementation is straightforward:
>> >>>>>>>
>> >>>>>>> Current code (GlobalStateUpdateTask.initTopology - line 161):
>> >>>>>>>     node.init((InternalProcessorContext) this.processorContext);
>> >>>>>>>     // No handler passed
>> >>>>>>>
>> >>>>>>> Proposed change:
>> >>>>>>>     node.init((InternalProcessorContext) this.processorContext,
>> >>>>>>>         processingExceptionHandler); // Pass handler
>> >>>>>>>
>> >>>>>>> Once the handler is passed to ProcessorNode, the same code path
>> >>>>>>> that handles exceptions for regular KStream/KTable processors
>> >>>>>>> (ProcessorNode.process() line 236) will automatically handle
>> >>>>>>> GlobalKTable exceptions:
>> >>>>>>>
>> >>>>>>>     Response response =
>> >>>>>>>         processingExceptionHandler.handleError(errorHandlerContext,
>> >>>>>>>             record, exception);
>> >>>>>>>
>> >>>>>>> There's no separate code path for GlobalThread - it reuses the
>> >>>>>>> existing ProcessorNode exception handling mechanism.
>> >>>>>>>
>> >>>>>>> Backward Compatibility
>> >>>>>>>
>> >>>>>>> The handleError() method provides automatic backward compatibility
>> >>>>>>> via its default implementation:
>> >>>>>>>
>> >>>>>>>     default Response handleError(...) {
>> >>>>>>>         return new Response(Result.from(handle(...)),
>> >>>>>>>             Collections.emptyList());
>> >>>>>>>     }
>> >>>>>>>
>> >>>>>>> - If users implement the old handle() method: handleError()
>> >>>>>>>   delegates to it automatically
>> >>>>>>> - If users implement the new handleError() method: it's used
>> >>>>>>>   directly
>> >>>>>>> - No code changes required for existing applications
>> >>>>>>>
>> >>>>>>> Dead Letter Queue (DLQ) Support
>> >>>>>>>
>> >>>>>>> This is where GlobalKTable differs from regular processors:
>> >>>>>>>
>> >>>>>>> The Limitation: GlobalThread does not have a Producer, so it
>> >>>>>>> cannot send DLQ records to Kafka.
>> >>>>>>>
>> >>>>>>> Proposed Approach:
>> >>>>>>>
>> >>>>>>> 1. For KIP-1270: When ProcessorNode detects DLQ records but the
>> >>>>>>>    context doesn't support RecordCollector (i.e., GlobalThread),
>> >>>>>>>    it will log a warning instead of sending:
>> >>>>>>>
>> >>>>>>>     log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
>> >>>>>>>         "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
>> >>>>>>>         "Record details logged: topic={}, headers={}", ...);
>> >>>>>>>
>> >>>>>>> 2. Future KIP: Full DLQ support for GlobalKTable (requires adding
>> >>>>>>>    Producer infrastructure) will be proposed separately, as it's
>> >>>>>>>    a larger architectural change.
>> >>>>>>>
>> >>>>>>> How This Avoids User Confusion
>> >>>>>>>
>> >>>>>>> 1. Single handler for all processors: Users configure ONE
>> >>>>>>>    ProcessingExceptionHandler that works for both regular and
>> >>>>>>>    global processors
>> >>>>>>> 2. Consistent behavior: Result.RESUME continues, Result.FAIL
>> >>>>>>>    stops - same for both
>> >>>>>>> 3. Clear limitation: DLQ records are logged (not sent) for
>> >>>>>>>    GlobalKTable, with explicit warning message
>> >>>>>>> 4. Documentation: Config docs will clearly state DLQ sending
>> >>>>>>>    limitation for GlobalKTable
>> >>>>>>>
>> >>>>>>> Thanks and Regards
>> >>>>>>> Arpit Goyal
>> >>>>>>> 8861094754
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the KIP Arpit.
>> >>>>>>>>
>> >>>>>>>> Can you elaborate a little bit more on details? With the newly
>> >>>>>>>> added DLQ support for regular `Processor`, the existing
>> >>>>>>>> `ProcessingHandlerResponse` and corresponding method `handle()`
>> >>>>>>>> are deprecated with upcoming 4.2 release.
>> >>>>>>>>
>> >>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not
>> >>>>>>>> implement the old `handle()` (even if it's still supported, as
>> >>>>>>>> long as the new `handleError` is not overwritten).
>> >>>>>>>>
>> >>>>>>>> Are you proposing, for now, to only add support for the
>> >>>>>>>> deprecated `handle()` method, ie, the new `handleError()` method
>> >>>>>>>> would not be called by the global-thread code? If yes, this
>> >>>>>>>> might be confusing for users?
>> >>>>>>>>
>> >>>>>>>> If you do not propose this, would it imply that the global-thread
>> >>>>>>>> code would call the new `handleError()` method? For this case,
>> >>>>>>>> the question is what the runtime would do if users try to use
>> >>>>>>>> the DLQ feature?
>> >>>>>>>>
>> >>>>>>>> Overall, it's unclear to me what you propose in detail and how we
>> >>>>>>>> can avoid confusing users, keep it backward compatible, and make
>> >>>>>>>> it easy to understand how the handler will work.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote:
>> >>>>>>>>> Hi Team,
>> >>>>>>>>> Just reaching out again. Need your inputs to move it forward.
>> >>>>>>>>>
>> >>>>>>>>> Thanks and Regards
>> >>>>>>>>> Arpit Goyal
>> >>>>>>>>> 8861094754
>> >>>>>>>>>
>> >>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi All,
>> >>>>>>>>>> I would like to start a discussion for KIP-1270. This KIP
>> >>>>>>>>>> extends ProcessingExceptionHandler support to GlobalKTable
>> >>>>>>>>>> processors, enabling consistent exception handling across all
>> >>>>>>>>>> stream processing types.
>> >>>>>>>>>>
>> >>>>>>>>>> *Current Behavior*
>> >>>>>>>>>>
>> >>>>>>>>>> When a processing exception occurs in a GlobalKTable processor:
>> >>>>>>>>>> - The exception propagates to GlobalStreamThread
>> >>>>>>>>>> - The GlobalStreamThread terminates
>> >>>>>>>>>> - The entire Kafka Streams application shuts down
>> >>>>>>>>>> - No user-configurable exception handling is available
>> >>>>>>>>>>
>> >>>>>>>>>> *Proposed Behavior*
>> >>>>>>>>>>
>> >>>>>>>>>> After this KIP, when a processing exception occurs in a
>> >>>>>>>>>> GlobalKTable processor:
>> >>>>>>>>>> - The configured ProcessingExceptionHandler.handleError() will
>> >>>>>>>>>>   be invoked
>> >>>>>>>>>> - If the handler returns Result.RESUME, processing continues
>> >>>>>>>>>>   with the next record
>> >>>>>>>>>> - If the handler returns Result.FAIL, the exception propagates
>> >>>>>>>>>>   (same as current behavior)
>> >>>>>>>>>> - If no handler is configured, behavior remains unchanged
>> >>>>>>>>>>   (backward compatible)
>> >>>>>>>>>>
>> >>>>>>>>>> KIP:
>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks and Regards
>> >>>>>>>>>> Arpit Goyal
>> >>>>>>>>>> 8861094754
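
For anyone skimming the thread, here is a minimal sketch of where the discussion landed: one handler serves both thread types and branches on what the context exposes. It is not the Kafka Streams API. `Result`, `SketchContext`, `SingleHandlerSketch`, and `SingleHandlerDemo` are hypothetical stand-ins so the example compiles on its own, and recognising GlobalKTable errors by comparing topic() against a known set of global topics is an assumption made for illustration, as are the topic names and the FAIL/RESUME policy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the handler result; not the Kafka Streams type.
enum Result { RESUME, FAIL }

// Hypothetical context exposing only the accessors named in the thread.
final class SketchContext {
    private final String topic;
    private final String processorNodeId;
    private final String taskId;

    SketchContext(String topic, String processorNodeId, String taskId) {
        this.topic = topic;
        this.processorNodeId = processorNodeId;
        this.taskId = taskId;
    }

    String topic() { return topic; }
    String processorNodeId() { return processorNodeId; }
    String taskId() { return taskId; }
}

// One handler for both cases; the branch is driven by the context alone.
final class SingleHandlerSketch {
    // Assumption: the application knows which topics back its GlobalKTables.
    private final Set<String> globalTopics;

    SingleHandlerSketch(Set<String> globalTopics) {
        this.globalTopics = new HashSet<>(globalTopics);
    }

    Result handleError(SketchContext context, Exception exception) {
        if (globalTopics.contains(context.topic())) {
            // Example policy: stop on errors while loading global state.
            return Result.FAIL;
        }
        // Example policy: skip the bad record for regular stream tasks.
        return Result.RESUME;
    }
}

final class SingleHandlerDemo {
    public static void main(String[] args) {
        SingleHandlerSketch handler =
            new SingleHandlerSketch(new HashSet<>(Arrays.asList("global-config")));
        Exception error = new IllegalStateException("bad record");

        // Node and task identifiers below are made-up placeholder values.
        SketchContext global = new SketchContext("global-config", "node-a", "task-a");
        SketchContext regular = new SketchContext("orders", "node-b", "task-b");

        System.out.println(handler.handleError(global, error));  // prints FAIL
        System.out.println(handler.handleError(regular, error)); // prints RESUME
    }
}
```

The same branch could key off processorNodeId() or taskId() instead; which accessor is most reliable for a given topology is left open here.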
