Hi,

I won't be able to make it to the KIP hangout due to a conflict.

Anna, here is a use case where it is useful to know whether the
RecordAccumulator still holds messages for a topic that have not yet been
sent to the Kafka cluster.

1) Consider a pipeline:
A ---> Mirror-maker -----> B

2) We have a topic T in cluster A mirrored to cluster B.

3) Now if we delete topic T in A and immediately proceed to delete the
topic in cluster B, some of the Mirror-maker machines die because
at least one of the batches in the RecordAccumulator for topic T fails to be
produced to cluster B. We have seen this happen in our clusters.


If we know that there are no more messages left in the RecordAccumulator to
be produced to cluster B, we can safely delete the topic in cluster B
without disturbing the pipeline.
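
To make the idea concrete, here is a rough sketch of the kind of check I
mean. PendingRecordTracker and its methods are hypothetical names, not part
of the Kafka API. It assumes something (for example the proposed onSend /
onAcknowledgement callbacks) calls it once when a record is handed to the
producer and once when the record is acknowledged or fails.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper, not a Kafka class: counts records per topic that
// have been handed to the producer but not yet acknowledged.
final class PendingRecordTracker {
    private final ConcurrentMap<String, Long> pending = new ConcurrentHashMap<>();

    // Call when a record for `topic` is handed to the producer.
    void onSend(String topic) {
        pending.merge(topic, 1L, Long::sum);
    }

    // Call when the producer reports success or failure for a record.
    void onAcknowledgement(String topic) {
        // Decrement the count; returning null removes the entry at zero.
        pending.computeIfPresent(topic, (t, n) -> n > 1 ? n - 1 : null);
    }

    // True while at least one record for `topic` is still unacknowledged.
    boolean hasPending(String topic) {
        return pending.containsKey(topic);
    }
}
```

With something like this, a tool could wait until hasPending("T") returns
false before deleting topic T in cluster B. How the callbacks get wired to
the producer is left open here.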

Thanks,

Mayuresh

On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> wrote:

> Thanks Ismael and Todd for your feedback!
>
> I agree about coming up with lean, but useful interfaces that will be easy
> to extend later.
>
> When we discuss the minimal set of producer and consumer interceptor API in
> today’s KIP meeting (discussion item #2 in my previous email), let's compare
> two options:
>
> *1. Minimal set of immutable API for producer and consumer interceptors*
>
> ProducerInterceptor:
> public void onSend(ProducerRecord<K, V> record);
> public void onAcknowledgement(RecordMetadata metadata, Exception
> exception);
>
> ConsumerInterceptor:
> public void onConsume(ConsumerRecords<K, V> records);
> public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
>
> Use-cases:
> — end-to-end monitoring; custom tracing and logging
>
>
> *2. Minimal set of mutable API for producer and consumer interceptors*
>
> ProducerInterceptor:
> ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> void onAcknowledgement(RecordMetadata metadata, Exception exception);
>
> ConsumerInterceptor:
> void onConsume(ConsumerRecords<K, V> records);
> void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
>
> Additional use-cases to #1:
> — Ability to add metadata to a message or fill in standard fields for audit
> and routing.
>
> Implications
> — Partition assignment will be done based on modified key/value instead of
> original key/value. If key/value transformation is not consistent (same key
> and value does not mutate to the same, but modified, key/value), then log
> compaction would not work. However, audit and routing use-cases from Todd
> will likely do consistent transformation.
>
>
> *Additional callbacks (discussion item #3 in my previous email):*
>
> If we want to support encryption, we would want to be able to modify
> serialized key/value, rather than key and value objects. This will add the
> following API to producer and consumer interceptors:
>
> ProducerInterceptor:
> SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V>
> record, SerializedKeyValue serializedKeyValue);
>
> ConsumerInterceptor:
> SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue
> serializedKeyValue);
>
>
> I am leaning towards implementing the minimal set of immutable or mutable
> interfaces, making sure that we have a compatibility plan that allows us to
> add more callbacks in the future (per Ismael comment), and add more APIs
> later. E.g., for encryption use-case, there could be an argument in doing
> encryption after message compression vs. per-record encryption that could
> be done using the above additional API. There are also more implications for
> every API that modifies records: modifying serialized key/value will again
> impact partition assignment (we will likely do that after partition
> assignment), which may impact log compaction and mirror maker partitioning.
>
>
> Thanks,
> Anna
>
> On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:
>
> > Finally got a chance to take a look at this. I won’t be able to make the
> > KIP meeting due to a conflict.
> >
> > I’m somewhat disappointed in this proposal. I think that the explicit
> > exclusion of modification of the messages is short-sighted, and not
> > accounting for it now is going to bite us later. Jay, aren’t you the one
> > railing against public interfaces and how difficult they are to work with
> > when you don’t get them right? The “simple” change to one of these
> > interfaces to make it able to return a record is going to be a
> significant
> > change and is going to require all clients to rewrite their interceptors.
> > If we’re not willing to put the time to think through manipulation now,
> > then this KIP should be shelved until we are. Implementing something
> > halfway is going to be worse than taking a little longer. In addition, I
> > don’t believe that manipulation requires anything more than interceptors
> to
> > receive the full record, and then to return it.
> >
> > There are 3 use cases I can think of right now without any deep discussion
> > that can make use of interceptors with modification:
> >
> > 1. Auditing. The ability to add metadata to a message for auditing is
> > critical. Hostname, service name, timestamps, etc. are all pieces of data
> > that can be used on the other side of the pipeline to categorize
> messages,
> > determine loss and transport time, and pin down issues. You may say that
> > these things can just be part of the message schema, but anyone who has
> > worked with a multi-user data system (especially those who have been
> > involved with LinkedIn) knows how difficult it is to maintain consistent
> > message schemas and to get other people to put in fields for your use.
> >
> > 2. Encryption. This is probably the most obvious case for record
> > manipulation on both sides. The ability to tie in end to end encryption
> is
> > important for data that requires external compliance (PCI, HIPAA, etc.).
> >
> > 3. Routing. By being able to add a bit of information about the source or
> > destination of a message to the metadata, you can easily construct an
> > intelligent mirror maker that can prevent loops. This has the opportunity
> > to result in significant operational savings, as you can get rid of the
> > need for tiered clusters in order to prevent loops in mirroring messages.
> >
> > All three of these share the feature that they add metadata to messages.
> > With the pushback on having arbitrary metadata as an “envelope” to the
> > message, this is a way to provide it and make it the responsibility of
> the
> > client, and not the Kafka broker and system itself.
> >
> > -Todd
> >
> >
> >
> > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Anna and Neha,
> > >
> > > I think it makes a lot of sense to try and keep the interface lean and
> to
> > > add more methods later when/if there is a need. What is the current
> > > thinking with regards to compatibility when/if we add new methods? A
> few
> > > options come to mind:
> > >
> > > 1. Change the interface to an abstract class with empty implementations
> > for
> > > all the methods. This means that the path to adding new methods is
> clear.
> > > 2. Hope we have moved to Java 8 by the time we need to add new methods
> > and
> > > use default methods with an empty implementation for any new method
> (and
> > > potentially make existing methods default methods too at that point for
> > > consistency)
> > > 3. Introduce a new interface that inherits from the existing
> Interceptor
> > > interface when we need to add new methods.
> > >
> > > Option 1 is the easiest and it also means that interceptor users only
> > need
> > > to override the methods that they are interested in (more useful if the
> > number
> > > of methods grows). The downside is that interceptor implementations
> > cannot
> > > inherit from another class (a straightforward workaround is to make the
> > > interceptor a forwarder that calls another class). Also, our existing
> > > callbacks are interfaces, so seems a bit inconsistent.
> > >
> > > Option 2 may be the most appealing one as both users and ourselves
> retain
> > > flexibility. The main downside is that it relies on us moving to Java
> 8,
> > > which may be more than a year away potentially (if we support the last
> 2
> > > Java releases).
> > >
> > > Thoughts?
> > >
> > > Ismael
> > >
> > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io>
> > wrote:
> > >
> > > > Anna,
> > > >
> > > > I'm also in favor of including just the APIs for which we have a
> clear
> > > use
> > > > case. If more use cases for finer monitoring show up in the future,
> we
> > > can
> > > > always update the interface. Would you please highlight in the KIP
> the
> > > APIs
> > > > that you think we have an immediate use for?
> > > >
> > > > Joel,
> > > >
> > > > Broker-side monitoring makes a lot of sense in the long term though I
> > > don't
> > > > think it is a requirement for end-to-end monitoring. With the
> producer
> > > and
> > > > consumer interceptors, you have the ability to get full
> > > > publish-to-subscribe end-to-end monitoring. The broker interceptor
> > > > certainly improves the resolution of monitoring but it is also a
> > riskier
> > > > change. I prefer an incremental approach over a big-bang and
> recommend
> > > > taking baby-steps. Let's first make sure the producer/consumer
> > > interceptors
> > > > are successful. And then come back and add the broker interceptor
> > > > carefully.
> > > >
> > > > Having said that, it would be great to understand your proposal for
> the
> > > > broker interceptor independently. We can either add an interceptor
> > > > on-append or on-commit. If people want to use this for monitoring,
> then
> > > > possibly on-commit might be more useful?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > Hey Joel,
> > > > >
> > > > > What is the interface you are thinking of? Something like this:
> > > > >     onAppend(String topic, int partition, Records records, long
> time)
> > > > > ?
> > > > >
> > > > > One challenge right now is that we are still using the old
> > > > > Message/MessageSet classes on the broker which I'm not sure if we'd
> > > want
> > > > to
> > > > > support over the long haul but it might be okay just to create the
> > > > records
> > > > > instance for this interface.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > I'm definitely in favor of having such hooks in the
> produce/consume
> > > > > > life-cycle. Not sure if people remember this but in Kafka 0.7
> this
> > > was
> > > > > > pretty much how it was:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > > > > i.e., we had something similar to the interceptor proposal for
> > > various
> > > > > > stages of the producer request. The producer provided call-backs
> > for
> > > > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc.
> So
> > > at
> > > > > > LinkedIn we in fact did auditing within these call-backs (and not
> > > > > > explicitly in the wrapper). Over time and with 0.8 we moved that
> > out
> > > to
> > > > > the
> > > > > > wrapper libraries.
> > > > > >
> > > > > > On a side-note while audit and other monitoring can be done
> > > internally
> > > > > in a
> > > > > > convenient way I think it should be clarified that having a
> wrapper
> > > is
> > > > in
> > > > > > general not a bad idea and I would even consider it to be a
> > > > > best-practice.
> > > > > > Even with 0.7 we still had a wrapper library and that API has
> > largely
> > > > > > stayed the same and has helped protect against (sometimes
> backwards
> > > > > > incompatible) changes in open source.
> > > > > >
> > > > > > While we are on this topic I have one comment and Anna, you may
> > have
> > > > > > already considered this but I don't see mention of it in the KIP:
> > > > > >
> > > > > > Add a custom message interceptor/validator on the broker on
> message
> > > > > > arrival.
> > > > > >
> > > > > > We decompress and do basic validation of messages on arrival. I
> > think
> > > > > there
> > > > > > is value in supporting custom validation and expand it to support
> > > > custom
> > > > > > on-arrival processing. Here is a specific use-case I have in
> mind.
> > > The
> > > > > blog
> > > > > > that James referenced describes our auditing infrastructure. In
> > order
> > > > to
> > > > > > audit the Kafka cluster itself we need to run a "console auditor"
> > > > service
> > > > > > that consumes everything and spits out audit events back to the
> > > > cluster.
> > > > > I
> > > > > > would prefer not having to run this service because:
> > > > > >
> > > > > >    - Well, it is one more service that we have to run and monitor
> > > > > >    - Consuming everything takes up bandwidth which can be avoided
> > > > > >    - The console auditor consumer itself can lag and cause
> > temporary
> > > > > audit
> > > > > >    discrepancies
> > > > > >
> > > > > > One way we can mitigate this is by having mirror-makers in
> between
> > > > > clusters
> > > > > > emit audit events. The problem is that the very last cluster in
> the
> > > > > > pipeline will not have any audit which is why we need to have
> > > something
> > > > > to
> > > > > > audit the cluster.
> > > > > >
> > > > > > If we had a custom message validator then the audit can be done
> > > > > on-arrival
> > > > > > and we won't need a console auditor.
> > > > > >
> > > > > > One potential issue in this approach and any elaborate on-arrival
> > > > > > processing for that matter is that you may need to deserialize
> the
> > > > > message
> > > > > > as well which can drive up produce request handling times.
> However
> > > I'm
> > > > > not
> > > > > > terribly concerned about that especially if the audit header can
> be
> > > > > > separated out easily or even deserialized partially as this Avro
> > > thread
> > > > > > touches on
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> > > > > > gharatmayures...@gmail.com> wrote:
> > > > > >
> > > > > > > Nice KIP. Excellent idea.
> > > > > > > Was just thinking if we can add onDequed() to the
> > > ProducerInterceptor
> > > > > > > interface. Since we have the onEnqueued(), it will help the
> > client
> > > or
> > > > > the
> > > > > > > tools to know how much time the message spent in the
> > > > RecordAccumulator.
> > > > > > > Also an API to check if there are any messages left for a
> > > particular
> > > > > > topic
> > > > > > > in the RecordAccumulator would help.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Mayuresh
> > > > > > >
> > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <
> tpal...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m
> > > glad
> > > > > > > someone
> > > > > > > > is finally picking it up. Will take a look at the KIP at some
> > > point
> > > > > > > > shortly.
> > > > > > > >
> > > > > > > > -Todd
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <
> j...@confluent.io>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Becket,
> > > > > > > > >
> > > > > > > > > Yeah this is really similar to the callback. The difference
> > is
> > > > > really
> > > > > > > in
> > > > > > > > > who sets the behavior. The idea of the interceptor is that
> it
> > > > > doesn't
> > > > > > > > > require any code change in apps so you can globally add
> > > behavior
> > > > to
> > > > > > > your
> > > > > > > > > Kafka usage without changing app code. Whereas the callback
> > is
> > > > > added
> > > > > > by
> > > > > > > > the
> > > > > > > > > app. The idea is to kind of obviate the need for the
> wrapper
> > > code
> > > > > > that
> > > > > > > > e.g.
> > > > > > > > > LinkedIn maintains to hold this kind of stuff.
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <
> > > > becket....@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > This could be a useful feature. And I think there are
> some
> > > use
> > > > > > cases
> > > > > > > to
> > > > > > > > > > mutate the data like rejected alternative one mentioned.
> > > > > > > > > >
> > > > > > > > > > I am wondering if there is functional overlapping between
> > > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer
> > > > > callback?
> > > > > > I
> > > > > > > > can
> > > > > > > > > > see that the Callback could be a per record setting while
> > > > > > > > > > onAcknowledgement() is a producer level setting. Other
> than
> > > > that,
> > > > > > is
> > > > > > > > > there
> > > > > > > > > > any difference between them?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <
> > > > > n...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > James,
> > > > > > > > > > >
> > > > > > > > > > > That is one of the many monitoring use cases for the
> > > > > interceptor
> > > > > > > > > > interface.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Neha
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <
> > > > jch...@tivo.com>
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Anna,
> > > > > > > > > > > >
> > > > > > > > > > > > I'm trying to understand a concrete use case. It
> sounds
> > > > like
> > > > > > > > producer
> > > > > > > > > > > > interceptors could be used to implement part of
> > > LinkedIn's
> > > > Kafka
> > > > > > > > > Audit
> > > > > > > > > > > > tool?
> > > > > > https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > > > > > > > > >
> > > > > > > > > > > > Part of that is done by a wrapper library around the
> > > kafka
> > > > > > > producer
> > > > > > > > > > that
> > > > > > > > > > > > keeps a count of the number of messages produced, and
> > > then
> > > > > > sends
> > > > > > > > that
> > > > > > > > > > > count
> > > > > > > > > > > > to a side-topic. It sounds like the producer
> > interceptors
> > > > > could
> > > > > > > > > > possibly
> > > > > > > > > > > be
> > > > > > > > > > > > used to implement that?
> > > > > > > > > > > >
> > > > > > > > > > > > -James
> > > > > > > > > > > >
> > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <
> > > > > a...@confluent.io
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I just created a KIP-42 for adding producer and
> > > consumer
> > > > > > > > > interceptors
> > > > > > > > > > > for
> > > > > > > > > > > > > intercepting messages at different points on
> producer
> > > and
> > > > > > > > consumer.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > > > > > > > > > >
> > > > > > > > > > > > > Comments and suggestions are welcome!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Anna
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Neha
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > *—-*
> > > > > > > > *Todd Palino*
> > > > > > > > Staff Site Reliability Engineer
> > > > > > > > Data Infrastructure Streaming
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > linkedin.com/in/toddpalino
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -Regards,
> > > > > > > Mayuresh R. Gharat
> > > > > > > (862) 250-7125
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Neha
> > > >
> > >
> >
> >
> >
> > --
> > *—-*
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
