Hi Damien and Sebastien,

1.
I think you can just add a `String topic` argument to the existing
`withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
deadLetterQueueRecord)` method, and then the implementation of the
exception handler could choose the topic to send records to using whatever
logic the user desires. You could perhaps provide a built-in implementation
that leverages your new config to send all records to an untyped DLQ topic?
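
To make that concrete, here is a rough sketch in plain Java of what I have in mind. None of these names exist in Kafka Streams or in the KIP: `DlqRouter`, `SingleTopicRouter` and `PerSourceTopicRouter` are hypothetical, and I'm assuming the handler knows the source topic of the failed record. The point is only that the handler picks the topic, and the built-in variant returns the one configured topic for everything.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical strategy an exception handler could use to pick a DLQ topic. */
interface DlqRouter {
    String dlqTopicFor(String sourceTopic, Exception error);
}

/** Built-in style default: every failed record goes to one configured topic. */
final class SingleTopicRouter implements DlqRouter {
    private final String dlqTopic;

    SingleTopicRouter(String dlqTopic) {
        this.dlqTopic = Objects.requireNonNull(dlqTopic, "dlqTopic");
    }

    @Override
    public String dlqTopicFor(String sourceTopic, Exception error) {
        return dlqTopic;
    }
}

/** User-defined alternative: one DLQ per source topic, with a fallback. */
final class PerSourceTopicRouter implements DlqRouter {
    private final Map<String, String> dlqBySourceTopic;
    private final String fallbackTopic;

    PerSourceTopicRouter(Map<String, String> dlqBySourceTopic, String fallbackTopic) {
        // Defensive copy so later changes to the caller's map have no effect.
        this.dlqBySourceTopic = new HashMap<>(dlqBySourceTopic);
        this.fallbackTopic = Objects.requireNonNull(fallbackTopic, "fallbackTopic");
    }

    @Override
    public String dlqTopicFor(String sourceTopic, Exception error) {
        return dlqBySourceTopic.getOrDefault(sourceTopic, fallbackTopic);
    }
}
```

A handler would then call something like `router.dlqTopicFor(sourceTopic, exception)` and pass the result as the `String topic` argument suggested above.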

1a.
BTW you have a typo: in your DeserializationExceptionHandler, the type of
your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
probably be `ConsumerRecord`.

2.
Agreed. I think it's a good idea to provide an implementation that sends to
a single DLQ by default, but it's important to enable users to customize
this with their own exception handlers.

2a.
I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
DLQ topic like it's a bad record. To me, a DLQ should only contain records
that failed to process. I'm not even sure how a user would
re-process/action one of these other errors; it seems like the purview of
error logging to me?

4.
My point here was that I think it would be useful for the KIP to contain an
explanation of the behavior both with KIP-1033 and without it. i.e. clarify
if/how records that throw an exception in a processor are handled. At the
moment, I'm assuming that without KIP-1033, processing exceptions would not
cause records to be sent to the DLQ, but with KIP-1033, they would. If this
assumption is correct, I think it should be made explicit in the KIP.

5.
Understood. You may want to make this explicit in the documentation for
users, so they understand the consequences of re-processing data sent to
their DLQ. The main reason I raised this point is that it's something that's
tripped me up in numerous KIPs and that committers frequently remind me
of, so I wanted to get ahead of it for once! :D

And one new point:
6.
The DLQ record schema appears to discard all custom headers set on the
source record. Is there a way these can be included? In particular, I'm
concerned with "schema pointer" headers (like those set by Schema
Registry), that may need to be propagated, especially if the records are
fed back into the source topics for re-processing by the user.
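
As a rough illustration of what I mean, here is a plain-Java sketch. `SimpleHeader` is a hypothetical stand-in for a Kafka record header, and the `dlq.error.class` header name is made up for the example; the KIP defines its own error metadata. The idea is simply to copy every source header through untouched and append the error metadata afterwards.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a Kafka record header (key plus raw bytes). */
final class SimpleHeader {
    final String key;
    final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class DlqHeaders {
    // Hypothetical header name, used only for this example.
    static final String ERROR_CLASS_HEADER = "dlq.error.class";

    private DlqHeaders() {}

    /**
     * Keeps every source header, in order (so e.g. a schema pointer survives),
     * then appends the error metadata. The input list is not modified.
     */
    static List<SimpleHeader> build(List<SimpleHeader> sourceHeaders, Exception error) {
        List<SimpleHeader> out = new ArrayList<>(sourceHeaders);
        out.add(new SimpleHeader(
                ERROR_CLASS_HEADER,
                error.getClass().getName().getBytes(StandardCharsets.UTF_8)));
        return Collections.unmodifiableList(out);
    }
}
```

Appending rather than replacing means a consumer that re-processes the DLQ record still sees the original headers first.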

Regards,
Nick


On Fri, 12 Apr 2024 at 13:20, Damien Gasparina <d.gaspar...@gmail.com>
wrote:

> Hi Nick,
>
> Thanks a lot for your review and your useful comments!
>
> 1. It is a good point. As you mentioned, I think it would make sense
> in some use cases to have potentially multiple DLQ topics, so we
> should provide an API to let users do it.
> Thinking out loud here, maybe it is a better approach to create a new
> Record class containing the topic name, e.g. DeadLetterQueueRecord, and
> change the signature to
> withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> deadLetterQueueRecords) instead of
> withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
> be something like "class DeadLetterQueueRecord extends
> org.apache.kafka.streams.processor.api.ProducerRecords { String
> topic; /* + getter/setter */ }"
>
> 2. I think the root question here is: should we have one DLQ topic or
> multiple DLQ topics by default? This question highly depends on the
> context, but providing a default implementation to handle multiple
> DLQ topics would be opinionated, e.g. how to manage errors in a
> punctuate?
> I think it makes sense to have the default implementation writing all
> faulty records to a single DLQ, that's at least the approach I used in
> past applications: one DLQ per Kafka Streams application. Of course
> the message format could change in the DLQ e.g. due to the source
> topic, but those DLQ records will very likely be troubleshot, and
> maybe replayed, manually anyway.
> If a user needs to have multiple DLQ topics or wants to enforce a
> specific schema, it's still possible, but they would need to implement
> custom Exception Handlers.
> Coming back to 1. I do agree that it would make sense to have the user
> set the DLQ topic name in the handlers for more flexibility.
>
> 3. Good point, sorry it was a typo, the ProcessingContext makes much
> more sense here indeed.
>
> 4. I do assume that we could implement KIP-1033 (Processing exception
> handler) independently from KIP-1034. I do hope that KIP-1033 would be
> adopted and implemented before KIP-1034, but if that's not the case,
> we could implement KIP-1034 independently and update KIP-1033 to include
> the DLQ record afterward (in the same KIP or in a new one if not
> possible).
>
> 5. I think we should be clear that this KIP only covers the DLQ record
> produced.
> Everything related to replaying messages or recovery plans should be
> considered out-of-scope as it is use-case and error specific.
>
> Let me know if that's not clear; there are definitely points that are
> highly debatable.
>
> Cheers,
> Damien
>
> On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> >
> > Oh, and one more thing:
> >
> > 5.
> > Whenever you take a record out of the stream, and then potentially
> > re-introduce it at a later date, you introduce the potential for record
> > ordering issues. For example, that record could have been destined for a
> > Window that has been closed by the time it's re-processed. I'd like to see
> > a section that considers these consequences, and perhaps make those risks
> > clear to users. For the record, this is exactly what sunk KIP-990, which
> > was an alternative approach to error handling that introduced the same
> > issues.
> >
> > Cheers,
> >
> > Nick
> >
> > On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:
> >
> > > Hi Damien,
> > >
> > > Thanks for the KIP! Dead-letter queues are something that I think a lot of
> > > users would like.
> > >
> > > I think there are a few points with this KIP that concern me:
> > >
> > > 1.
> > > It looks like you can only define a single, global DLQ for the entire
> > > Kafka Streams application? What about applications that would like to
> > > define different DLQs for different data flows? This is especially
> > > important when dealing with multiple source topics that have different
> > > record schemas.
> > >
> > > 2.
> > > Your DLQ payload value can either be the record value that failed, or an
> > > error string (such as "error during punctuate"). This is likely to cause
> > > problems when users try to process the records from the DLQ, as they can't
> > > guarantee the format of every record value will be the same. This is very
> > > loosely related to point 1. above.
> > >
> > > 3.
> > > You provide a ProcessorContext to both exception handlers, but state they
> > > cannot be used to forward records. In that case, I believe you should use
> > > ProcessingContext instead, which statically guarantees that it can't be
> > > used to forward records.
> > >
> > > 4.
> > > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> > > if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> > >
> > > Regards,
> > >
> > > Nick
> > >
> > > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com>
> > > wrote:
> > >
> > >> In general, if the user does not configure the right ACL, that
> > >> would be a security issue, but that's true for any topic.
> > >>
> > >> This KIP allows users to configure a Dead Letter Queue without writing
> > >> custom Java code in Kafka Streams, not at the topic level.
> > >> A lot of applications are already implementing this pattern, but the
> > >> required code to do it is quite painful and error prone, for example
> > >> most apps I have seen created a new KafkaProducer to send records to
> > >> their DLQ.
> > >>
> > >> As it would be disabled by default for backward compatibility, I doubt
> > >> it would generate any security concern.
> > >> If a user explicitly configures a Dead Letter Queue, it would be up to
> > >> them to configure the relevant ACLs to ensure that the right principal
> > >> can access it.
> > >> It is already the case for all internal, input and output Kafka
> > >> Streams topics (e.g. repartition, changelog topics) that also could
> > >> contain confidential data, so I do not think we should implement a
> > >> different behavior for this one.
> > >>
> > >> In this KIP, we configured the default DLQ record to have the initial
> > >> record key/value as we assume that it is the expected and wanted
> > >> behavior for most applications.
> > >> If a user does not want to have the key/value in the DLQ record for
> > >> any reason, they could still implement exception handlers to build
> > >> their own DLQ record.
> > >>
> > >> Regarding ACL, maybe something smarter could be done in Kafka Streams,
> > >> but this is out of scope for this KIP.
> > >>
> > >> On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> > >> >
> > >> > My concern is that someone would create a dead letter queue on a sensitive
> > >> > topic and not get the ACL correct from the start, thus causing a potential
> > >> > confidential data leak.  Is there anything in the proposal that would
> > >> > prevent that from happening?  If so I did not recognize it as such.
> > >> >
> > >> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <d.gaspar...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Claude,
> > >> > >
> > >> > > In this KIP, the Dead Letter Queue is materialized by a standard and
> > >> > > independent topic, thus normal ACLs apply to it like any other topic.
> > >> > > This should not introduce any security issues; obviously, the right
> > >> > > ACL would need to be provided to write to the DLQ if configured.
> > >> > >
> > >> > > Cheers,
> > >> > > Damien
> > >> > >
> > >> > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> > >> > > <claude.war...@aiven.io.invalid> wrote:
> > >> > > >
> > >> > > > I am new to the Kafka codebase so please excuse any ignorance on my
> > >> > > > part.
> > >> > > >
> > >> > > > When a dead letter queue is established is there a process to ensure that
> > >> > > > it at least is defined with the same ACL as the original queue? Without
> > >> > > > such a guarantee at the start it seems that managing dead letter queues
> > >> > > > will be fraught with security issues.
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <d.gaspar...@gmail.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Hi everyone,
> > >> > > > >
> > >> > > > > To continue our effort to improve Kafka Streams error handling, we
> > >> > > > > propose a new KIP to add out-of-the-box support for Dead Letter Queues.
> > >> > > > > The goal of this KIP is to provide a default implementation that
> > >> > > > > should be suitable for most applications and allow users to override
> > >> > > > > it if they have specific requirements.
> > >> > > > >
> > >> > > > > In order to build a suitable payload, some additional changes are
> > >> > > > > included in this KIP:
> > >> > > > >   1. extend the ProcessingContext to hold, when available, the source
> > >> > > > > node raw key/value byte[]
> > >> > > > >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > >> > > > > it is currently not available in the handle parameters.
> > >> > > > >
> > >> > > > > Regarding point 2., to expose the ProcessingContext to the
> > >> > > > > ProductionExceptionHandler, we considered two choices:
> > >> > > > >   1. exposing the ProcessingContext as a parameter in the handle()
> > >> > > > > method. That's the cleanest way IMHO, but we would need to deprecate
> > >> > > > > the old method.
> > >> > > > >   2. exposing the ProcessingContext as an attribute in the interface.
> > >> > > > > This way, no method is deprecated, but we would not be consistent with
> > >> > > > > the other ExceptionHandler.
> > >> > > > >
> > >> > > > > In the KIP, we chose solution 1 (new handle signature with the old
> > >> > > > > one deprecated), but we could use other opinions on this part.
> > >> > > > > More information is available directly on the KIP.
> > >> > > > >
> > >> > > > > KIP link:
> > >> > > > >
> > >> > >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > >> > > > >
> > >> > > > > Feedback and suggestions are welcome,
> > >> > > > >
> > >> > > > > Cheers,
> > >> > > > > Damien, Sebastien and Loic
> > >> > > > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > LinkedIn: http://www.linkedin.com/in/claudewarren
> > >>
> > >
>
