Hi Arpit,

Thanks for picking this up. A few questions before this goes to VOTE.

Nit: the discussion thread link on the KIP is incorrect. Can you fix it?

LB1: How does this interact with EOS? Under EOS, the normal DLQ write
is part of the same transaction as the failing record's offset commit
(via StreamsProducer.maybeBeginTransaction()). The global thread has
no consumer-group offset commit, and the checkpoint file sits outside
any Kafka transaction. Do we run the global producer at-least-once
even under EOS, or do we wrap DLQ sends in a transaction? If the
latter, how do we handle ordering against maybeCheckpoint(), the new
transactional.id, and fencing semantics that don't exist for the
global thread today?

LB2: What does the producer config look like - in particular client.id
and transactional.id? How do we avoid colliding with the per-thread
task producers under EOS?
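
To make the question concrete, here is a rough sketch of the kind of
naming scheme I have in mind. It is only an illustration: the suffix,
the helper name, and the processId parameter are all hypothetical and
not taken from the KIP. Only the client.id and transactional.id config
keys are real producer configs.

```java
import java.util.Properties;

public class GlobalDlqProducerConfigSketch {

    // Hypothetical suffix; the KIP would need to choose the real one.
    static final String GLOBAL_SUFFIX = "-global-dlq-producer";

    // Builds producer properties for a dedicated global-thread DLQ producer.
    // The suffix keeps both ids distinct from any per-thread task producer
    // that derives its ids from the same clientId / applicationId.
    static Properties globalDlqProducerProps(String clientId,
                                             String applicationId,
                                             String processId,
                                             boolean transactional) {
        Properties props = new Properties();
        props.put("client.id", clientId + GLOBAL_SUFFIX);
        if (transactional) {
            // Assumption: an instance-unique component (processId here) is
            // needed so two application instances do not fence each other.
            props.put("transactional.id",
                      applicationId + "-" + processId + GLOBAL_SUFFIX);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props =
            globalDlqProducerProps("my-client", "my-app", "proc-1", true);
        System.out.println(props.getProperty("client.id"));
        System.out.println(props.getProperty("transactional.id"));
    }
}
```

If we stay at-least-once for the global producer, the transactional
branch simply goes away and only the client.id question remains.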

LB3: What's the shutdown ordering with the new producer, and what
happens if sendException fires (e.g. DLQ topic auth failure)?

LB4: processing.exception.handler.global.enabled is already
deprecated. Is the semantic change from "drop with warning" to
"produce to DLQ" called out in Compatibility, and does the deprecation
javadoc still reflect what the config does?

LB5: How does this interact with the deserialization handler path?
RecordDeserializer.handleDeserializationFailure casts the context to
RecordCollector.Supplier unconditionally, and
GlobalProcessorContextImpl isn't one today - so a deserialization
handler returning DLQ records during global-state restoration
currently hits a ClassCastException rather than warn-and-drop. Does
the KIP intend to cover this case too (it seems to fall out for free
once GlobalProcessorContextImpl becomes a RecordCollector.Supplier)?
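
For anyone following along, the failure mode is just an unchecked
downcast. A stripped-down sketch with stand-in types (these are
hypothetical simplifications, not the real Streams classes or
signatures):

```java
public class GlobalContextCastSketch {

    // Stand-ins for the real types; names and shapes are simplified.
    interface ProcessorContext { }

    interface CollectorSupplier {
        String recordCollector();
    }

    // Plays the role of the task-level context, which supplies a collector.
    static final class TaskContext implements ProcessorContext, CollectorSupplier {
        @Override
        public String recordCollector() {
            return "task-collector";
        }
    }

    // Plays the role of the global context, which does not supply one today.
    static final class GlobalContext implements ProcessorContext { }

    // Mirrors the unconditional cast: compiles for any context, but throws
    // ClassCastException at runtime if the context is not a CollectorSupplier.
    static String collectorFor(ProcessorContext context) {
        return ((CollectorSupplier) context).recordCollector();
    }

    // Returns true if looking up the collector fails with ClassCastException.
    static boolean castFails(ProcessorContext context) {
        try {
            collectorFor(context);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(castFails(new TaskContext()));
        System.out.println(castFails(new GlobalContext()));
    }
}
```

Once the global context implements the supplier interface, the cast
succeeds and the DLQ records have somewhere to go.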

LB6: What test scenarios are planned? For comparison,
DeadLetterQueueIntegrationTest on the normal path covers
DSL/ProcessorAPI x FAIL/CONTINUE plus a deserialization variant - are
we mirroring that coverage for the global thread?

LB7: Small naming nit - the KIP introduces a recordCollector field on
GlobalProcessorContextImpl, but the analogous field on
ProcessorContextImpl is just called collector. Any reason not to
match?

Cheers,
Lucas
