Hi @Lucas Brutschy <[email protected]>, thanks for reviewing it. Here are my thoughts on each point. Let me know what you think.

LB1: I don't think we should add EOS-specific logic to the global thread just to avoid duplicates in the DLQ topic. It would also be a big architectural change, because the global thread keeps its progress in a separate checkpoint file rather than in consumer offsets, so there is no offset commit for a DLQ transaction to join. In other words, the global thread would write to the DLQ at-least-once, even under EOS. I can update the KIP with these details if you agree.

LB2: There will be no EOS producer in the global thread. The global thread will use a dedicated non-transactional producer: no transactional.id is set, so there is no risk of colliding with the stream thread producers under EOS. The client.id will follow a distinct naming convention (e.g. <threadId>-global-producer) to avoid confusion with the stream thread producers, which use the <threadId>-producer suffix. We may also need to introduce a dedicated global producer config that does not allow enabling EOS for the global producer.

LB3: sendException is never checked in the poll loop today. I will add a checkForException() call inside StateConsumer.pollAndUpdate() (mirroring what stream threads do after each process() call), so that asynchronous DLQ send failures, such as authorization errors, surface promptly instead of being silently swallowed.

LB4: PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG has been deprecated since 4.3. Changing its semantics from "log a warning and drop the record" to "invoke the handler and optionally produce to the DLQ" is a meaningful behaviour change for anyone who has set this config. I will add an explicit entry to the KIP's Compatibility section documenting this, and update the @deprecated javadoc on the config so that it accurately describes what the config now does (and does not do), instead of leaving the old description in place.

LB5: Yes. RecordDeserializer.handleDeserializationFailure unconditionally casts processorContext to RecordCollector.Supplier (line 124), so today a deserialization handler that returns DLQ records during global state restoration causes a ClassCastException, because GlobalProcessorContextImpl does not implement RecordCollector.Supplier.
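
To make the LB5 failure mode concrete, here is a minimal Java sketch. All types in it (Collector, CollectorSupplier, Context, GlobalContextToday, GlobalContextProposed) are simplified, hypothetical stand-ins for RecordCollector, RecordCollector.Supplier and the processor context classes; they are not the real Kafka Streams classes or signatures. It only shows that an unguarded cast to the supplier interface throws for a context that does not implement it, and succeeds once the context does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the LB5 cast issue.
// None of these types are the real Kafka Streams internals.
public class GlobalDlqCastSketch {

    // Stand-in for RecordCollector: records what it was asked to send.
    public static final class Collector {
        private final List<String> sent = new ArrayList<>();

        public void send(String topic, String record) {
            sent.add(topic + ":" + record);
        }

        public List<String> sent() {
            return sent;
        }
    }

    // Stand-in for RecordCollector.Supplier.
    public interface CollectorSupplier {
        Collector recordCollector();
    }

    // Stand-in for the context type the deserializer receives.
    public interface Context { }

    // Global context as it is today: not a CollectorSupplier.
    public static final class GlobalContextToday implements Context { }

    // Global context after the proposed change. The field is named
    // "collector" to match ProcessorContextImpl, as suggested in LB7.
    public static final class GlobalContextProposed implements Context, CollectorSupplier {
        private final Collector collector = new Collector();

        @Override
        public Collector recordCollector() {
            return collector;
        }
    }

    // Models the unguarded cast: in this sketch the cast is reached whenever
    // the handler returned DLQ records, with no instanceof check beforehand.
    public static void handleDeserializationFailure(Context context,
                                                    List<String> dlqRecords,
                                                    String dlqTopic) {
        if (dlqRecords.isEmpty()) {
            return; // nothing to route, so the cast is never reached
        }
        Collector collector = ((CollectorSupplier) context).recordCollector();
        for (String record : dlqRecords) {
            collector.send(dlqTopic, record);
        }
    }

    public static void main(String[] args) {
        List<String> dlq = List.of("bad-bytes");
        try {
            handleDeserializationFailure(new GlobalContextToday(), dlq, "global-dlq");
        } catch (ClassCastException e) {
            System.out.println("today: ClassCastException");
        }
        GlobalContextProposed proposed = new GlobalContextProposed();
        handleDeserializationFailure(proposed, dlq, "global-dlq");
        System.out.println("proposed: " + proposed.recordCollector().sent());
    }
}
```

Running main prints the ClassCastException line for the current context and a single DLQ send for the proposed one. An instanceof guard in the deserializer would also avoid the exception, but it could only fall back to dropping the DLQ records; making the global context a supplier lets them actually be produced.
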
I flagged this ClassCastException in a previous PR comment: https://github.com/apache/kafka/pull/21535#issuecomment-4012433576. Once GlobalProcessorContextImpl implements RecordCollector.Supplier as part of this KIP, the cast succeeds and the deserialization DLQ path works correctly.

LB6: I will add integration tests for the global thread that mirror the coverage in DeadLetterQueueIntegrationTest (DSL/Processor API x FAIL/CONTINUE, plus a deserialization variant).

LB7: Good catch. I will rename the field from recordCollector to collector to match ProcessorContextImpl and keep the naming consistent across both context implementations.

Thanks and Regards
Arpit Goyal
8861094754

On Sun, May 31, 2026 at 7:21 AM Arpit Goyal <[email protected]> wrote:

> On Sat, 30 May, 2026, 10:32 pm Arpit Goyal, <[email protected]>
> wrote:
>
>> On Mon, May 25, 2026 at 11:11 AM Arpit Goyal <[email protected]>
>> wrote:
>>
>>> Hi Lucas
>>> Never mind, I got the link
>>> https://lists.apache.org/thread/t7x46hl4ykozrtj641woz3g52cqpwlms .
>>>
>>> On Mon, May 25, 2026 at 11:10 AM Arpit Goyal <[email protected]>
>>> wrote:
>>>
>>>> Hi Lucas
>>>> Could you help update it or provide the link? I don't have access to
>>>> it.
>>>> [image: Screenshot 2026-05-25 at 11.08.57 AM.png]
>>>>
>>>> On Fri, May 22, 2026 at 3:20 PM Lucas Brutschy via dev <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Arpit,
>>>>>
>>>>> Thanks for picking this up. A few questions before this goes to VOTE.
>>>>>
>>>>> Nit: the discussion thread link on the KIP is incorrect, can you fix
>>>>> it?
>>>>>
>>>>> LB1: How does this interact with EOS? Under EOS, the normal DLQ write
>>>>> is part of the same transaction as the failing record's offset commit
>>>>> (via StreamsProducer.maybeBeginTransaction()). The global thread has
>>>>> no consumer-group offset commit, and the checkpoint file sits outside
>>>>> any Kafka transaction. Do we run the global producer at-least-once
>>>>> even under EOS, or do we wrap DLQ sends in a transaction? If the
>>>>> latter, how do we handle ordering against maybeCheckpoint(), the new
>>>>> transactional.id, and fencing semantics that don't exist for the
>>>>> global thread today?
>>>>>
>>>>> LB2: What does the producer config look like - in particular client.id
>>>>> and transactional.id? How do we avoid colliding with the per-thread
>>>>> task producers under EOS?
>>>>>
>>>>> LB3: What's the shutdown ordering with the new producer, and what
>>>>> happens if sendException fires (e.g. DLQ topic auth failure)?
>>>>>
>>>>> LB4: processing.exception.handler.global.enabled is already
>>>>> deprecated. Is the semantic change from "drop with warning" to
>>>>> "produce to DLQ" called out in Compatibility, and does the deprecation
>>>>> javadoc still reflect what the config does?
>>>>>
>>>>> LB5: How does this interact with the deserialization handler path?
>>>>> RecordDeserializer.handleDeserializationFailure casts the context to
>>>>> RecordCollector.Supplier unconditionally, and
>>>>> GlobalProcessorContextImpl isn't one today - so a deserialization
>>>>> handler returning DLQ records during global-state restoration
>>>>> currently hits a ClassCastException rather than warn-and-drop. Does
>>>>> the KIP intend to cover this case too (it seems to fall out for free
>>>>> once GlobalProcessorContextImpl becomes a RecordCollector.Supplier)?
>>>>>
>>>>> LB6: What test scenarios are planned? For comparison,
>>>>> DeadLetterQueueIntegrationTest on the normal path covers
>>>>> DSL/ProcessorAPI x FAIL/CONTINUE plus a deserialization variant - are
>>>>> we mirroring that coverage for the global thread?
>>>>>
>>>>> LB7: Small naming nit - the KIP introduces a recordCollector field on
>>>>> GlobalProcessorContextImpl, but the analogous field on
>>>>> ProcessorContextImpl is just called collector. Any reason not to
>>>>> match?
>>>>>
>>>>> Cheers,
>>>>> Lucas
>>>>>
>>>>
