Hi, only one high-level comment for now:
LB1) Have we considered introducing a new "dropped record handler" for
symmetry with the other DLQ cases? If we had a "LogAndContinue" handler
(default), it would replicate the existing behavior, although a
"LogAndFail" alternative may also make sense in cases where I absolutely
do not want to lose records. Then we'd use exactly the same DLQ
integration as for all the other error handlers.

Cheers,
Lucas

On Sun, May 17, 2026 at 6:01 PM Uladzislau Blok <[email protected]> wrote:
>
> Hello,
>
> On the PAPI vs. DSL-only question
>
> > The case for PAPI exposure is mainly convenience and parity. PAPI users who
> > want DLQ behavior today can build it themselves with their own producer,
> > but they can't reuse what KIP-1034 already wired up
> I believe they actually can use it. If they enable the DLQ topic
> property, it shouldn't matter whether an exception is thrown from a
> DSL .map() operator or a custom .process() node.
>
> > I aligned with Matthias's suggestion in the JIRA discussion to expose
> > writeToDlq() on ProcessingContext,
> Looking at the Jira thread, Matthias wrote: "mjsax: but maybe we could
> expose the DLQ via the "ProcessorContext", allowing not just the DSL,
> but also PAPI users to use it". My understanding is that Matthias was
> proposing a broader scope, specifically exposing this API to the
> processor, but we can wait for his clarification.
>
> Overall, I think exposing this API is a great proposal because the
> current implementation can cause a major "blast radius" effect. For
> example, if a user has a custom processor that takes one record and
> produces five output records, the current DLQ mechanism only allows
> dropping the entire input record. With this new proposal, they could
> drop just one of those five outputs.
>
> ---
>
> bloku1 / bloku2:
> > The default would be a no-op
> > ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is not set, writeToDlq() is a
> > no-op
>
> To be honest, I don't think that's the right approach.
> While the DLQ is disabled by default (when no custom exception handler
> or property is set) and a user might not even know the feature exists
> in Kafka Streams, we shouldn't silently ignore an explicit call to send
> something to the DLQ. I think it should behave as follows:
> - If the user sets the DLQ topic parameter directly: Use that topic.
> - If the user doesn't set the topic parameter: Look for the
>   ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG property, and if it's not
>   configured, throw an exception.
>
> ---
>
> bloku3:
> I actually see an additional layer of complexity here. In the DLQ API,
> users can provide a list of records to be sent to the DLQ, but they
> are expected to be in raw byte format. Currently, we pull those bytes
> from RecordContext, which isn't available to PAPI users (correct me if
> I'm wrong).
> If we want to expose a more powerful API that lets users send any
> record to the DLQ (like in my 1-to-5 output example), they would
> either need to manually convert the records to bytes, or we should
> allow them to provide a serializer (similar to how
> KafkaStreams#queryMetadataForKey works).
>
>
> On Wed, May 6, 2026 at 7:03 PM Daeho Kwon <[email protected]> wrote:
> >
> > Hi Uladzislau,
> >
> > Thanks for the careful review. Answers to your questions below. Happy to
> > hear thoughts before I update the KIP.
> >
> > > On the PAPI vs. DSL-only question
> >
> > Honestly, I'm not fully certain about this myself. I aligned with
> > Matthias's suggestion in the JIRA discussion to expose writeToDlq() on
> > ProcessingContext, but I want to revisit this explicitly now that the
> > question is on the table.
> >
> > The case for PAPI exposure is mainly convenience and parity. PAPI users who
> > want DLQ behavior today can build it themselves with their own producer,
> > but they can't reuse what KIP-1034 already wired up: the standard header
> > schema (__streams.errors.*), the configured DLQ topic, or the internal
> > producer.
> > Exposing writeToDlq() would let them inherit that infrastructure
> > instead of maintaining a parallel implementation.
> >
> > The alternative you mentioned, putting this on InternalProcessorContext so
> > the DSL gets it without expanding public API, keeps the scope tight but
> > means PAPI users still maintain their own DLQ plumbing.
> >
> > I'd appreciate input from you and others on the thread on whether the
> > parity is worth the broader scope, or whether the internal-only path is the
> > better fit.
> >
> > > bloku1: Compatibility section contradiction
> >
> > You're right, that section is wrong as written. Adding a method to
> > ProcessingContext is a public API change, and I'll fix the section.
> >
> > For the implementation, I was thinking of defining writeToDlq() as a
> > `default` method on the interface to preserve compatibility. The default
> > would be a no-op, and ProcessorContextImpl would override it with the
> > actual DLQ-write logic. Does that approach sound reasonable, or would you
> > prefer a different one?
> >
> > > bloku2: Behavior when no DLQ topic is configured
> >
> > My current thinking is to follow KIP-1034's behavior here: when
> > ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG is not set, writeToDlq() is a
> > no-op and the observable behavior matches today's (the record is dropped,
> > the existing WARN log from the DSL operator fires, and
> > dropped-records-total increments). I'd document this in the Javadoc. Let me
> > know if you think we should diverge from KIP-1034 here.
> >
> > > bloku3: Per-call topic override
> >
> > Fair point. KIP-1034 already allows custom handlers to write to arbitrary
> > DLQ topics by constructing ProducerRecord<>(customTopic, ...) themselves.
> > To stay consistent, one option is to add an overload:
> >
> >     default void writeToDlq(Record<?, ?> record, Exception exception) {
> >         writeToDlq(record, exception, null);
> >     }
> >
> >     default void writeToDlq(Record<?, ?> record, Exception exception,
> >                             String dlqTopic) {
> >         // no-op by default; overridden by ProcessorContextImpl
> >     }
> >
> > The two-argument form uses the configured default topic. The three-argument
> > form lets callers route specific dropped records to specific topics. If
> > dlqTopic is null and no default is configured, the call is a no-op as
> > described in bloku2.
> >
> > Would this overload work, or do you (or others on the thread) have a
> > different preference for how to expose per-call routing?
> >
> > Let me know your thoughts, and I'll update the KIP accordingly.
> >
> > Thanks,
> > Daeho
> >
> > -----Original Message-----
> > From: "Uladzislau Blok"<[email protected]>
> > To: <[email protected]>;
> > Cc:
> > Sent: 2026-05-02 (Sat) 21:17:55 (GMT+09:00)
> > Subject: Re: [DISCUSS] KIP-1328: Dead Letter Queue Support for DSL Dropped
> > Records
> >
> > Dear Daeho Kwon,
> >
> > Thanks for the KIP!
> >
> > From my understanding of the JIRA ticket, two proposals were evaluated:
> >
> > - Exposing the DLQ method to PAPI (affects public API).
> > - Creating an internal utility for DSL operators or extending
> >   InternalProcessorContext instead of the public one (this still affects the
> >   public DLQ API, but avoids changes to PAPI).
> >
> > If we choose to change PAPI, we likely need to extend the scope of the KIP.
> > Currently, it focuses on the DSL, but adding this method to
> > ProcessingContext would enable it for all clients. I suggest revisiting the
> > proposals to consider the trade-offs between using an internal utility for
> > the DSL case versus generally enabling this functionality for PAPI.
> >
> > I also have the following questions regarding the KIP:
> >
> > - bloku1: The "Compatibility, Deprecation, and Migration Plan" section
> >   states there are no changes to the public API, but adding a method to
> >   ProcessingContext seems to contradict this. Can we clarify this point?
> > - bloku2: What happens if a user calls the writeToDlq method without
> >   specifying a DLQ topic name via the
> >   ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG property?
> > - bloku3: KIP-1034 allows users to send records to a specific topic even
> >   when a default DLQ topic is set. Do we plan to allow this in KIP-1328?
> >
> > Thanks,
> > Uladzislau Blok
> >
> >
> > On Mon, Apr 27, 2026 at 1:43 PM 권대호 <[email protected]> wrote:
> > >
> > > Hi all,
> > > I'd like to start a discussion on KIP-1328: Dead Letter Queue Support
> > > for DSL Dropped Records.
> > >
> > > KIP:
> > > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1328*3A*Dead*Letter*Queue*Support*for*DSL*Dropped*Records__;JSsrKysrKysr!!Ayb5sqE7!taDV8ekorXKb8UH3fFI0SdjPirtMUS27uG6odt_thOFxB0A2d1iLEpQ29_0X9iusBsucylMIIp-f-UBl041i$
> > >
> > > JIRA:
> > > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/KAFKA-20439__;!!Ayb5sqE7!taDV8ekorXKb8UH3fFI0SdjPirtMUS27uG6odt_thOFxB0A2d1iLEpQ29_0X9iusBsucylMIIp-f-WdwPwAN$
> > >
> > > Kafka Streams DSL operators currently drop certain records silently
> > > (e.g., null key/value, expired window records) with only a log warning
> > > and a metric increment. There is no way to capture these records
> > > for analysis or reprocessing.
> > >
> > > This KIP proposes adding a writeToDlq() method to ProcessingContext,
> > > allowing DSL operators to route dropped records to the existing DLQ
> > > infrastructure introduced in KIP-1034.
> > >
> > > Looking forward to your feedback.
> > >
> > > Thanks,
> > > Daeho Kwon
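
For reference, the topic-resolution order suggested under bloku2 in the
May 17 reply (explicit per-call topic first, then the configured default,
otherwise fail loudly) could look roughly like the sketch below. This is
only an illustration, not the KIP's implementation: the class name, the
resolveDlqTopic() helper, the config key string, and the choice of
IllegalStateException are all assumptions made for the example.

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams.
final class DlqTopicResolution {

    // Assumed stand-in for the key behind
    // ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG; the real value is not
    // quoted anywhere in this thread.
    static final String DLQ_TOPIC_CONFIG = "errors.dead.letter.queue.topic.name";

    // Returns the topic a writeToDlq() call would target:
    // 1) the per-call topic, if the caller passed one;
    // 2) otherwise the configured default topic;
    // 3) otherwise throws instead of silently doing nothing.
    static String resolveDlqTopic(String explicitTopic, Map<String, ?> config) {
        if (explicitTopic != null && !explicitTopic.isEmpty()) {
            return explicitTopic;
        }
        Object configured = config.get(DLQ_TOPIC_CONFIG);
        if (configured instanceof String && !((String) configured).isEmpty()) {
            return (String) configured;
        }
        // Exception type is an assumption; the thread only says "throw an exception".
        throw new IllegalStateException(
            "writeToDlq() called without a topic and " + DLQ_TOPIC_CONFIG + " is not set");
    }

    public static void main(String[] args) {
        Map<String, String> withDefault = Map.of(DLQ_TOPIC_CONFIG, "default-dlq");
        System.out.println(resolveDlqTopic("custom-dlq", withDefault));
        System.out.println(resolveDlqTopic(null, withDefault));
    }
}
```

Under the no-op alternative from the May 6 reply, step 3 would simply
return without writing anything, so the two positions differ only in that
last branch.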
