Hi Anna,

Agreed, it's difficult to match which call of onDequeued() corresponds to
which onEnqueued() call, since the ProducerRecord object is no longer
available at the time the batch is dequeued.

We already update lastAppendTime whenever we append a record to a batch.
We could reuse that timestamp and also store it per record, at the point
where each record is appended in tryAppend().
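
A rough sketch of that idea, only to make the discussion concrete. BatchSketch,
its nested Thunk, and queueTimesMs() are made-up stand-ins for illustration and
are not the actual RecordBatch code; the assumption is simply that each
per-record entry keeps the time at which it was appended:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RecordBatch; not the real Kafka class.
final class BatchSketch {
    // One entry per appended record, analogous to a per-record thunk.
    static final class Thunk {
        final long appendTimeMs;

        Thunk(long appendTimeMs) {
            this.appendTimeMs = appendTimeMs;
        }
    }

    private final List<Thunk> thunks = new ArrayList<>();
    private long lastAppendTime;

    // Mirrors the idea of tryAppend(): update the batch-level timestamp
    // and also remember the same timestamp for this individual record.
    void tryAppend(long nowMs) {
        lastAppendTime = nowMs;
        thunks.add(new Thunk(nowMs));
    }

    // At drain time, compute how long each record waited in the batch.
    List<Long> queueTimesMs(long drainTimeMs) {
        List<Long> result = new ArrayList<>(thunks.size());
        for (Thunk t : thunks) {
            result.add(drainTimeMs - t.appendTimeMs);
        }
        return result;
    }

    long lastAppendTime() {
        return lastAppendTime;
    }
}
```

With something like this, an onDequeued() callback could be handed the append
time of each record without having to match it back to an onEnqueued() call.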

Also, this might be orthogonal, but is there a plan to expose an API to
check whether the RecordAccumulator holds messages for a particular topic?
I have a use case that I can discuss if we plan to add this API.
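
To illustrate the kind of check meant here, a minimal single-threaded sketch.
AccumulatorSketch, Tp, drainOne() and hasQueued() are hypothetical names made
up for this example; they are not existing Kafka classes or methods, and the
real accumulator would need proper synchronization:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for RecordAccumulator; not the real Kafka class.
final class AccumulatorSketch {
    // Minimal topic/partition key, used here instead of TopicPartition.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) {
                return false;
            }
            Tp other = (Tp) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Queued records per topic-partition (not thread-safe in this sketch).
    private final Map<Tp, Deque<byte[]>> queues = new HashMap<>();

    void append(String topic, int partition, byte[] record) {
        queues.computeIfAbsent(new Tp(topic, partition), k -> new ArrayDeque<>())
              .addLast(record);
    }

    // Removes and returns the oldest queued record, or null if none is queued.
    byte[] drainOne(String topic, int partition) {
        Deque<byte[]> q = queues.get(new Tp(topic, partition));
        return q == null ? null : q.pollFirst();
    }

    // The proposed check: is anything still queued for any partition of this topic?
    boolean hasQueued(String topic) {
        for (Map.Entry<Tp, Deque<byte[]>> e : queues.entrySet()) {
            if (e.getKey().topic.equals(topic) && !e.getValue().isEmpty()) {
                return true;
            }
        }
        return false;
    }
}
```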

Thanks,

Mayuresh


On Mon, Jan 25, 2016 at 4:49 PM, Anna Povzner <a...@confluent.io> wrote:

> Hi Mayuresh,
>
> I agree that an onDequeued() callback in ProducerInterceptor would be useful.
> The API of this callback needs discussion. I propose the following API:
>
> public void onDequeued(TopicPartition tp, long appendTime, int attempts,
> boolean moreQueued);
>
> This callback will be called for every record in RecordBatch from
> RecordAccumulator.drain() method. This API will require keeping appendTime
> of every record in RecordBatch.Thunk. An alternative way for the
> interceptor to calculate time spent in the accumulator is to record append
> time itself in onEnqueued() and then record dequeue time in onDequeued().
> However, there is no way to match which call of onDequeued() corresponds to
> onEnqueued(), since ProducerRecord object is not available anymore at the
> time the batch is dequeued.
>
> Feedback on ProducerInterceptor.onDequeued() API is welcome.
>
> Thanks,
> Anna
>
>
>
>
> On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > I'm definitely in favor of having such hooks in the produce/consume
> > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > pretty much how it was:
> >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > i.e., we had something similar to the interceptor proposal for various
> > stages of the producer request. The producer provided call-backs for
> > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > LinkedIn we in fact did auditing within these call-backs (and not
> > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> the
> > wrapper libraries.
> >
> > On a side-note while audit and other monitoring can be done internally
> in a
> > convenient way I think it should be clarified that having a wrapper is in
> > general not a bad idea and I would even consider it to be a
> best-practice.
> > Even with 0.7 we still had a wrapper library and that API has largely
> > stayed the same and has helped protect against (sometimes backwards
> > incompatible) changes in open source.
> >
> > While we are on this topic I have one comment and Anna, you may have
> > already considered this but I don't see mention of it in the KIP:
> >
> > Add a custom message interceptor/validator on the broker on message
> > arrival.
> >
> > We decompress and do basic validation of messages on arrival. I think
> there
> > is value in supporting custom validation and expand it to support custom
> > on-arrival processing. Here is a specific use-case I have in mind. The
> blog
> > that James referenced describes our auditing infrastructure. In order to
> > audit the Kafka cluster itself we need to run a "console auditor" service
> > that consumes everything and spits out audit events back to the cluster.
> I
> > would prefer not having to run this service because:
> >
> >    - Well, it is one more service that we have to run and monitor
> >    - Consuming everything takes up bandwidth which can be avoided
> >    - The console auditor consumer itself can lag and cause temporary
> audit
> >    discrepancies
> >
> > One way we can mitigate this is by having mirror-makers in between
> clusters
> > emit audit events. The problem is that the very last cluster in the
> > pipeline will not have any audit which is why we need to have something
> to
> > audit the cluster.
> >
> > If we had a custom message validator then the audit can be done
> on-arrival
> > and we won't need a console auditor.
> >
> > One potential issue in this approach and any elaborate on-arrival
> > processing for that matter is that you may need to deserialize the
> message
> > as well which can drive up produce request handling times. However I'm
> not
> > terribly concerned about that especially if the audit header can be
> > separated out easily or even deserialized partially as this Avro thread
> > touches on
> >
> >
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Nice KIP. Excellent idea.
> > > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > > interface. Since we have the onEnqueued(), it will help the client or
> the
> > > tools to know how much time the message spent in the RecordAccumulator.
> > > Also an API to check if there are any messages left for a particular
> > topic
> > > in the RecordAccumulator would help.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com>
> wrote:
> > >
> > > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > > someone
> > > > is finally picking it up. Will take a look at the KIP at some point
> > > > shortly.
> > > >
> > > > -Todd
> > > >
> > > >
> > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io>
> wrote:
> > > >
> > > > > Hey Becket,
> > > > >
> > > > > Yeah this is really similar to the callback. The difference is
> really
> > > in
> > > > > who sets the behavior. The idea of the interceptor is that it
> doesn't
> > > > > require any code change in apps so you can globally add behavior to
> > > your
> > > > > Kafka usage without changing app code. Whereas the callback is
> added
> > by
> > > > the
> > > > > app. The idea is to kind of obviate the need for the wrapper code
> > that
> > > > e.g.
> > > > > LinkedIn maintains to hold this kind of stuff.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com>
> > > > wrote:
> > > > >
> > > > > > This could be a useful feature. And I think there are some use
> > cases
> > > to
> > > > > > mutate the data like rejected alternative one mentioned.
> > > > > >
> > > > > > I am wondering if there is functional overlapping between
> > > > > > ProducerInterceptor.onAcknowledgement() and the producer
> callback?
> > I
> > > > can
> > > > > > see that the Callback could be a per record setting while
> > > > > > onAcknowledgement() is a producer level setting. Other than that,
> > is
> > > > > there
> > > > > > any difference between them?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <
> n...@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > James,
> > > > > > >
> > > > > > > That is one of the many monitoring use cases for the
> interceptor
> > > > > > interface.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Anna,
> > > > > > > >
> > > > > > > > I'm trying to understand a concrete use case. It sounds like
> > > > producer
> > > > > > > > interceptors could be used to implement part of LinkedIn's
> > Kafka
> > > > > Audit
> > > > > > > > tool?
> > https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > > > > >
> > > > > > > > Part of that is done by a wrapper library around the kafka
> > > producer
> > > > > > that
> > > > > > > > keeps a count of the number of messages produced, and then
> > sends
> > > > that
> > > > > > > count
> > > > > > > > to a side-topic. It sounds like the producer interceptors
> could
> > > > > > possibly
> > > > > > > be
> > > > > > > > used to implement that?
> > > > > > > >
> > > > > > > > -James
> > > > > > > >
> > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <
> a...@confluent.io
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I just created a KIP-42 for adding producer and consumer
> > > > > interceptors
> > > > > > > for
> > > > > > > > > intercepting messages at different points on producer and
> > > > consumer.
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > > > > > >
> > > > > > > > > Comments and suggestions are welcome!
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Anna
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *—-*
> > > > *Todd Palino*
> > > > Staff Site Reliability Engineer
> > > > Data Infrastructure Streaming
> > > >
> > > >
> > > >
> > > > linkedin.com/in/toddpalino
> > > >
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
