Hi,

only one high-level comment for now:

LB1) Have we considered introducing a new "dropped record handler" for
symmetry with the other DLQ cases? A default "LogAndContinue" handler
would replicate the current behavior, while a "LogAndFail" alternative
may also make sense in cases where I absolutely do not want to lose
records. Then we'd use exactly the same DLQ integration as for all the
other error handlers.
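
To make LB1 a bit more concrete, here is a minimal sketch of what such a
handler pair could look like. Every name in it (DroppedRecordHandler,
DroppedRecordResponse, DroppedRecordDispatcher and the two handler
classes) is hypothetical and exists neither in Kafka Streams nor in the
KIP. The CONTINUE/FAIL result is only loosely modeled on the existing
exception handlers, and the logger is replaced by an in-memory list so
the snippet stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// All types below are hypothetical illustrations; none exist in Kafka Streams.

/** Outcome a handler can request, in the spirit of the existing CONTINUE/FAIL handlers. */
enum DroppedRecordResponse { CONTINUE, FAIL }

/** Hypothetical callback invoked whenever an operator is about to drop a record. */
interface DroppedRecordHandler {
    DroppedRecordResponse handle(String reason, byte[] key, byte[] value);
}

/** Default: log and keep going, which mirrors today's behavior. */
class LogAndContinueDroppedRecordHandler implements DroppedRecordHandler {
    final List<String> log = new ArrayList<>(); // stand-in for a real logger

    @Override
    public DroppedRecordResponse handle(String reason, byte[] key, byte[] value) {
        log.add("Dropping record: " + reason);
        return DroppedRecordResponse.CONTINUE;
    }
}

/** Alternative for users who must never lose a record silently. */
class LogAndFailDroppedRecordHandler implements DroppedRecordHandler {
    final List<String> log = new ArrayList<>(); // stand-in for a real logger

    @Override
    public DroppedRecordResponse handle(String reason, byte[] key, byte[] value) {
        log.add("Failing on dropped record: " + reason);
        return DroppedRecordResponse.FAIL;
    }
}

/** How an operator might consult the handler at the point where it drops a record. */
final class DroppedRecordDispatcher {
    static void onDrop(DroppedRecordHandler handler, String reason, byte[] key, byte[] value) {
        if (handler.handle(reason, key, value) == DroppedRecordResponse.FAIL) {
            throw new IllegalStateException("Dropped record treated as fatal: " + reason);
        }
        // On CONTINUE the record is skipped and processing moves on.
    }
}
```

The sketch leaves out the DLQ part on purpose: presumably the handler
result would carry the DLQ records the same way the KIP-1034 handlers
do, but that shape is for the KIP to decide.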

Cheers,
Lucas

On Sun, May 17, 2026 at 6:01 PM Uladzislau Blok <[email protected]> wrote:
>
> Hello,
>
> On the PAPI vs. DSL-only question
>
> > The case for PAPI exposure is mainly convenience and parity. PAPI users who 
> > want DLQ behavior today can build it themselves with their own producer, 
> > but they can't reuse what KIP-1034 already wired up
> I believe they actually can use it. If they enable the DLQ topic
> property, it shouldn't matter whether an exception is thrown from a
> DSL .map() operator or a custom .process() node.
>
> > I aligned with Matthias's suggestion in the JIRA discussion to expose 
> > writeToDlq() on ProcessingContext,
> Looking at the Jira thread, Matthias wrote: "mjsax: but maybe we could
> expose the DLQ via the "ProcessorContext", allowing not just the DSL,
> but also PAPI users to use it". My understanding is that Matthias was
> proposing a broader scope (specifically, exposing this API to the
> processor), but we can wait for his clarification.
>
> Overall, I think exposing this API is a great proposal because the
> current implementation can cause a major "blast radius" effect. For
> example, if a user has a custom processor that takes one record and
> produces five output records, the current DLQ mechanism only allows
> dropping the entire input record. With this new proposal, they could
> drop just one of those five outputs.
>
> ---
>
> bloku1 / bloku2:
> > The default would be a no-op
> > ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is not set, writeToDlq() is a 
> > no-op
>
> To be honest, I don't think that's the right approach. While the DLQ
> is disabled by default (when no custom exception handler or property
> is set) and a user might not even know the feature exists in Kafka
> Streams, we shouldn't silently ignore an explicit call to send
> something to the DLQ. I think it should behave as follows:
> - If the user sets the DLQ topic parameter directly: Use that topic.
> - If the user doesn't set the topic parameter: Look for the
> ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG property, and if it's not
> configured, throw an exception.
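
The resolution order proposed in the two bullets above could be
sketched roughly as follows. DlqTopicResolver is a hypothetical helper,
not something in Kafka Streams or the KIP. The config key string is
assumed to be the literal behind
ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, and IllegalStateException
is only a placeholder for whatever exception type the KIP would pick.

```java
import java.util.Map;

// Hypothetical helper; not part of Kafka Streams or KIP-1328.
final class DlqTopicResolver {
    // Assumption: literal value of ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG.
    static final String DLQ_TOPIC_CONFIG = "errors.dead.letter.queue.topic.name";

    /**
     * Picks the DLQ topic for one writeToDlq() call:
     * 1. an explicit per-call topic wins,
     * 2. otherwise the configured default is used,
     * 3. otherwise the call fails loudly instead of being a silent no-op.
     */
    static String resolve(String perCallTopic, Map<String, ?> config) {
        if (perCallTopic != null) {
            return perCallTopic;
        }
        Object configured = config.get(DLQ_TOPIC_CONFIG);
        if (configured != null) {
            return configured.toString();
        }
        // Placeholder exception type; the point is only that the call does not pass silently.
        throw new IllegalStateException(
            "writeToDlq() called without a topic and " + DLQ_TOPIC_CONFIG + " is not set");
    }
}
```

With this order, a per-call topic works even when no default is
configured, and a missing topic surfaces at the call site rather than
as a quietly lost record.
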
>
> ---
>
> bloku3:
> I actually see an additional layer of complexity here. In the DLQ API,
> users can provide a list of records to be sent to the DLQ, but they
> are expected to be in raw byte format. Currently, we pull those bytes
> from RecordContext, which isn't available to PAPI users (correct me if
> I'm wrong).
> If we want to expose a more powerful API that lets users send any
> record to the DLQ (like in my 1-to-5 output example), they would
> either need to manually convert the records to bytes, or we should
> allow them to provide a serializer (similar to how
> KafkaStreams#queryMetadataForKey works).
>
>
> On Wed, May 6, 2026 at 7:03 PM Daeho Kwon <[email protected]> wrote:
> >
> > Hi Uladzislau,
> >
> > Thanks for the careful review. Answers to your questions below. Happy to 
> > hear thoughts before I update the KIP.
> >
> > > On the PAPI vs. DSL-only question
> >
> > Honestly, I'm not fully certain about this myself. I aligned with 
> > Matthias's suggestion in the JIRA discussion to expose writeToDlq() on 
> > ProcessingContext, but I want to revisit this explicitly now that the 
> > question is on the table.
> >
> > The case for PAPI exposure is mainly convenience and parity. PAPI users who 
> > want DLQ behavior today can build it themselves with their own producer, 
> > but they can't reuse what KIP-1034 already wired up: the standard header 
> > schema (__streams.errors.*), the configured DLQ topic, or the internal 
> > producer. Exposing writeToDlq() would let them inherit that infrastructure 
> > instead of maintaining a parallel implementation.
> >
> > The alternative you mentioned, putting this on InternalProcessorContext so 
> > the DSL gets it without expanding public API, keeps the scope tight but 
> > means PAPI users still maintain their own DLQ plumbing.
> >
> > I'd appreciate input from you and others on the thread on whether the 
> > parity is worth the broader scope, or whether the internal-only path is the 
> > better fit.
> >
> > > bloku1: Compatibility section contradiction
> >
> > You're right, that section is wrong as written. Adding a method to 
> > ProcessingContext is a public API change, and I'll fix the section.
> >
> > For the implementation, I was thinking of defining writeToDlq() as a 
> > `default` method on the interface to preserve compatibility. The default 
> > would be a no-op, and ProcessorContextImpl would override it with the 
> > actual DLQ-write logic. Does that approach sound reasonable, or would you 
> > prefer a different one?
> >
> > > bloku2: Behavior when no DLQ topic is configured
> >
> > My current thinking is to follow KIP-1034's behavior here: when 
> > ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is not set, writeToDlq() is a 
> > no-op and the observable behavior matches today's (the record is dropped, 
> > the existing WARN log from the DSL operator fires, and 
> > dropped-records-total increments). I'd document this in the Javadoc. Let me 
> > know if you think we should diverge from KIP-1034 here.
> >
> > > bloku3: Per-call topic override
> >
> > Fair point. KIP-1034 already allows custom handlers to write to arbitrary 
> > DLQ topics by constructing ProducerRecord<>(customTopic, ...) themselves. 
> > To stay consistent, one option is to add an overload:
> >
> >     default void writeToDlq(Record<?, ?> record, Exception exception) {
> >         writeToDlq(record, exception, null);
> >     }
> >
> >     default void writeToDlq(Record<?, ?> record, Exception exception,
> >                             String dlqTopic) {
> >         // no-op by default; overridden by ProcessorContextImpl
> >     }
> >
> > The two-argument form uses the configured default topic. The three-argument 
> > form lets callers route specific dropped records to specific topics. If 
> > dlqTopic is null and no default is configured, the call is a no-op as 
> > described in bloku2.
> >
> > Would this overload work, or do you (or others on the thread) have a 
> > different preference for how to expose per-call routing?
> >
> > Let me know your thoughts, and I'll update the KIP accordingly.
> >
> > Thanks,
> > Daeho
> >
> > -----Original Message-----
> > From: "Uladzislau Blok"<[email protected]>
> > To: <[email protected]>;
> > Cc:
> > Sent: 2026-05-02 (Sat) 21:17:55 (GMT+09:00)
> > Subject: Re: [DISCUSS] KIP-1328: Dead Letter Queue Support for DSL Dropped 
> > Records
> >
> > Dear Daeho Kwon,
> >
> > Thanks for the KIP!
> >
> > From my understanding of the JIRA ticket, two proposals were evaluated:
> >
> > - Exposing the DLQ method to PAPI (affects public API).
> > - Creating an internal utility for DSL operators or extending
> > InternalProcessorContext instead of the public one (this still affects the
> > public DLQ API, but avoids changes to PAPI).
> >
> > If we choose to change PAPI, we likely need to extend the scope of the KIP.
> > Currently, it focuses on the DSL, but adding this method to
> > ProcessingContext would enable it for all clients. I suggest revisiting the
> > proposals to consider the trade-offs between using an internal utility for
> > the DSL case versus generally enabling this functionality for PAPI.
> >
> > I also have the following questions regarding the KIP:
> >
> > - bloku1: The "Compatibility, Deprecation, and Migration Plan" section
> > states there are no changes to the public API, but adding a method to
> > ProcessingContext seems to contradict this. Can we clarify this point?
> > - bloku2: What happens if a user calls the writeToDlq method without
> > specifying a DLQ topic name via the
> > ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG property?
> > - bloku3: KIP-1034 allows users to send records to a specific topic even
> > when a default DLQ topic is set. Do we plan to allow this in KIP-1328?
> >
> > Thanks,
> > Uladzislau Blok
> >
> >
> > On Mon, Apr 27, 2026 at 1:43 PM 권대호 <[email protected]> wrote:
> > >
> > >   Hi all,
> > >   I'd like to start a discussion on KIP-1328: Dead Letter Queue Support
> > >   for DSL Dropped Records.
> > >
> > >   KIP:
> > >   https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1328*3A*Dead*Letter*Queue*Support*for*DSL*Dropped*Records__;JSsrKysrKysr!!Ayb5sqE7!taDV8ekorXKb8UH3fFI0SdjPirtMUS27uG6odt_thOFxB0A2d1iLEpQ29_0X9iusBsucylMIIp-f-UBl041i$
> > >
> > >   JIRA:
> > >   https://urldefense.com/v3/__https://issues.apache.org/jira/browse/KAFKA-20439__;!!Ayb5sqE7!taDV8ekorXKb8UH3fFI0SdjPirtMUS27uG6odt_thOFxB0A2d1iLEpQ29_0X9iusBsucylMIIp-f-WdwPwAN$
> > >
> > >   Kafka Streams DSL operators currently drop certain records silently
> > >   (e.g., null key/value, expired window records) with only a log warning and
> > >   a metric increment. There is no way to capture these records
> > >   for analysis or reprocessing.
> > >
> > >   This KIP proposes adding a writeToDlq() method to ProcessingContext,
> > >   allowing DSL operators to route dropped records to the existing DLQ
> > >   infrastructure introduced in KIP-1034.
> > >
> > >   Looking forward to your feedback.
> > >
> > >   Thanks,
> > >   Daeho Kwon
