Thanks and Regards
Arpit Goyal
8861094754

On Sat, 30 May, 2026, 10:32 pm Arpit Goyal, <[email protected]>
wrote:

> Hi @Lucas Brutschy <[email protected]>, thanks for reviewing it.
> Here are my views. Let me know what you think.
>
> LB1: I don't think we should add EOS-specific logic to the global thread
> just to avoid duplicates in the DLQ topic. It would also be a big
> architectural change, since the global thread keeps its checkpoint in a
> separate file rather than in consumer offsets. I can update the KIP with
> these details if you agree.
>
> LB2: We will not have an EOS producer in the global thread. The global
> thread will use a dedicated non-transactional producer: no
> transactional.id is set, so there is no collision risk with stream thread
> producers under EOS.
>    The client.id will follow a distinct naming convention (e.g.
> <threadId>-global-producer) to avoid any confusion with stream thread
> producers, which use the <threadId>-producer suffix. We may need to
> introduce a global producer config that does not allow enabling EOS for
> the global producer.
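A minimal sketch of the LB2 idea, for illustration only. It uses plain
java.util.Properties so it has no Kafka dependency; "client.id" and
"transactional.id" are the real producer config keys, but the helper name
globalProducerConfig and the exact client.id format are assumptions taken
from the naming proposal above, not the final KIP design.

```java
import java.util.Properties;

final class GlobalProducerConfigSketch {
    // Real Kafka producer config keys, written as strings to avoid a Kafka dependency.
    static final String CLIENT_ID = "client.id";
    static final String TRANSACTIONAL_ID = "transactional.id";

    // Hypothetical helper: derives the global thread's DLQ producer config
    // from the user-supplied producer config.
    static Properties globalProducerConfig(Properties userProducerConfig, String threadId) {
        Properties config = new Properties();
        config.putAll(userProducerConfig);
        // The global producer is never transactional, even when the app runs with EOS.
        config.remove(TRANSACTIONAL_ID);
        // Distinct suffix so it cannot be confused with "<threadId>-producer".
        config.setProperty(CLIENT_ID, threadId + "-global-producer");
        return config;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("bootstrap.servers", "localhost:9092");
        user.setProperty(TRANSACTIONAL_ID, "my-app-1");

        Properties global = globalProducerConfig(user, "my-app-GlobalStreamThread");
        System.out.println(global.getProperty(CLIENT_ID));
        System.out.println(global.containsKey(TRANSACTIONAL_ID));
    }
}
```

The user's config is copied rather than mutated, so stream thread producers
built from the same config keep their transactional.id.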
>
> LB3:  sendException is never checked in the poll loop today. I will add a
> checkForException() call inside StateConsumer.pollAndUpdate() (mirroring
> what stream threads do after each process() call), so async DLQ send
> failures like auth errors surface promptly rather than being silently
> swallowed.
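To make the LB3 pattern concrete, here is a rough sketch in plain Java. It
is not the Kafka Streams code: the class GlobalDlqSendSketch and the method
onSendCompletion are hypothetical stand-ins, and pollAndUpdate only models
one loop iteration. The point is that the producer callback records the
failure and the global thread rethrows it on its next pass.

```java
import java.util.concurrent.atomic.AtomicReference;

final class GlobalDlqSendSketch {
    // First asynchronous send failure, recorded by the producer callback.
    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    // Stand-in for the producer send callback; in a real client this runs
    // on the producer's I/O thread, not on the global thread.
    void onSendCompletion(Exception error) {
        if (error != null) {
            // Keep only the first failure; later ones are usually consequences of it.
            sendException.compareAndSet(null, new RuntimeException("DLQ send failed", error));
        }
    }

    // Rethrows a recorded failure on the calling thread.
    void checkForException() {
        RuntimeException e = sendException.get();
        if (e != null) {
            throw e;
        }
    }

    // Stand-in for one iteration of StateConsumer.pollAndUpdate().
    void pollAndUpdate(Runnable processBatch) {
        processBatch.run();
        checkForException();
    }

    public static void main(String[] args) {
        GlobalDlqSendSketch sketch = new GlobalDlqSendSketch();
        sketch.pollAndUpdate(() -> { });                       // no failure yet
        sketch.onSendCompletion(new SecurityException("not authorized for DLQ topic"));
        try {
            sketch.pollAndUpdate(() -> { });
        } catch (RuntimeException e) {
            System.out.println("surfaced: " + e.getCause().getMessage());
        }
    }
}
```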
>
> LB4: PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG has been deprecated
> since 4.3. The semantic change from "log a warning and drop the record" to
> "invoke the handler and optionally produce to DLQ" is a meaningful
> behaviour change for anyone who has set this config. I will add an explicit
> entry in the KIP's Compatibility section documenting this, and update the
> @deprecated javadoc on the config to accurately describe what it now does
> (and does not do) rather than leaving the old description in place.
>
> LB5: Yes, RecordDeserializer.handleDeserializationFailure unconditionally
> casts processorContext to RecordCollector.Supplier at line 124, and today
> this throws a ClassCastException when the deserialization handler returns
> DLQ records during global state restoration, because
> GlobalProcessorContextImpl does not implement RecordCollector.Supplier. I
> flagged this in a previous PR comment
> https://github.com/apache/kafka/pull/21535#issuecomment-4012433576. Once
> GlobalProcessorContextImpl implements RecordCollector.Supplier as part of
> this KIP, the cast succeeds, and the deserialization DLQ path works
> correctly.
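The LB5 failure mode can be reproduced with a few stand-in types. This is
only an illustration: ProcessorContext, RecordCollectorSupplier and the two
context classes below are simplified, hypothetical substitutes for the
Kafka Streams internals, and the collector is modelled as a String.

```java
final class CastSketch {
    interface ProcessorContext { }

    // Hypothetical stand-in for RecordCollector.Supplier.
    interface RecordCollectorSupplier {
        String recordCollector();
    }

    // Models GlobalProcessorContextImpl today: not a supplier.
    static final class GlobalContextToday implements ProcessorContext { }

    // Models GlobalProcessorContextImpl after the proposed change.
    static final class GlobalContextProposed implements ProcessorContext, RecordCollectorSupplier {
        @Override
        public String recordCollector() {
            return "global-collector";
        }
    }

    // Models the unconditional cast in the deserialization failure path.
    static String collectorFor(ProcessorContext context) {
        return ((RecordCollectorSupplier) context).recordCollector();
    }

    public static void main(String[] args) {
        System.out.println(collectorFor(new GlobalContextProposed()));
        try {
            collectorFor(new GlobalContextToday());
        } catch (ClassCastException e) {
            System.out.println("today: ClassCastException");
        }
    }
}
```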
>
> LB6:  I will add integration tests mirroring the coverage in
> DeadLetterQueueIntegrationTest.
>
> LB7: Good catch; I will rename the field from recordCollector to
> collector to match ProcessorContextImpl and keep the naming consistent
> across both context implementations.
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Mon, May 25, 2026 at 11:11 AM Arpit Goyal <[email protected]>
> wrote:
>
>> Hi Lucas
>> Never mind, I found the link:
>> https://lists.apache.org/thread/t7x46hl4ykozrtj641woz3g52cqpwlms
>>
>> Thanks and Regards
>> Arpit Goyal
>> 8861094754
>>
>>
>> On Mon, May 25, 2026 at 11:10 AM Arpit Goyal <[email protected]>
>> wrote:
>>
>>> Hi Lucas
>>> Could you help update it or provide the link? I don't have access to it.
>>> [image: Screenshot 2026-05-25 at 11.08.57 AM.png]
>>> Thanks and Regards
>>> Arpit Goyal
>>> 8861094754
>>>
>>>
>>> On Fri, May 22, 2026 at 3:20 PM Lucas Brutschy via dev <
>>> [email protected]> wrote:
>>>
>>>> Hi Arpit,
>>>>
>>>> Thanks for picking this up. A few questions before this goes to VOTE.
>>>>
>>>> Nit: the discussion thread link on the KIP is incorrect, can you fix it?
>>>>
>>>> LB1: How does this interact with EOS? Under EOS, the normal DLQ write
>>>> is part of the same transaction as the failing record's offset commit
>>>> (via StreamsProducer.maybeBeginTransaction()). The global thread has
>>>> no consumer-group offset commit, and the checkpoint file sits outside
>>>> any Kafka transaction. Do we run the global producer at-least-once
>>>> even under EOS, or do we wrap DLQ sends in a transaction? If the
>>>> latter, how do we handle ordering against maybeCheckpoint(), the new
>>>> transactional.id, and fencing semantics that don't exist for the
>>>> global thread today?
>>>>
>>>> LB2: What does the producer config look like - in particular client.id
>>>> and transactional.id? How do we avoid colliding with the per-thread
>>>> task producers under EOS?
>>>>
>>>> LB3: What's the shutdown ordering with the new producer, and what
>>>> happens if sendException fires (e.g. DLQ topic auth failure)?
>>>>
>>>> LB4: processing.exception.handler.global.enabled is already
>>>> deprecated. Is the semantic change from "drop with warning" to
>>>> "produce to DLQ" called out in Compatibility, and does the deprecation
>>>> javadoc still reflect what the config does?
>>>>
>>>> LB5: How does this interact with the deserialization handler path?
>>>> RecordDeserializer.handleDeserializationFailure casts the context to
>>>> RecordCollector.Supplier unconditionally, and
>>>> GlobalProcessorContextImpl isn't one today - so a deserialization
>>>> handler returning DLQ records during global-state restoration
>>>> currently hits a ClassCastException rather than warn-and-drop. Does
>>>> the KIP intend to cover this case too (it seems to fall out for free
>>>> once GlobalProcessorContextImpl becomes a RecordCollector.Supplier)?
>>>>
>>>> LB6: What test scenarios are planned? For comparison,
>>>> DeadLetterQueueIntegrationTest on the normal path covers
>>>> DSL/ProcessorAPI x FAIL/CONTINUE plus a deserialization variant - are
>>>> we mirroring that coverage for the global thread?
>>>>
>>>> LB7: Small naming nit - the KIP introduces a recordCollector field on
>>>> GlobalProcessorContextImpl, but the analogous field on
>>>> ProcessorContextImpl is just called collector. Any reason not to
>>>> match?
>>>>
>>>> Cheers,
>>>> Lucas
>>>>
>>>
