Mayuresh,

Regarding your mirror maker use case: is it good enough to know that the producer no longer holds any messages for the topic? If so, calling producer.flush() is sufficient.
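A minimal sketch of that ordering, assuming a toy in-memory stand-in rather than the real client. ToyProducer, pending(), delivered() and flushThenCheck() are hypothetical names made up for illustration; only flush() corresponds to a real KafkaProducer method, which blocks until previously sent records have completed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a buffering producer; NOT the Kafka client API. */
class ToyProducer {
    // Plays the role of the RecordAccumulator: records wait here until flushed.
    private final List<String> buffered = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** Buffers a record; nothing is delivered until flush() is called. */
    void send(String topic, String value) {
        buffered.add(topic + ":" + value);
    }

    /** Delivers everything buffered so far and returns only once the buffer is empty. */
    void flush() {
        delivered.addAll(buffered);
        buffered.clear();
    }

    int pending() {
        return buffered.size();
    }

    List<String> delivered() {
        return delivered;
    }
}

class FlushBeforeDelete {
    /** Hypothetical helper: drain the producer, then report whether deleting the topic is safe. */
    static boolean flushThenCheck(ToyProducer producer) {
        producer.flush();               // after this returns, nothing is left in the buffer
        return producer.pending() == 0; // only now delete topic T in cluster B
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();
        producer.send("T", "m1");
        producer.send("T", "m2");
        System.out.println("pending before flush: " + producer.pending());
        System.out.println("safe to delete: " + flushThenCheck(producer));
    }
}
```

Note that in the real client flush() covers all buffered records, not a single topic, so this only helps once nothing new for topic T is being handed to the producer.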
Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> Hi Anna,
>
> Thanks a lot for summarizing the discussion on this KIP.
>
> It LGTM.
> This is really nice:
> We decided not to add any callbacks to producer and consumer interceptors that will depend on internal implementation as part of this KIP.
> *However, it is possible to add them later as part of another KIP if there are good use-cases.*
>
> Do you agree with the use case I explained earlier for knowing the number of records left in the RecordAccumulator for a particular topic? It might be orthogonal to this KIP, but it would be helpful. What do you think?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <tpal...@gmail.com> wrote:
>
> > This looks good. As noted, having one mutable interceptor on each side allows for the use cases we can envision right now, and I think that’s going to provide a great deal of opportunity for implementing things like audit, especially within a multi-tenant environment. Looking forward to getting this available in the clients.
> >
> > Thanks!
> >
> > -Todd
> >
> > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <a...@confluent.io> wrote:
> >
> > > Hi All,
> > >
> > > Here are the meeting notes from today’s KIP meeting:
> > >
> > > 1. We agreed to keep the scope of this KIP to producer and consumer interceptors only. A broker-side interceptor will be added later as a separate KIP. The reasons were already mentioned in this thread, but the summary is:
> > > * Broker interceptor is riskier and requires careful consideration about overheads, whether to intercept leaders vs. leaders/replicas, what to do on leader failover and so on.
> > > * Broker interceptors increase monitoring resolution, but not including them in this KIP does not reduce the usefulness of producer and consumer interceptors that enable end-to-end monitoring.
> > >
> > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor callbacks to a minimal set of mutable APIs that do not depend on producer and consumer internal implementation.
> > >
> > > ProducerInterceptor:
> > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> > >
> > > ConsumerInterceptor:
> > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > >
> > > We will allow interceptors to modify ProducerRecord on the producer side and ConsumerRecords on the consumer side. This will support end-to-end monitoring and auditing and the ability to add metadata to a message. This will support Todd’s Auditing and Routing use-cases.
> > >
> > > We did not find any use-case for modifying records in the onConsume() callback, but decided to enable modification of consumer records for symmetry with onSend().
> > >
> > > 3. We agreed to ensure compatibility when/if we add new methods to ProducerInterceptor and ConsumerInterceptor by using default methods with an empty implementation. Ok to assume Java 8. (This is Ismael’s method #2).
> > >
> > > 4. We decided not to add any callbacks to producer and consumer interceptors that will depend on internal implementation as part of this KIP. However, it is possible to add them later as part of another KIP if there are good use-cases.
> > >
> > > *Reasoning.* We did not have concrete use-cases that justified more methods at this point.
> > > Some of the use-cases were for more fine-grained latency collection, which could be done with Kafka Metrics. Another use-case was encryption. However, there are several design options for encryption. One is to do per-record encryption, which would require adding ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One could argue that in that case encryption could be done by adding a custom serializer/deserializer. Another option is to do encryption after the message gets compressed, but there are issues that arise regarding the broker doing re-compression. We decided that it is better to have that discussion in a separate KIP and decide whether this is something we want to do with interceptors or by other means.
> > >
> > > Todd, Mayuresh and others who missed the KIP meeting, please let me know your thoughts on the scope we agreed on during the meeting.
> > >
> > > I will update the KIP proposal with the current decision by end of today.
> > >
> > > Thanks,
> > > Anna
> > >
> > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I won't be able to make it to the KIP hangout due to a conflict.
> > > >
> > > > Anna, here is the use case where knowing if there are messages in the RecordAccumulator left to be sent to the Kafka cluster for a topic is useful.
> > > >
> > > > 1) Consider a pipeline:
> > > > A ---> Mirror-maker -----> B
> > > >
> > > > 2) We have a topic T in cluster A mirrored to cluster B.
> > > >
> > > > 3) Now if we delete topic T in A and immediately proceed to delete the topic in cluster B, some of the Mirror-maker machines die because at least one of the batches in RecordAccumulator for topic T fails to be produced to cluster B. We have seen this happening in our clusters.
> > > >
> > > > If we know that there are no more messages left in the RecordAccumulator to be produced to cluster B, we can safely delete the topic in cluster B without disturbing the pipeline.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> wrote:
> > > >
> > > > > Thanks Ismael and Todd for your feedback!
> > > > >
> > > > > I agree about coming up with lean but useful interfaces that will be easy to extend later.
> > > > >
> > > > > When we discuss the minimal set of producer and consumer interceptor APIs in today’s KIP meeting (discussion item #2 in my previous email), let's compare two options:
> > > > >
> > > > > *1. Minimal set of immutable API for producer and consumer interceptors*
> > > > >
> > > > > ProducerInterceptor:
> > > > > public void onSend(ProducerRecord<K, V> record);
> > > > > public void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > > > >
> > > > > ConsumerInterceptor:
> > > > > public void onConsume(ConsumerRecords<K, V> records);
> > > > > public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > > > >
> > > > > Use-cases:
> > > > > - end-to-end monitoring; custom tracing and logging
> > > > >
> > > > > *2. Minimal set of mutable API for producer and consumer interceptors*
> > > > >
> > > > > ProducerInterceptor:
> > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > > > void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > > > >
> > > > > ConsumerInterceptor:
> > > > > void onConsume(ConsumerRecords<K, V> records);
> > > > > void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > > > >
> > > > > Additional use-cases to #1:
> > > > > - Ability to add metadata to a message or fill in standard fields for audit and routing.
> > > > >
> > > > > Implications:
> > > > > - Partition assignment will be done based on the modified key/value instead of the original key/value. If the key/value transformation is not consistent (the same key and value do not always mutate to the same modified key/value), then log compaction would not work. However, the audit and routing use-cases from Todd will likely do consistent transformation.
> > > > >
> > > > > *Additional callbacks (discussion item #3 in my previous email):*
> > > > >
> > > > > If we want to support encryption, we would want to be able to modify the serialized key/value, rather than the key and value objects.
> > > > > This will add the following API to producer and consumer interceptors:
> > > > >
> > > > > ProducerInterceptor:
> > > > > SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> record, SerializedKeyValue serializedKeyValue);
> > > > >
> > > > > ConsumerInterceptor:
> > > > > SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);
> > > > >
> > > > > I am leaning towards implementing the minimal set of immutable or mutable interfaces, making sure that we have a compatibility plan that allows us to add more callbacks in the future (per Ismael's comment), and adding more APIs later. E.g., for the encryption use-case, there could be an argument for doing encryption after message compression vs. per-record encryption that could be done using the above additional API. There are also more implications for every API that modifies records: modifying the serialized key/value will again impact partition assignment (we will likely do that after partition assignment), which may impact log compaction and mirror maker partitioning.
> > > > >
> > > > > Thanks,
> > > > > Anna
> > > > >
> > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > > >
> > > > > > Finally got a chance to take a look at this. I won’t be able to make the KIP meeting due to a conflict.
> > > > > >
> > > > > > I’m somewhat disappointed in this proposal. I think that the explicit exclusion of modification of the messages is short-sighted, and not accounting for it now is going to bite us later. Jay, aren’t you the one railing against public interfaces and how difficult they are to work with when you don’t get them right?
> > > > > > The “simple” change to one of these interfaces to make it able to return a record is going to be a significant change and is going to require all clients to rewrite their interceptors. If we’re not willing to put in the time to think through manipulation now, then this KIP should be shelved until we are. Implementing something halfway is going to be worse than taking a little longer. In addition, I don’t believe that manipulation requires anything more than interceptors to receive the full record, and then to return it.
> > > > > >
> > > > > > There are 3 use cases I can think of right now without any deep discussion that can make use of interceptors with modification:
> > > > > >
> > > > > > 1. Auditing. The ability to add metadata to a message for auditing is critical. Hostname, service name, timestamps, etc. are all pieces of data that can be used on the other side of the pipeline to categorize messages, determine loss and transport time, and pin down issues. You may say that these things can just be part of the message schema, but anyone who has worked with a multi-user data system (especially those who have been involved with LinkedIn) knows how difficult it is to maintain consistent message schemas and to get other people to put in fields for your use.
> > > > > >
> > > > > > 2. Encryption. This is probably the most obvious case for record manipulation on both sides. The ability to tie in end-to-end encryption is important for data that requires external compliance (PCI, HIPAA, etc.).
> > > > > >
> > > > > > 3. Routing.
> > > > > > By being able to add a bit of information about the source or destination of a message to the metadata, you can easily construct an intelligent mirror maker that can prevent loops. This has the opportunity to result in significant operational savings, as you can get rid of the need for tiered clusters in order to prevent loops in mirroring messages.
> > > > > >
> > > > > > All three of these share the feature that they add metadata to messages. With the pushback on having arbitrary metadata as an “envelope” to the message, this is a way to provide it and make it the responsibility of the client, and not the Kafka broker and system itself.
> > > > > >
> > > > > > -Todd
> > > > > >
> > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > >
> > > > > > > Hi Anna and Neha,
> > > > > > >
> > > > > > > I think it makes a lot of sense to try and keep the interface lean and to add more methods later when/if there is a need. What is the current thinking with regards to compatibility when/if we add new methods? A few options come to mind:
> > > > > > >
> > > > > > > 1. Change the interface to an abstract class with empty implementations for all the methods. This means that the path to adding new methods is clear.
> > > > > > > 2. Hope we have moved to Java 8 by the time we need to add new methods and use default methods with an empty implementation for any new method (and potentially make existing methods default methods too at that point for consistency).
> > > > > > > 3. Introduce a new interface that inherits from the existing Interceptor interface when we need to add new methods.
> > > > > > >
> > > > > > > Option 1 is the easiest and it also means that interceptor users only need to override the methods that they are interested in (more useful if the number of methods grows). The downside is that interceptor implementations cannot inherit from another class (a straightforward workaround is to make the interceptor a forwarder that calls another class). Also, our existing callbacks are interfaces, so it seems a bit inconsistent.
> > > > > > >
> > > > > > > Option 2 may be the most appealing one as both users and ourselves retain flexibility. The main downside is that it relies on us moving to Java 8, which may be more than a year away potentially (if we support the last 2 Java releases).
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Anna,
> > > > > > > >
> > > > > > > > I'm also in favor of including just the APIs for which we have a clear use case. If more use cases for finer monitoring show up in the future, we can always update the interface. Would you please highlight in the KIP the APIs that you think we have an immediate use for?
> > > > > > > >
> > > > > > > > Joel,
> > > > > > > >
> > > > > > > > Broker-side monitoring makes a lot of sense in the long term though I don't think it is a requirement for end-to-end monitoring.
> > > > > > > > With the producer and consumer interceptors, you have the ability to get full publish-to-subscribe end-to-end monitoring. The broker interceptor certainly improves the resolution of monitoring but it is also a riskier change. I prefer an incremental approach over a big-bang and recommend taking baby-steps. Let's first make sure the producer/consumer interceptors are successful. And then come back and add the broker interceptor carefully.
> > > > > > > >
> > > > > > > > Having said that, it would be great to understand your proposal for the broker interceptor independently. We can either add an interceptor on-append or on-commit. If people want to use this for monitoring, then possibly on-commit might be more useful?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Neha
> > > > > > > >
> > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hey Joel,
> > > > > > > > >
> > > > > > > > > What is the interface you are thinking of? Something like this:
> > > > > > > > >     onAppend(String topic, int partition, Records records, long time)
> > > > > > > > > ?
> > > > > > > > >
> > > > > > > > > One challenge right now is that we are still using the old Message/MessageSet classes on the broker which I'm not sure if we'd want to support over the long haul but it might be okay just to create the records instance for this interface.
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I'm definitely in favor of having such hooks in the produce/consume life-cycle. Not sure if people remember this but in Kafka 0.7 this was pretty much how it was:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > > > > > > > >
> > > > > > > > > > i.e., we had something similar to the interceptor proposal for various stages of the producer request. The producer provided call-backs for beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at LinkedIn we in fact did auditing within these call-backs (and not explicitly in the wrapper). Over time and with 0.8 we moved that out to the wrapper libraries.
> > > > > > > > > >
> > > > > > > > > > On a side-note while audit and other monitoring can be done internally in a convenient way I think it should be clarified that having a wrapper is in general not a bad idea and I would even consider it to be a best-practice. Even with 0.7 we still had a wrapper library and that API has largely stayed the same and has helped protect against (sometimes backwards incompatible) changes in open source.
> > > > > > > > > >
> > > > > > > > > > While we are on this topic I have one comment and Anna, you may have already considered this but I don't see mention of it in the KIP:
> > > > > > > > > >
> > > > > > > > > > Add a custom message interceptor/validator on the broker on message arrival.
> > > > > > > > > >
> > > > > > > > > > We decompress and do basic validation of messages on arrival. I think there is value in supporting custom validation and expand it to support custom on-arrival processing. Here is a specific use-case I have in mind. The blog that James referenced describes our auditing infrastructure. In order to audit the Kafka cluster itself we need to run a "console auditor" service that consumes everything and spits out audit events back to the cluster. I would prefer not having to run this service because:
> > > > > > > > > >
> > > > > > > > > > - Well, it is one more service that we have to run and monitor
> > > > > > > > > > - Consuming everything takes up bandwidth which can be avoided
> > > > > > > > > > - The console auditor consumer itself can lag and cause temporary audit discrepancies
> > > > > > > > > >
> > > > > > > > > > One way we can mitigate this is by having mirror-makers in between clusters emit audit events.
> > > > > > > > > > The problem is that the very last cluster in the pipeline will not have any audit which is why we need to have something to audit the cluster.
> > > > > > > > > >
> > > > > > > > > > If we had a custom message validator then the audit can be done on-arrival and we won't need a console auditor.
> > > > > > > > > >
> > > > > > > > > > One potential issue in this approach and any elaborate on-arrival processing for that matter is that you may need to deserialize the message as well which can drive up produce request handling times. However I'm not terribly concerned about that especially if the audit header can be separated out easily or even deserialized partially as this Avro thread touches on
> > > > > > > > > >
> > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Joel
> > > > > > > > > >
> > > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Nice KIP. Excellent idea.
> > > > > > > > > > > Was just thinking if we can add onDequed() to the ProducerInterceptor interface. Since we have the onEnqueued(), it will help the client or the tools to know how much time the message spent in the RecordAccumulator.
> > > > > > > > > > > Also an API to check if there are any messages left for a particular topic in the RecordAccumulator would help.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Mayuresh
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad someone is finally picking it up. Will take a look at the KIP at some point shortly.
> > > > > > > > > > > >
> > > > > > > > > > > > -Todd
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Becket,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yeah this is really similar to the callback. The difference is really in who sets the behavior. The idea of the interceptor is that it doesn't require any code change in apps so you can globally add behavior to your Kafka usage without changing app code. Whereas the callback is added by the app. The idea is to kind of obviate the need for the wrapper code that e.g. LinkedIn maintains to hold this kind of stuff.
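The distinction drawn here (the interceptor is set through configuration, the callback is set by the application) can be sketched roughly as follows. This is a toy model under stated assumptions, not the Kafka client: SendInterceptor, TaggingInterceptor, ToyClient and the "|audited" tag are all hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified interceptor contract (records reduced to strings). */
interface SendInterceptor {
    String onSend(String value);
}

/** Example interceptor an operator might list in configuration; the tag is made up. */
class TaggingInterceptor implements SendInterceptor {
    @Override
    public String onSend(String value) {
        return value + "|audited";
    }
}

/** Toy client: interceptors come from configured class names, callbacks from the caller. */
class ToyClient {
    private final List<SendInterceptor> interceptors = new ArrayList<>();
    final List<String> sent = new ArrayList<>();

    ToyClient(List<String> interceptorClassNames) {
        for (String name : interceptorClassNames) {
            try {
                // Instantiated reflectively, so application code never references the class.
                interceptors.add((SendInterceptor)
                        Class.forName(name).getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("cannot load interceptor " + name, e);
            }
        }
    }

    /** The callback is supplied by the application on each send and may be null. */
    void send(String value, Consumer<String> callback) {
        for (SendInterceptor interceptor : interceptors) {
            value = interceptor.onSend(value);
        }
        sent.add(value);
        if (callback != null) {
            callback.accept(value);
        }
    }
}

class InterceptorVsCallback {
    public static void main(String[] args) {
        // "Configuration": only this list changes when behavior is added globally.
        List<String> configured = Collections.singletonList(TaggingInterceptor.class.getName());
        ToyClient client = new ToyClient(configured);
        client.send("hello", v -> System.out.println("callback saw: " + v)); // app-supplied callback
        client.send("world", null);                                          // no callback at all
        System.out.println(client.sent);
    }
}
```

Both sends pass through the configured interceptor even though only the first one supplies a callback. In released Kafka clients the corresponding setting is `interceptor.classes`.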
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Jay
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > This could be a useful feature. And I think there are some use cases to mutate the data like rejected alternative one mentioned.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I am wondering if there is functional overlap between ProducerInterceptor.onAcknowledgement() and the producer callback? I can see that the Callback could be a per-record setting while onAcknowledgement() is a producer-level setting. Other than that, is there any difference between them?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > James,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor interface.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Neha
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Anna,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer interceptors could be used to implement part of LinkedIn's Kafka Audit tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Part of that is done by a wrapper library around the kafka producer that keeps a count of the number of messages produced, and then sends that count to a side-topic. It sounds like the producer interceptors could possibly be used to implement that?
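The counting part of that use case could look roughly like the sketch below. It is written against simplified stand-in types rather than the real Kafka classes: ToyRecord, ToyProducerInterceptor and CountingInterceptor are hypothetical, and onAcknowledgement takes a topic name here instead of the RecordMetadata argument named in the thread. Publishing the counts to a side-topic is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified stand-in for a producer record; not the real Kafka class. */
final class ToyRecord {
    final String topic;
    final String value;

    ToyRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

/** Stand-in mirroring the shape of the two producer callbacks discussed in the thread. */
interface ToyProducerInterceptor {
    ToyRecord onSend(ToyRecord record);

    void onAcknowledgement(String topic, Exception exception);
}

/** Keeps per-topic counts of records sent and records acknowledged without error. */
class CountingInterceptor implements ToyProducerInterceptor {
    private final Map<String, AtomicLong> sent = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> acked = new ConcurrentHashMap<>();

    @Override
    public ToyRecord onSend(ToyRecord record) {
        sent.computeIfAbsent(record.topic, t -> new AtomicLong()).incrementAndGet();
        return record; // pure monitoring: the record is passed through unchanged
    }

    @Override
    public void onAcknowledgement(String topic, Exception exception) {
        if (exception == null) { // count only successful acknowledgements
            acked.computeIfAbsent(topic, t -> new AtomicLong()).incrementAndGet();
        }
    }

    long sentCount(String topic) {
        return count(sent, topic);
    }

    long ackedCount(String topic) {
        return count(acked, topic);
    }

    private static long count(Map<String, AtomicLong> counts, String topic) {
        AtomicLong c = counts.get(topic);
        return c == null ? 0L : c.get();
    }
}

class AuditDemo {
    public static void main(String[] args) {
        CountingInterceptor audit = new CountingInterceptor();
        audit.onSend(new ToyRecord("T", "m1"));
        audit.onSend(new ToyRecord("T", "m2"));
        audit.onAcknowledgement("T", null);                          // m1 succeeded
        audit.onAcknowledgement("T", new RuntimeException("boom"));  // m2 failed
        System.out.println("sent=" + audit.sentCount("T") + " acked=" + audit.ackedCount("T"));
    }
}
```

A gap between the sent and acknowledged counts for a topic is the kind of signal an audit pipeline would report.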
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > -James
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I just created a KIP-42 for adding producer and consumer interceptors for intercepting messages at different points on producer and consumer.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Comments and suggestions are welcome!
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Anna