On second thought, yes, I think we should expose a record size that represents application bytes. This is Becket's option #1.
I updated the KIP wiki with the new fields in RecordMetadata and ConsumerRecord. I would like to start a voting thread tomorrow if there are no objections or more things to discuss.

Thanks,
Anna

On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner wrote:

Regarding record size as bytes sent over the wire, my concern is that it is almost impossible to calculate per record. We could do either: 1) compressed bytes / number of records in a compressed message, as Todd mentioned; or 2) the same as #1, but proportional to the record's uncompressed size vs. the total uncompressed size of the records. All of these calculations give us an estimate. So maybe record size as bytes sent over the wire is not per-record metadata, but rather a per topic/partition measure that is better exposed through metrics?

On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino wrote:

It may be difficult (or nearly impossible) to get the actual compressed bytes for a message from a compressed batch, but I do think it’s useful information to have available for the very reason noted: bandwidth consumed. Does it make sense to have an interceptor at the batch level that can provide this? The other option is to estimate it (such as assuming that the messages in a batch are equal in size, which is not necessarily true), which is probably not the right answer.

-Todd

On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner wrote:

Hi Becket,

It will be up to the interceptor to implement its audit or monitoring strategy. I would also imagine there is more than one good way to do audit. So, I agree that some interceptors may not use CRC, and we will not require it. The question now is whether intercepting CRCs is needed.
I think they are very useful for monitoring and audit, because a CRC provides an easy way to get a summary of a message, rather than using the message bytes or key/value objects.

Regarding record size, I agree that the bandwidth example was not a good one. I think it would be hard to get the actual bytes sent over the wire (your #2), since multiple records get compressed together and we would need to decide which bytes to attribute to which record. So I am inclined to do only your #1. However, it still makes more sense to me to return the record size including the header, since this is the actual record size.

Thanks,
Anna

On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin wrote:

Anna,

Using CRC to do end-to-end auditing might be very costly, because you will need to collect all the CRCs from both the producer and the consumer. It is also based on the assumption that the broker does not modify the record.
Can you shed some light on how end-to-end auditing will use the CRC before we decide to expose such a low-level detail to the end user? It would also be helpful if you could compare it with something like sequence-number-based auditing.

About the record size, one thing worth noting is that the size of a Record is not the actual number of bytes sent over the wire if we use compression, so it does not really tell users how much bandwidth they are using. Personally, I think two kinds of size may be useful:
1. The record size after serialization, i.e. application bytes. (The uncompressed record size can easily be derived as well.)
2. The actual bytes sent over the wire.
We can get (1) easily, but (2) is difficult to get at the Record level when we use compression.
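Becket's option #1 and the two wire-size estimates listed in Anna's 2:25 PM message can be made concrete with a small calculation. This is a hypothetical sketch, not Kafka client code: `SizeEstimates` and its methods are illustrative names, and it assumes the compressed size of the whole batch is already known.

```java
// Hypothetical helper, not part of the Kafka client API.
final class SizeEstimates {
    private SizeEstimates() {}

    // Option #1, "application bytes": serialized key plus serialized value,
    // without any record header overhead. A null key or value counts as 0 bytes.
    static int applicationBytes(byte[] serializedKey, byte[] serializedValue) {
        int keyBytes = serializedKey == null ? 0 : serializedKey.length;
        int valueBytes = serializedValue == null ? 0 : serializedValue.length;
        return keyBytes + valueBytes;
    }

    // Wire-size estimate 1: split the compressed batch size evenly across its records.
    static long[] evenShare(long compressedBatchBytes, int recordCount) {
        long[] shares = new long[recordCount];
        for (int i = 0; i < recordCount; i++) {
            shares[i] = compressedBatchBytes / recordCount;
        }
        return shares;
    }

    // Wire-size estimate 2: apportion the compressed batch size in proportion to each
    // record's uncompressed size. Integer division truncates, so the shares can sum
    // to slightly less than compressedBatchBytes.
    static long[] proportionalShare(long compressedBatchBytes, int[] uncompressedSizes) {
        long totalUncompressed = 0;
        for (int size : uncompressedSizes) {
            totalUncompressed += size;
        }
        long[] shares = new long[uncompressedSizes.length];
        if (totalUncompressed == 0) {
            return shares; // nothing to apportion
        }
        for (int i = 0; i < uncompressedSizes.length; i++) {
            shares[i] = compressedBatchBytes * uncompressedSizes[i] / totalUncompressed;
        }
        return shares;
    }
}
```

Both wire-size functions only redistribute a batch-level number, which is why the thread leans toward exposing that measure through metrics rather than as per-record metadata.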
Thanks,

Jiangjie (Becket) Qin

On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner wrote:

Hi Becket,

The use case for CRC is end-to-end audit, rather than checking whether a single message is corrupt.

Regarding record size, I was thinking of extracting the record size from Record. That will include the header overhead as well. I think the total record size will tell users how much bandwidth their messages take. Since the header is relatively small and constant, users will also get an idea of their key/value sizes.

Thanks,
Anna

On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin wrote:

I am +1 on #1.2 and #3.

#2: Regarding CRC, I am not sure users care about CRC. Is there any specific use case? Currently we validate messages by calling ensureValid() to verify the checksum, and throw an exception if it does not match.

Message size would be useful. We can add that to ConsumerRecord. Can you clarify which message size you are referring to? Does it include the message header overhead or not? From the user's point of view, they probably don't care about the header size.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede wrote:

Anna,

Thanks for being diligent.

+1 on #1.2 and sounds good on #3. I recommend adding checksum and size fields to RecordMetadata and ConsumerRecord instead of exposing metadata piecemeal in the interceptor APIs.
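The CRC-based audit discussed above can be illustrated with a minimal sketch: the producer side records the checksum of each acknowledged record and the consumer side crosses it off. `CrcAudit` is a hypothetical class; it computes its own `java.util.zip.CRC32` over key and value bytes instead of reading Kafka's record CRC, and it inherits the caveats Becket raises (every checksum must be collected, and the broker must not rewrite records).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical audit sketch; the checksum here is NOT Kafka's record CRC.
final class CrcAudit {
    // Multiset of checksums produced but not yet seen by the consumer side.
    private final Map<Long, Integer> outstanding = new HashMap<>();

    // CRC32 over key bytes followed by value bytes. Hashing them back to back means
    // ("ab", "c") and ("a", "bc") collide; a real design would need to frame them.
    static long checksum(byte[] key, byte[] value) {
        CRC32 crc = new CRC32();
        if (key != null) crc.update(key, 0, key.length);
        if (value != null) crc.update(value, 0, value.length);
        return crc.getValue();
    }

    // Producer side, e.g. called for each successfully acknowledged record.
    void recordProduced(long crc) {
        outstanding.merge(crc, 1, Integer::sum);
    }

    // Consumer side, e.g. called for each consumed record. Returns false when the
    // checksum was never produced or has already been consumed.
    boolean recordConsumed(long crc) {
        Integer count = outstanding.get(crc);
        if (count == null) return false;
        if (count == 1) outstanding.remove(crc); else outstanding.put(crc, count - 1);
        return true;
    }

    // Records produced but not (yet) consumed: candidates for loss.
    int missingCount() {
        int total = 0;
        for (int c : outstanding.values()) total += c;
        return total;
    }
}
```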
Thanks,
Neha

On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner wrote:

Hi All,

The KIP wiki page is now up to date with the scope we have agreed on: producer and consumer interceptors with a minimal set of mutable APIs that do not depend on the producer and consumer internal implementation.

I have a few more API details that I would like to bring to your attention and/or discuss:

1. Handling exceptions

Exceptions can provide an additional level of control. For example, we can filter messages on the consumer side, or stop messages on the producer if they don’t have the right field.

I see two options:
1.1. For callbacks that can mutate records (onSend and onConsume), propagate exceptions through the original calls (KafkaProducer.send() and KafkaConsumer.poll(), respectively). For other callbacks, catch the exception, log it, and ignore it.
1.2. Catch exceptions from all the interceptor callbacks and ignore them.

The issue with 1.1 is that it effectively changes the KafkaProducer.send() and KafkaConsumer.poll() APIs, since they may now throw exceptions that are not documented in the KafkaProducer/KafkaConsumer API. Another option is to allow some exceptions to propagate and ignore others.

I think our use cases do not require propagating exceptions.
So, I propose option 1.2, unless someone has suggestions or use cases for propagating exceptions. Please let me know.

2. Intercepting record CRC and record size

Since we decided not to add any intermediate callbacks (such as onEnqueued or onReceive) to interceptors, I think it is still valuable to intercept the record CRC and record size in bytes for monitoring and audit use cases.

I propose adding checksum and size fields to RecordMetadata and ConsumerRecord. Another option would be to add them as parameters to the onAcknowledgement() and onConsume() callbacks.

3. The callbacks that allow modifying records look as follows:
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

This means that interceptors can potentially modify the topic/partition in ProducerRecord and the topic/partition/offset in ConsumerRecord. I propose that it is up to the interceptor implementation to ensure that the topic/partition, etc., are correct. KafkaProducer.send() will use the topic, partition, key, and value from the ProducerRecord returned from onSend(). Similarly, the ConsumerRecords returned from KafkaConsumer.poll() will be the ones returned from the interceptor.

Please let me know if you have any suggestions or objections to the above.
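Option 1.2 together with item 3 can be sketched as a chain that feeds each interceptor the record returned by the previous one, and logs and ignores any exception. This is a hypothetical simplification: `SafeInterceptorChain` is an illustrative name, and `UnaryOperator<R>` stands in for `ProducerInterceptor.onSend()`.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of option 1.2; R stands in for ProducerRecord<K, V>.
final class SafeInterceptorChain<R> {
    private static final Logger LOG = Logger.getLogger(SafeInterceptorChain.class.getName());
    private final List<UnaryOperator<R>> onSendCallbacks;

    SafeInterceptorChain(List<UnaryOperator<R>> onSendCallbacks) {
        this.onSendCallbacks = onSendCallbacks;
    }

    // Item 3: send() would use whatever record the last interceptor returned.
    // If an interceptor throws, the exception is logged and ignored, and the
    // record from the previous step is passed on unchanged.
    R onSend(R record) {
        R current = record;
        for (UnaryOperator<R> interceptor : onSendCallbacks) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING, "interceptor onSend failed; ignoring", e);
            }
        }
        return current;
    }
}
```

Swallowing exceptions keeps send() from throwing undocumented exceptions, at the cost of silently losing the failed interceptor's effect on that record.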
Thanks,
Anna

On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner wrote:

Hi Mayuresh,

I see why you would want to check for messages left in the RecordAccumulator. However, I don't think this will completely solve the problem. Messages could be in flight somewhere else, like in the socket, or there may be in-flight messages on the consumer side of the MirrorMaker. So, if we go the route of checking whether there are any in-flight messages for the topic deletion use case, maybe it is better to count them with onSend() and onAcknowledgement(), i.e., check whether all messages sent were acknowledged. I also think that it would be better to solve this without interceptors, such as by fixing error handling in this scenario. However, I do not have any good proposal right now, so these are just general thoughts.

Thanks,
Anna

On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat wrote:

Calling producer.flush() flushes all the data, so this is OK. But when you are running MirrorMaker, I am not sure there is a way to call flush() from outside.
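Anna's idea of counting in-flight records with onSend() and onAcknowledgement() could look roughly like the sketch below. `InFlightCounter` is hypothetical, and it takes topic names as plain strings rather than the `ProducerRecord`/`RecordMetadata` arguments a real interceptor would receive.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-topic count of records sent but not yet acknowledged.
final class InFlightCounter {
    private final ConcurrentHashMap<String, AtomicLong> inFlight = new ConcurrentHashMap<>();

    // Call for every record handed to send().
    void onSend(String topic) {
        inFlight.computeIfAbsent(topic, t -> new AtomicLong()).incrementAndGet();
    }

    // Call once per record when its send completes, successfully or with an error.
    void onAcknowledgement(String topic) {
        AtomicLong count = inFlight.get(topic);
        if (count != null) count.decrementAndGet();
    }

    // True when every record sent for this topic has been acknowledged. This covers
    // only the producer; records still on the consumer side of MirrorMaker are not counted.
    boolean isDrained(String topic) {
        AtomicLong count = inFlight.get(topic);
        return count == null || count.get() == 0;
    }
}
```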
Thanks,

Mayuresh

On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin wrote:

Mayuresh,

Regarding your use case about mirror maker: is it good enough as long as we know there are no more messages for the topic in the producer? If that is the case, calling producer.flush() is sufficient.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat wrote:

Hi Anna,

Thanks a lot for summarizing the discussion on this KIP.

It LGTM.
This is really nice:
"We decided not to add any callbacks to producer and consumer interceptors that will depend on internal implementation as part of this KIP. *However, it is possible to add them later as part of another KIP if there are good use-cases.*"

Do you agree with the use case I explained earlier for knowing the number of records left in the RecordAccumulator for a particular topic? It might be orthogonal to this KIP, but it would be helpful.
What do you think?

Thanks,

Mayuresh

On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino wrote:

This looks good. As noted, having one mutable interceptor on each side allows for the use cases we can envision right now, and I think that’s going to provide a great deal of opportunity for implementing things like audit, especially within a multi-tenant environment. Looking forward to getting this available in the clients.

Thanks!

-Todd

On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner wrote:

Hi All,

Here are the meeting notes from today’s KIP meeting:

1. We agreed to keep the scope of this KIP to producer and consumer interceptors only. A broker-side interceptor will be added later as a separate KIP.
The reasons were already mentioned in this thread, but the summary is:
* A broker interceptor is riskier and requires careful consideration of overheads, whether to intercept leaders vs. leaders/replicas, what to do on leader failover, and so on.
* Broker interceptors increase monitoring resolution, but not including them in this KIP does not reduce the usefulness of producer and consumer interceptors that enable end-to-end monitoring.

2. We agreed to scope the ProducerInterceptor and ConsumerInterceptor callbacks to a minimal set of mutable APIs that do not depend on the producer and consumer internal implementation.
ProducerInterceptor:
*ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
*void onAcknowledgement(RecordMetadata metadata, Exception exception);*

ConsumerInterceptor:
*ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
*void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*

We will allow interceptors to modify ProducerRecord on the producer side, and to modify ConsumerRecords on the consumer side. This will support end-to-end monitoring and auditing, and the ability to add metadata to a message. This will support Todd’s auditing and routing use cases.

We did not find any use case for modifying records in the onConsume() callback, but decided to enable modification of consumer records for symmetry with onSend().

3. We agreed to ensure compatibility when/if we add new methods to ProducerInterceptor and ConsumerInterceptor by using default methods with an empty implementation. It is OK to assume Java 8. (This is Ismael’s method #2.)
4. We decided not to add any callbacks to producer and consumer interceptors that depend on the internal implementation as part of this KIP. However, it is possible to add them later as part of another KIP if there are good use cases.

*Reasoning.* We did not have concrete use cases that justified more methods at this point. Some of the use cases were for more fine-grained latency collection, which could be done with Kafka Metrics. Another use case was encryption. However, there are several design options for encryption. One is per-record encryption, which would require adding ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One could argue that in that case encryption could be done by adding a custom serializer/deserializer. Another option is to do encryption after the message gets compressed, but that raises issues with the broker doing re-compression.
We decided that it is better to have that discussion in a separate KIP and decide whether this is something we want to do with interceptors or by other means.

Todd, Mayuresh, and others who missed the KIP meeting, please let me know your thoughts on the scope we agreed on during the meeting.

I will update the KIP proposal with the current decision by the end of today.

Thanks,
Anna

On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat wrote:

Hi,

I won't be able to make it to the KIP hangout due to a conflict.

Anna, here is the use case where knowing whether there are messages left in the RecordAccumulator to be sent to the Kafka cluster for a topic is useful.

1) Consider a pipeline:
A ---> Mirror-maker -----> B

2) We have a topic T in cluster A mirrored to cluster B.
3) Now, if we delete topic T in A and immediately proceed to delete the topic in cluster B, some of the Mirror-maker machines die because at least one of the batches in the RecordAccumulator for topic T fails to be produced to cluster B. We have seen this happening in our clusters.

If we know that there are no more messages left in the RecordAccumulator to be produced to cluster B, we can safely delete the topic in cluster B without disturbing the pipeline.

Thanks,

Mayuresh

On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner wrote:

Thanks Ismael and Todd for your feedback!

I agree about coming up with lean but useful interfaces that will be easy to extend later.
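Keeping the interfaces easy to extend relies on the plan recorded in the meeting notes: Java 8 default methods with empty bodies (Ismael's method #2). The simplified interface below is hypothetical; `SketchProducerInterceptor` and `PassThroughInterceptor` are illustrative names, and the `onEnqueued` signature is reduced to one argument for brevity, not the signature proposed in the thread.

```java
// Hypothetical, simplified interface; R and M stand in for ProducerRecord and RecordMetadata.
interface SketchProducerInterceptor<R, M> {
    R onSend(R record);

    void onAcknowledgement(M metadata, Exception exception);

    // Imagined later addition: the empty default body means implementations written
    // before this method existed still compile and run unchanged.
    default void onEnqueued(R record) {
        // intentionally empty
    }
}

// An implementation written against the original two methods only.
final class PassThroughInterceptor implements SketchProducerInterceptor<String, String> {
    int acks = 0;

    @Override
    public String onSend(String record) {
        return record;
    }

    @Override
    public void onAcknowledgement(String metadata, Exception exception) {
        acks++;
    }
}
```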
When we discuss the minimal set of producer and consumer interceptor APIs in today’s KIP meeting (discussion item #2 in my previous email), let's compare two options:

*1. Minimal set of immutable APIs for producer and consumer interceptors*

ProducerInterceptor:
public void onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);

ConsumerInterceptor:
public void onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

Use cases:
- end-to-end monitoring; custom tracing and logging

*2. Minimal set of mutable APIs for producer and consumer interceptors*

ProducerInterceptor:
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);

ConsumerInterceptor:
void onConsume(ConsumerRecords<K, V> records);
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

Additional use cases beyond #1:
- Ability to add metadata to a message or fill in standard fields for audit and routing.

Implications:
- Partition assignment will be done based on the modified key/value instead of the original key/value. If the key/value transformation is not consistent (the same key and value do not always mutate to the same modified key/value), then log compaction would not work. However, Todd's audit and routing use cases will likely do consistent transformations.
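The log-compaction implication depends only on whether the key transformation is deterministic. The sketch below assumes a stand-in partition function based on `String.hashCode()`; Kafka's default partitioner hashes the serialized key with murmur2 instead, but the argument holds for any deterministic hash. All names are hypothetical.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch contrasting consistent and inconsistent key transformations.
final class ConsistentTransform {
    private ConsistentTransform() {}

    // Stand-in partitioner: mask the sign bit so the result is non-negative.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Consistent: the same input key always yields the same output key, so every
    // update to a logical key lands in one partition and compaction keeps the latest.
    static final UnaryOperator<String> ADD_TENANT_PREFIX = k -> "tenant-a:" + k;

    // Inconsistent: the output depends on a counter, so updates to one logical key
    // get different keys (and possibly partitions) and are never compacted together.
    static UnaryOperator<String> appendSequence() {
        final long[] seq = {0};
        return k -> k + "#" + (seq[0]++);
    }
}
```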
*Additional callbacks (discussion item #3 in my previous email):*

If we want to support encryption, we would want to be able to modify the serialized key/value, rather than the key and value objects. This would add the following APIs to the producer and consumer interceptors:

ProducerInterceptor:
SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> record, SerializedKeyValue serializedKeyValue);

ConsumerInterceptor:
SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);

I am leaning towards implementing the minimal set of immutable or mutable interfaces, making sure that we have a compatibility plan that allows us to add more callbacks in the future (per Ismael's comment), and adding more APIs later.
E.g., for the encryption use case, there could be an argument for doing encryption after message compression vs. per-record encryption, which could be done using the above additional API. There are also more implications for every API that modifies records: modifying the serialized key/value will again impact partition assignment (we will likely do that after partition assignment), which may impact log compaction and mirror maker partitioning.

Thanks,
Anna

On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino wrote:

Finally got a chance to take a look at this. I won’t be able to make the KIP meeting due to a conflict.

I’m somewhat disappointed in this proposal.
I think that the explicit exclusion of modification of the messages is short-sighted, and not accounting for it now is going to bite us later. Jay, aren’t you the one railing against public interfaces and how difficult they are to work with when you don’t get them right? The “simple” change to one of these interfaces to make it able to return a record is going to be a significant change and is going to require all clients to rewrite their interceptors. If we’re not willing to put in the time to think through manipulation now, then this KIP should be shelved until we are. Implementing something halfway is going to be worse than taking a little longer. In addition, I don’t believe that manipulation requires anything more than interceptors receiving the full record and then returning it.
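Todd's point that manipulation only needs the interceptor to receive the full record and return it can be sketched with a mutable `onSend()` that fills in standard audit fields. `SimpleRecord` is a hypothetical stand-in for `ProducerRecord<K, V>`, the value is assumed to be a map-shaped payload, and the `audit.*` field names are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProducerRecord<K, V>: just topic, key, and value.
final class SimpleRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    SimpleRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Sketch of a mutable onSend(): returns a new record whose value carries audit fields.
final class AuditFieldsInterceptor {
    private final String hostname;
    private final String serviceName;

    AuditFieldsInterceptor(String hostname, String serviceName) {
        this.hostname = hostname;
        this.serviceName = serviceName;
    }

    SimpleRecord<String, Map<String, Object>> onSend(SimpleRecord<String, Map<String, Object>> record) {
        // Copy rather than mutate the caller's map.
        Map<String, Object> value = new HashMap<>();
        if (record.value != null) value.putAll(record.value);
        // Fill in standard fields only if the producer did not set them already.
        value.putIfAbsent("audit.host", hostname);
        value.putIfAbsent("audit.service", serviceName);
        value.putIfAbsent("audit.timestampMs", System.currentTimeMillis());
        // Topic and key pass through unchanged; changing them would make the
        // interceptor responsible for their correctness.
        return new SimpleRecord<>(record.topic, record.key, value);
    }
}
```

Returning a copy keeps the caller's original record untouched, so a failed or skipped interceptor cannot leave it half-modified.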

There are three use cases I can think of right now, without any deep discussion, that can make use of interceptors with modification:

1. Auditing. The ability to add metadata to a message for auditing is critical. Hostname, service name, timestamps, etc. are all pieces of data that can be used on the other side of the pipeline to categorize messages, determine loss and transport time, and pin down issues. You may say that these things can just be part of the message schema, but anyone who has worked with a multi-user data system (especially those who have been involved with LinkedIn) knows how difficult it is to maintain consistent message schemas and to get other people to put in fields for your use.

2. Encryption.
This is probably the most obvious case for record manipulation on both sides. The ability to tie in end-to-end encryption is important for data that requires external compliance (PCI, HIPAA, etc.).

3. Routing. By being able to add a bit of information about the source or destination of a message to the metadata, you can easily construct an intelligent mirror maker that can prevent loops. This has the opportunity to result in significant operational savings, as you can get rid of the need for tiered clusters in order to prevent loops in mirroring messages.

All three of these share the feature that they add metadata to messages.
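The routing case could work roughly like this: each mirror hop appends the cluster it forwards from to the message metadata, and a mirror maker refuses to copy a message into a cluster that already appears in that path. This is a hypothetical sketch; the `route` metadata key, the comma-separated path encoding, and the `LoopGuard` class are assumptions, not part of Kafka or mirror maker.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LoopGuard {

    // Hypothetical metadata key holding the clusters a message has already been mirrored from.
    static final String ROUTE_HEADER = "route";

    // Returns the clusters recorded in the message metadata, oldest first.
    static List<String> visited(Map<String, String> headers) {
        String path = headers.getOrDefault(ROUTE_HEADER, "");
        return path.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(path.split(","));
    }

    /**
     * Decides whether a message read from sourceCluster may be mirrored to targetCluster.
     * If so, returns the metadata to attach (source appended to the path); otherwise empty.
     */
    static Optional<Map<String, String>> forward(Map<String, String> headers,
                                                 String sourceCluster,
                                                 String targetCluster) {
        List<String> path = visited(headers);
        if (path.contains(targetCluster) || targetCluster.equals(sourceCluster)) {
            return Optional.empty(); // target already saw this message: drop it to break the loop
        }
        Map<String, String> out = new HashMap<>(headers);
        String current = headers.getOrDefault(ROUTE_HEADER, "");
        out.put(ROUTE_HEADER, current.isEmpty() ? sourceCluster : current + "," + sourceCluster);
        return Optional.of(out);
    }

    public static void main(String[] args) {
        // A message produced in "east" is mirrored east -> west; west then tries to send it back.
        Map<String, String> produced = new HashMap<>();
        Map<String, String> inWest = forward(produced, "east", "west").get();
        System.out.println(inWest.get(ROUTE_HEADER));                    // east
        System.out.println(forward(inWest, "west", "east").isPresent()); // false: loop prevented
    }
}
```

Because the loop check uses only the message's own metadata, clusters can mirror to each other in a full mesh without a tiered topology, which is the operational saving described in the routing item.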
With the pushback on having arbitrary metadata as an “envelope” to the message, this is a way to provide it and make it the responsibility of the client, and not the Kafka broker and system itself.

-Todd

On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <[email protected]> wrote:

Hi Anna and Neha,

I think it makes a lot of sense to try and keep the interface lean and to add more methods later when/if there is a need. What is the current thinking with regards to compatibility when/if we add new methods? A few options come to mind:

1. Change the interface to an abstract class with empty implementations for all the methods.
   This means that the path to adding new methods is clear.
2. Hope we have moved to Java 8 by the time we need to add new methods, and use default methods with an empty implementation for any new method (and potentially make existing methods default methods too at that point, for consistency).
3. Introduce a new interface that inherits from the existing Interceptor interface when we need to add new methods.

Option 1 is the easiest and it also means that interceptor users only need to override the methods that they are interested in (more useful if the number of methods grows).
The downside is that interceptor implementations cannot inherit from another class (a straightforward workaround is to make the interceptor a forwarder that calls another class). Also, our existing callbacks are interfaces, so this seems a bit inconsistent.

Option 2 may be the most appealing one, as both users and ourselves retain flexibility. The main downside is that it relies on us moving to Java 8, which may potentially be more than a year away (if we support the last 2 Java releases).

Thoughts?
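The three evolution options can be compared side by side in a small sketch. The types and method names (`onSend`, `onAcknowledgement`, and a later-added `onDequeued`) are hypothetical stand-ins rather than the actual KIP-42 interface, and option 2 assumes Java 8 or later for default methods.

```java
public class InterceptorEvolution {

    // Option 1: an abstract class whose methods all have empty bodies.
    // Methods added later also get empty bodies, so existing subclasses keep compiling,
    // but a subclass can no longer extend any other class.
    abstract static class AbstractInterceptor {
        public void onSend(String record) { }
        public void onAcknowledgement(String record) { }
        public void onDequeued(String record) { } // hypothetical later addition
    }

    // Option 2: a Java 8 interface where newly added methods are default methods.
    interface Interceptor {
        void onSend(String record);
        void onAcknowledgement(String record);
        default void onDequeued(String record) { } // hypothetical later addition, empty default
    }

    // Option 3: freeze the original interface and add a sub-interface for new methods.
    interface InterceptorV1 {
        void onSend(String record);
        void onAcknowledgement(String record);
    }

    interface InterceptorV2 extends InterceptorV1 {
        void onDequeued(String record);
    }

    // With option 3 the client checks at runtime which version an implementation supports.
    static String dispatchDequeued(InterceptorV1 interceptor, String record) {
        if (interceptor instanceof InterceptorV2) {
            ((InterceptorV2) interceptor).onDequeued(record);
            return "called";
        }
        return "skipped"; // old implementations never see the new callback
    }

    // Written against the original two-method contract; still compiles under option 2.
    static class CountingInterceptor implements Interceptor {
        int sent = 0;
        @Override public void onSend(String record) { sent++; }
        @Override public void onAcknowledgement(String record) { }
    }

    public static void main(String[] args) {
        // Option 1: override only the method of interest.
        AbstractInterceptor onlySend = new AbstractInterceptor() {
            @Override public void onSend(String r) { }
        };
        onlySend.onDequeued("r1");

        // Option 2: the inherited default is a no-op.
        CountingInterceptor c = new CountingInterceptor();
        c.onSend("r1");
        c.onDequeued("r1");
        System.out.println(c.sent);

        // Option 3: a V1-only implementation is skipped for the new callback.
        InterceptorV1 old = new InterceptorV1() {
            public void onSend(String r) { }
            public void onAcknowledgement(String r) { }
        };
        System.out.println(dispatchDequeued(old, "r1"));
    }
}
```

Option 3's runtime `instanceof` check is the cost of keeping old implementations working without default methods, while option 1 trades it for the single-inheritance restriction noted above.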

Ismael

On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <[email protected]> wrote:

Anna,

I'm also in favor of including just the APIs for which we have a clear use case. If more use cases for finer monitoring show up in the future, we can always update the interface. Would you please highlight in the KIP the APIs that you think we have an immediate use for?

Joel,

Broker-side monitoring makes a lot of sense in the long term, though I don't think it is a requirement for end-to-end monitoring.
With the producer and consumer interceptors, you have the ability to get full publish-to-subscribe end-to-end monitoring. The broker interceptor certainly improves the resolution of monitoring, but it is also a riskier change. I prefer an incremental approach over a big-bang and recommend taking baby-steps. Let's first make sure the producer/consumer interceptors are successful, and then come back and add the broker interceptor carefully.

Having said that, it would be great to understand your proposal for the broker interceptor independently. We can either add an interceptor on-append or on-commit. If people want to use this for monitoring, then possibly on-commit might be more useful?

Thanks,
Neha

On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <[email protected]> wrote:

Hey Joel,

What is the interface you are thinking of? Something like this:

    onAppend(String topic, int partition, Records records, long time)

?

One challenge right now is that we are still using the old Message/MessageSet classes on the broker, which I'm not sure we'd want to support over the long haul, but it might be okay just to create the records instance for this interface.
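A broker-side hook shaped like the `onAppend(String topic, int partition, Records records, long time)` signature above might be used for on-arrival accounting roughly as follows. Everything here is hypothetical: `Records` is replaced by a plain `List<byte[]>` because the broker's record classes are not modeled, and `AppendInterceptor` and `PartitionTally` are invented names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class BrokerAppendSketch {

    // Hypothetical broker hook, invoked once per append to a topic partition.
    // List<byte[]> stands in for the broker's Records type.
    interface AppendInterceptor {
        void onAppend(String topic, int partition, List<byte[]> records, long time);
    }

    // Tallies records and payload bytes per topic-partition as they arrive.
    static final class PartitionTally implements AppendInterceptor {
        final Map<String, AtomicLong> recordCounts = new ConcurrentHashMap<>();
        final Map<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();

        @Override
        public void onAppend(String topic, int partition, List<byte[]> records, long time) {
            String tp = topic + "-" + partition;
            long bytes = 0;
            for (byte[] r : records) {
                bytes += r.length;
            }
            recordCounts.computeIfAbsent(tp, k -> new AtomicLong()).addAndGet(records.size());
            byteCounts.computeIfAbsent(tp, k -> new AtomicLong()).addAndGet(bytes);
        }
    }

    public static void main(String[] args) {
        PartitionTally tally = new PartitionTally();
        tally.onAppend("orders", 0, Arrays.asList(new byte[10], new byte[5]), System.currentTimeMillis());
        tally.onAppend("orders", 0, Collections.singletonList(new byte[1]), System.currentTimeMillis());
        System.out.println(tally.recordCounts.get("orders-0") + " records, "
                + tally.byteCounts.get("orders-0") + " bytes");
    }
}
```

`ConcurrentHashMap` and `AtomicLong` are used on the assumption that such a hook could be invoked from several request-handler threads at once.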

-Jay

On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <[email protected]> wrote:

I'm definitely in favor of having such hooks in the produce/consume life-cycle. Not sure if people remember this, but in Kafka 0.7 this was pretty much how it was:

https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala

i.e., we had something similar to the interceptor proposal for various stages of the producer request. The producer provided call-backs for beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc.
So at LinkedIn we in fact did auditing within these call-backs (and not explicitly in the wrapper). Over time, and with 0.8, we moved that out to the wrapper libraries.

On a side-note, while audit and other monitoring can be done internally in a convenient way, I think it should be clarified that having a wrapper is in general not a bad idea, and I would even consider it to be a best-practice. Even with 0.7 we still had a wrapper library, and that API has largely stayed the same and has helped protect against (sometimes backwards incompatible) changes in open source.
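To make the 0.7-style stage hooks concrete, here is a hypothetical handler with one method per stage Joel names, used to measure how long a message waits between enqueue and dequeue (the same measurement Mayuresh asks for with onDequeued() further down the thread). The method names mirror the stages listed above, but the signatures (a message id plus a timestamp in milliseconds) are assumptions and do not reproduce the 0.7 `CallbackHandler` trait.

```java
import java.util.HashMap;
import java.util.Map;

public class LifecycleHooksSketch {

    // Hypothetical hooks, one per producer stage; timestamps are passed in to keep it testable.
    interface StageCallbacks {
        void beforeEnqueue(String messageId, long nowMs);
        void afterEnqueue(String messageId, long nowMs);
        void afterDequeuing(String messageId, long nowMs);
        void beforeSending(String messageId, long nowMs);
    }

    // Records when each message entered the queue and how long it waited there.
    static final class QueueDwellTracker implements StageCallbacks {
        private final Map<String, Long> enqueuedAt = new HashMap<>();
        final Map<String, Long> dwellMs = new HashMap<>();

        @Override public void beforeEnqueue(String id, long nowMs) { }

        @Override public void afterEnqueue(String id, long nowMs) {
            enqueuedAt.put(id, nowMs);
        }

        @Override public void afterDequeuing(String id, long nowMs) {
            Long start = enqueuedAt.remove(id);
            if (start != null) { // ignore dequeues for messages we never saw enqueued
                dwellMs.put(id, nowMs - start);
            }
        }

        @Override public void beforeSending(String id, long nowMs) { }

        // Number of messages enqueued but not yet dequeued.
        int pending() {
            return enqueuedAt.size();
        }
    }

    public static void main(String[] args) {
        QueueDwellTracker tracker = new QueueDwellTracker();
        tracker.beforeEnqueue("m1", 100);
        tracker.afterEnqueue("m1", 100);
        tracker.afterEnqueue("m2", 105);
        tracker.afterDequeuing("m1", 130);
        tracker.beforeSending("m1", 131);
        System.out.println(tracker.dwellMs.get("m1") + " ms, pending=" + tracker.pending());
    }
}
```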

While we are on this topic, I have one comment. Anna, you may have already considered this, but I don't see mention of it in the KIP:

Add a custom message interceptor/validator on the broker on message arrival.

We decompress and do basic validation of messages on arrival. I think there is value in supporting custom validation and expanding it to support custom on-arrival processing. Here is a specific use-case I have in mind. The blog that James referenced describes our auditing infrastructure.
In order to audit the Kafka cluster itself, we need to run a "console auditor" service that consumes everything and spits out audit events back to the cluster. I would prefer not having to run this service because:

  - Well, it is one more service that we have to run and monitor
  - Consuming everything takes up bandwidth which can be avoided
  - The console auditor consumer itself can lag and cause temporary audit discrepancies

One way we can mitigate this is by having mirror-makers in between clusters emit audit events.
The problem is that the very last cluster in the pipeline will not have any audit, which is why we need to have something to audit the cluster.

If we had a custom message validator, then the audit could be done on-arrival and we wouldn't need a console auditor.

One potential issue in this approach, and in any elaborate on-arrival processing for that matter, is that you may need to deserialize the message as well, which can drive up produce request handling times.
However, I'm not terribly concerned about that, especially if the audit header can be separated out easily or even deserialized partially, as this Avro thread touches on:

http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+

Thanks,

Joel

On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <[email protected]> wrote:

Nice KIP. Excellent idea.
Was just thinking if we can add onDequeued() to the ProducerInterceptor interface.
Since we have the onEnqueued(), it will help the client or the tools to know how much time the message spent in the RecordAccumulator.
Also, an API to check if there are any messages left for a particular topic in the RecordAccumulator would help.

Thanks,

Mayuresh

On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <[email protected]> wrote:

Great idea. I’ve been talking about this for 2 years, and I’m glad someone is finally picking it up. Will take a look at the KIP at some point shortly.

-Todd

On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <[email protected]> wrote:

Hey Becket,

Yeah, this is really similar to the callback. The difference is really in who sets the behavior. The idea of the interceptor is that it doesn't require any code change in apps, so you can globally add behavior to your Kafka usage without changing app code, whereas the callback is added by the app.
The idea is to kind of obviate the need for the wrapper code that, e.g., LinkedIn maintains to hold this kind of stuff.

-Jay

On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <[email protected]> wrote:

This could be a useful feature. And I think there are some use cases for mutating the data, as the first rejected alternative mentions.

I am wondering if there is functional overlap between ProducerInterceptor.onAcknowledgement() and the producer callback?
I can see that the Callback could be a per-record setting while onAcknowledgement() is a producer-level setting. Other than that, is there any difference between them?

Thanks,

Jiangjie (Becket) Qin

On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <[email protected]> wrote:

James,

That is one of the many monitoring use cases for the interceptor interface.

Thanks,
Neha

On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <[email protected]> wrote:

Anna,

I'm trying to understand a concrete use case. It sounds like producer interceptors could be used to implement part of LinkedIn's Kafka Audit tool?
https://engineering.linkedin.com/kafka/running-kafka-scale

Part of that is done by a wrapper library around the Kafka producer that keeps a count of the number of messages produced, and then sends that count to a side-topic. It sounds like the producer interceptors could possibly be used to implement that?
-James

On Jan 22, 2016, at 4:33 PM, Anna Povzner <[email protected]> wrote:

Hi,

I just created KIP-42 for adding producer and consumer interceptors for intercepting messages at different points on the producer and consumer.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Comments and suggestions are welcome!

Thanks,
Anna

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient.
--
Thanks,
Neha

--
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming

linkedin.com/in/toddpalino

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
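Two mechanisms come up in this part of the thread: Becket's question about how a per-record `Callback` differs from the producer-level `onAcknowledgement()` hook, and James's audit wrapper that counts produced messages and sends the counts to a side topic. The following is a minimal sketch of both, written in Python for brevity. It is not Kafka's client API (the interface KIP-42 proposes is a Java `ProducerInterceptor`); `ToyProducer`, `CountingAuditInterceptor`, the simplified `RecordMetadata`, the `audit-counts` topic, and the `flush_every` threshold are hypothetical stand-ins, and the sketch assumes interceptors are notified before the per-record callback.

```python
import json
from collections import Counter
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

AUDIT_TOPIC = "audit-counts"  # hypothetical side-topic for audit counts


@dataclass
class RecordMetadata:
    # Hypothetical, simplified stand-in for what a producer learns about an acked record.
    topic: str
    offset: int


# A per-record callback, supplied (or not) on each individual send() call.
Callback = Callable[[Optional[RecordMetadata], Optional[Exception]], None]


class CountingAuditInterceptor:
    """Hypothetical producer-level hook: registered once, notified about every record."""

    def __init__(self, flush_every: int = 100):
        self.flush_every = flush_every  # publish counts after this many counted acks
        self.counts: Counter = Counter()
        self.pending = 0
        self.producer: Optional["ToyProducer"] = None  # set on registration

    def on_send(self, topic: str, value: bytes) -> bytes:
        return value  # records pass through unchanged

    def on_acknowledgement(self, metadata: Optional[RecordMetadata],
                           exception: Optional[Exception]) -> None:
        # Count only successful writes, and never count our own audit records.
        if exception is not None or metadata.topic == AUDIT_TOPIC:
            return
        self.counts[metadata.topic] += 1
        self.pending += 1
        if self.pending >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        if not self.counts or self.producer is None:
            return
        payload = json.dumps(dict(sorted(self.counts.items()))).encode()
        self.counts.clear()
        self.pending = 0
        self.producer.send(AUDIT_TOPIC, payload)


class ToyProducer:
    """Hypothetical in-memory stand-in for a producer; every send succeeds at once."""

    def __init__(self, interceptors: List[CountingAuditInterceptor]):
        self.interceptors = interceptors       # producer-level: apply to all sends
        self.log: Dict[str, List[bytes]] = {}  # topic -> stored values
        for icpt in interceptors:
            icpt.producer = self

    def send(self, topic: str, value: bytes,
             callback: Optional[Callback] = None) -> RecordMetadata:
        for icpt in self.interceptors:
            value = icpt.on_send(topic, value)
        stored = self.log.setdefault(topic, [])
        stored.append(value)
        metadata = RecordMetadata(topic, len(stored) - 1)
        for icpt in self.interceptors:  # every interceptor sees every ack
            icpt.on_acknowledgement(metadata, None)
        if callback is not None:        # only this send's own callback runs
            callback(metadata, None)
        return metadata


# Usage: one callback on one send, one interceptor for all sends.
audit = CountingAuditInterceptor(flush_every=3)
producer = ToyProducer([audit])
acked_offsets = []
producer.send("orders", b"o1")
producer.send("clicks", b"c1",
              callback=lambda md, exc: acked_offsets.append(md.offset))
producer.send("orders", b"o2")
print(acked_offsets)                 # [0]
print(producer.log[AUDIT_TOPIC][0])  # b'{"clicks": 1, "orders": 2}'
```

The interceptor is registered once and sees every acknowledged record, while the callback belongs to a single `send()` call and fires once. Counting in `on_acknowledgement` rather than `on_send` means failed writes are not counted, and skipping `AUDIT_TOPIC` keeps the published counts from counting themselves; whether an audit should count attempts or successful writes is a design choice the sketch does not settle.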
