Hi Becket,

The use-case for CRC is end-to-end audit, rather than checking whether a
single message is corrupt or not.

Regarding record size, I was thinking of extracting the record size from Record.
That will include the header overhead as well. I think the total record size
will tell users how much bandwidth their messages take. Since the header is
relatively small and constant, users will also get an idea of their
key/value sizes.
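A minimal sketch of how a record CRC and size might feed an end-to-end audit, assuming both the producer side (acknowledgements) and the consumer side can see these two values. `AuditLedger` and `crcOf` are hypothetical names; `java.util.zip.CRC32` over raw bytes stands in for the record checksum, and summing CRCs per topic is just one order-independent way to compare the two sides, not something proposed in this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical per-topic audit tally: message count, total bytes, and the sum of
// record CRCs. Addition is order-independent, so the producer side and the
// consumer side can be compared even if records arrive in a different order.
final class AuditLedger {
    private final Map<String, long[]> tallies = new HashMap<>();

    // Record one message observed on this side of the pipeline.
    void record(String topic, long crc, int sizeInBytes) {
        long[] t = tallies.computeIfAbsent(topic, k -> new long[3]);
        t[0] += 1;           // message count
        t[1] += sizeInBytes; // total bytes (includes header overhead if the caller includes it)
        t[2] += crc;         // running sum of CRCs
    }

    // True if both ledgers saw the same count, bytes, and CRC sum for the topic.
    boolean matches(AuditLedger other, String topic) {
        long[] mine = tallies.getOrDefault(topic, new long[3]);
        long[] theirs = other.tallies.getOrDefault(topic, new long[3]);
        return Arrays.equals(mine, theirs);
    }

    // CRC32 over raw bytes, standing in for the record checksum.
    static long crcOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }
}
```

Comparing per-topic tallies catches lost or duplicated messages without storing individual messages, although a matching sum alone does not prove that every individual message matched.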

Thanks,
Anna

On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <becket....@gmail.com> wrote:

> I am +1 on #1.2 and #3.
>
> #2: Regarding CRC, I am not sure if users care about CRC. Is there any
> specific use case? Currently we validate messages by calling ensureValid()
> to verify the checksum and throw an exception if it does not match.
>
> Message size would be useful. We can add that to ConsumerRecord. Can you
> clarify the message size you are referring to? Does it include the message
> header overhead or not? From the user's point of view, they probably don't
> care about header size.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> > Anna,
> >
> > Thanks for being diligent.
> >
> > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size
> > fields to RecordMetadata and ConsumerRecord instead of exposing metadata
> > piecemeal in the interceptor APIs.
> >
> > Thanks,
> > Neha
> >
> > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <a...@confluent.io> wrote:
> >
> > > Hi All,
> > >
> > > The KIP wiki page is now up-to-date with the scope we have agreed on:
> > > Producer and Consumer Interceptors with a minimal set of mutable APIs that
> > > are not dependent on producer and consumer internal implementation.
> > >
> > > I have a few more API details that I would like to bring to your attention
> > > and/or discuss:
> > >
> > > 1. Handling exceptions
> > >
> > > Exceptions can provide an additional level of control. For example, we can
> > > filter messages on the consumer side, or stop messages on the producer if
> > > they don’t have the right field.
> > >
> > > I see two options:
> > > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > > propagate exceptions through the original calls (KafkaProducer.send() and
> > > KafkaConsumer.poll() respectively). For other callbacks, catch the
> > > exception, log it, and ignore it.
> > > 1.2. Catch exceptions from all the interceptor callbacks and ignore them.
> > >
> > > The issue with 1.1 is that it effectively changes the KafkaProducer.send()
> > > and KafkaConsumer.poll() APIs, since they may now throw exceptions that are
> > > not documented in the KafkaProducer/Consumer API. Another option is to
> > > propagate some exceptions and ignore others.
> > >
> > > I think our use-cases do not require propagating exceptions, so I propose
> > > option 1.2, unless someone has suggestions/use-cases for propagating
> > > exceptions. Please let me know.
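A minimal sketch of option 1.2, assuming a simplified interceptor interface that takes and returns a `String` instead of `ProducerRecord<K, V>`. `SimpleProducerInterceptor` and `InterceptorChain` are hypothetical names, and the in-memory list stands in for a real logger. A failing interceptor is skipped, and the record produced by the previous step carries on to the next one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the proposed ProducerInterceptor.
interface SimpleProducerInterceptor {
    String onSend(String record);
}

final class InterceptorChain {
    private final List<SimpleProducerInterceptor> interceptors;
    final List<String> log = new ArrayList<>(); // stands in for a real logger

    InterceptorChain(List<SimpleProducerInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Option 1.2: run every interceptor in order; if one throws, log the
    // exception, ignore it, and keep the record from the previous step.
    String onSend(String record) {
        String current = record;
        for (SimpleProducerInterceptor interceptor : interceptors) {
            try {
                current = interceptor.onSend(current);
            } catch (RuntimeException e) {
                log.add("interceptor failed, ignoring: " + e.getMessage());
            }
        }
        return current; // the record send() would go on to use
    }
}
```

With this design, send() never surfaces an interceptor exception to the caller, which keeps the documented KafkaProducer.send() contract unchanged.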
> > >
> > > 2. Intercepting record CRC and record size
> > >
> > > Since we decided not to add any intermediate callbacks (such as onEnqueue
> > > or onReceive) to interceptors, I think it is still valuable to intercept
> > > record CRC and record size in bytes for monitoring and audit use-cases.
> > >
> > > I propose to add checksum and size fields to RecordMetadata and
> > > ConsumerRecord. Another option would be to add them as parameters in the
> > > onAcknowledgement() and onConsume() callbacks.
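A sketch of the first option (fields on the metadata rather than extra callback parameters), assuming a simplified metadata type. `AckMetadata`, `AckInterceptor`, and `BytesCounter` are hypothetical names. The checksum here is a plain CRC32 over the key bytes followed by the value bytes, whereas Kafka's record checksum also covers other record fields, and `serializedSize` here excludes the header overhead.

```java
import java.util.zip.CRC32;

// Hypothetical metadata type carrying the proposed checksum and size fields.
final class AckMetadata {
    final String topic;
    final int partition;
    final long offset;
    final long checksum;      // proposed field: record CRC
    final int serializedSize; // proposed field: record size in bytes

    AckMetadata(String topic, int partition, long offset, long checksum, int serializedSize) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.checksum = checksum;
        this.serializedSize = serializedSize;
    }

    // Simplified: CRC32 over key bytes then value bytes; size = key + value bytes.
    static AckMetadata forRecord(String topic, int partition, long offset, byte[] key, byte[] value) {
        CRC32 crc = new CRC32();
        crc.update(key, 0, key.length);
        crc.update(value, 0, value.length);
        return new AckMetadata(topic, partition, offset, crc.getValue(), key.length + value.length);
    }
}

// The callback signature stays the same; the interceptor reads the new fields.
interface AckInterceptor {
    void onAcknowledgement(AckMetadata metadata, Exception exception);
}

// Example monitoring interceptor: counts acknowledged records and bytes.
final class BytesCounter implements AckInterceptor {
    long ackedRecords = 0;
    long ackedBytes = 0;

    @Override
    public void onAcknowledgement(AckMetadata metadata, Exception exception) {
        if (exception != null) {
            return; // failed sends are not counted in this sketch
        }
        ackedRecords++;
        ackedBytes += metadata.serializedSize;
    }
}
```

Because the callback signature does not change, adding more fields to the metadata later would not break existing interceptors.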
> > >
> > > 3. Callbacks that allow modifying records look as follows:
> > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
> > >
> > > This means that interceptors can potentially modify topic/partition in
> > > ProducerRecord and topic/partition/offset in ConsumerRecord. I propose that
> > > it is up to the interceptor implementation to ensure that topic/partition,
> > > etc. are correct. KafkaProducer.send() will use the topic, partition, key,
> > > and value from the ProducerRecord returned from onSend(). Similarly, the
> > > ConsumerRecords returned from KafkaConsumer.poll() will be the ones
> > > returned from the interceptor.
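A sketch of the mutable onSend() contract described in item 3, assuming a simplified immutable record type. `OutRecord`, `MutatingInterceptor`, `OriginStamper`, and `SketchProducer` are hypothetical names, and the `origin=<host>;` value prefix is an invented audit format. The producer sends exactly the record that onSend() returns and does not re-check its topic or partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical immutable stand-in for ProducerRecord<String, String>.
final class OutRecord {
    final String topic;
    final Integer partition; // null means "let the partitioner decide"
    final String key;
    final String value;

    OutRecord(String topic, Integer partition, String key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

// Simplified mutable callback: returns the record that should actually be sent.
interface MutatingInterceptor {
    OutRecord onSend(OutRecord record);
}

// Hypothetical audit interceptor: prefixes the value with the originating host.
final class OriginStamper implements MutatingInterceptor {
    private final String host;

    OriginStamper(String host) {
        this.host = host;
    }

    @Override
    public OutRecord onSend(OutRecord r) {
        // Topic, partition, and key pass through unchanged.
        return new OutRecord(r.topic, r.partition, r.key, "origin=" + host + ";" + r.value);
    }
}

// Sketch of the send path: whatever onSend() returns is what gets sent.
final class SketchProducer {
    final List<OutRecord> sent = new ArrayList<>();
    private final MutatingInterceptor interceptor;

    SketchProducer(MutatingInterceptor interceptor) {
        this.interceptor = interceptor;
    }

    void send(OutRecord record) {
        OutRecord toSend = interceptor.onSend(record); // no re-validation here
        sent.add(toSend);
    }
}
```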
> > >
> > > Please let me know if you have any suggestions or objections to the above.
> > >
> > > Thanks,
> > > Anna
> > >
> > >
> > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io> wrote:
> > >
> > > > Hi Mayuresh,
> > > >
> > > > I see why you would want to check for messages left in the
> > > > RecordAccumulator. However, I don't think this will completely solve the
> > > > problem. Messages could be in flight somewhere else, like in the socket,
> > > > or there may be in-flight messages on the consumer side of the
> > > > MirrorMaker. So, if we go the route of checking whether there are any
> > > > in-flight messages for the topic deletion use-case, maybe it is better to
> > > > count them with onSend() and onAcknowledgement() -- whether all messages
> > > > sent were acknowledged. I also think that it would be better to solve
> > > > this without interceptors, for example by fixing error handling in this
> > > > scenario. However, I do not have any good proposal right now, so these
> > > > are just general thoughts.
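A sketch of counting in-flight records with the two producer callbacks, assuming the topic name is available in both callbacks, including when a send fails. `InFlightCounter` is a hypothetical name, and plain topic strings stand in for the real ProducerRecord and RecordMetadata arguments. A zero count only covers records handed to this producer, not records still on the consumer side of MirrorMaker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Per-topic count of records passed to onSend() but not yet acknowledged.
final class InFlightCounter {
    private final ConcurrentHashMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    // Called once per record when it is handed to the producer.
    void onSend(String topic) {
        counts.computeIfAbsent(topic, t -> new AtomicLong()).incrementAndGet();
    }

    // Called once per record, whether the send succeeded or failed.
    void onAcknowledgement(String topic, Exception exception) {
        AtomicLong c = counts.get(topic);
        if (c != null) {
            c.decrementAndGet();
        }
    }

    // Zero means every record sent for this topic has been acknowledged.
    long inFlight(String topic) {
        AtomicLong c = counts.get(topic);
        return c == null ? 0 : c.get();
    }
}
```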
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > >
> > > >
> > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com> wrote:
> > > >
> > > >> Calling producer.flush() flushes all the data, so this is OK. But when
> > > >> you are running Mirror maker, I am not sure there is a way to call
> > > >> flush() from outside.
> > > >>
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Mayuresh
> > > >>
> > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <becket....@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Mayuresh,
> > > >> >
> > > >> > Regarding your use case about mirror maker: is it good enough as long
> > > >> > as we know there is no message for the topic in the producer anymore?
> > > >> > If that is the case, calling producer.flush() is sufficient.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Jiangjie (Becket) Qin
> > > >> >
> > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> > > >> > gharatmayures...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi Anna,
> > > >> > >
> > > >> > > Thanks a lot for summarizing the discussion on this KIP.
> > > >> > >
> > > >> > > It LGTM.
> > > >> > > This is really nice:
> > > >> > > We decided not to add any callbacks to producer and consumer
> > > >> > > interceptors that will depend on internal implementation as part of
> > > >> > > this KIP. *However, it is possible to add them later as part of
> > > >> > > another KIP if there are good use-cases.*
> > > >> > >
> > > >> > > Do you agree with the use case I explained earlier for knowing the
> > > >> > > number of records left in the RecordAccumulator for a particular
> > > >> > > topic? It might be orthogonal to this KIP, but will be helpful. What
> > > >> > > do you think?
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Mayuresh
> > > >> > >
> > > >> > >
> > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <tpal...@gmail.com> wrote:
> > > >> > >
> > > >> > > > This looks good. As noted, having one mutable interceptor on each
> > > >> > > > side allows for the use cases we can envision right now, and I think
> > > >> > > > that’s going to provide a great deal of opportunity for implementing
> > > >> > > > things like audit, especially within a multi-tenant environment.
> > > >> > > > Looking forward to getting this available in the clients.
> > > >> > > >
> > > >> > > > Thanks!
> > > >> > > >
> > > >> > > > -Todd
> > > >> > > >
> > > >> > > >
> > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > >
> > > >> > > > > Hi All,
> > > >> > > > >
> > > >> > > > > Here are the meeting notes from today’s KIP meeting:
> > > >> > > > >
> > > >> > > > > 1. We agreed to keep the scope of this KIP to producer and
> > > >> > > > > consumer interceptors only. A broker-side interceptor will be
> > > >> > > > > added later as a separate KIP. The reasons were already mentioned
> > > >> > > > > in this thread, but the summary is:
> > > >> > > > >  * A broker interceptor is riskier and requires careful
> > > >> > > > > consideration of overheads, whether to intercept leaders vs.
> > > >> > > > > leaders/replicas, what to do on leader failover, and so on.
> > > >> > > > >  * Broker interceptors increase monitoring resolution, but not
> > > >> > > > > including them in this KIP does not reduce the usefulness of
> > > >> > > > > producer and consumer interceptors, which enable end-to-end
> > > >> > > > > monitoring.
> > > >> > > > >
> > > >> > > > > 2. We agreed to scope the ProducerInterceptor and
> > > >> > > > > ConsumerInterceptor callbacks to a minimal set of mutable APIs
> > > >> > > > > that are not dependent on producer and consumer internal
> > > >> > > > > implementation.
> > > >> > > > >
> > > >> > > > > ProducerInterceptor:
> > > >> > > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > >> > > > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> > > >> > > > >
> > > >> > > > > ConsumerInterceptor:
> > > >> > > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > >> > > > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > > >> > > > >
> > > >> > > > > We will allow interceptors to modify ProducerRecord on the
> > > >> > > > > producer side, and ConsumerRecords on the consumer side. This will
> > > >> > > > > support end-to-end monitoring and auditing, and the ability to add
> > > >> > > > > metadata to a message. This will support Todd’s Auditing and
> > > >> > > > > Routing use-cases.
> > > >> > > > >
> > > >> > > > > We did not find any use-case for modifying records in the
> > > >> > > > > onConsume() callback, but decided to enable modification of
> > > >> > > > > consumer records for symmetry with onSend().
> > > >> > > > >
> > > >> > > > > 3. We agreed to ensure compatibility when/if we add new methods to
> > > >> > > > > ProducerInterceptor and ConsumerInterceptor by using default
> > > >> > > > > methods with an empty implementation. It is OK to assume Java 8.
> > > >> > > > > (This is Ismael’s method #2.)
> > > >> > > > >
> > > >> > > > > 4. We decided not to add any callbacks to producer and consumer
> > > >> > > > > interceptors that will depend on internal implementation as part
> > > >> > > > > of this KIP. However, it is possible to add them later as part of
> > > >> > > > > another KIP if there are good use-cases.
> > > >> > > > >
> > > >> > > > > *Reasoning.* We did not have concrete use-cases that justified
> > > >> > > > > more methods at this point. Some of the use-cases were for more
> > > >> > > > > fine-grained latency collection, which could be done with Kafka
> > > >> > > > > Metrics. Another use-case was encryption. However, there are
> > > >> > > > > several design options for encryption. One is to do per-record
> > > >> > > > > encryption, which would require adding
> > > >> > > > > ProducerInterceptor.onEnqueued() and
> > > >> > > > > ConsumerInterceptor.onReceive(). One could argue that in that case
> > > >> > > > > encryption could be done by adding a custom
> > > >> > > > > serializer/deserializer. Another option is to do encryption after
> > > >> > > > > the message gets compressed, but there are issues that arise
> > > >> > > > > regarding the broker doing re-compression. We decided that it is
> > > >> > > > > better to have that discussion in a separate KIP and decide
> > > >> > > > > whether this is something we want to do with interceptors or by
> > > >> > > > > other means.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > Todd, Mayuresh and others who missed the KIP meeting, please let
> > > >> > > > > me know your thoughts on the scope we agreed on during the
> > > >> > > > > meeting.
> > > >> > > > >
> > > >> > > > > I will update the KIP proposal with the current decision by the
> > > >> > > > > end of today.
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Anna
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <
> > > >> > > > > gharatmayures...@gmail.com> wrote:
> > > >> > > > >
> > > >> > > > > > Hi,
> > > >> > > > > >
> > > >> > > > > > I won't be able to make it to the KIP hangout due to a conflict.
> > > >> > > > > >
> > > >> > > > > > Anna, here is the use case where knowing whether there are
> > > >> > > > > > messages left in the RecordAccumulator to be sent to the Kafka
> > > >> > > > > > cluster for a topic is useful.
> > > >> > > > > >
> > > >> > > > > > 1) Consider a pipeline:
> > > >> > > > > > A ---> Mirror-maker -----> B
> > > >> > > > > >
> > > >> > > > > > 2) We have a topic T in cluster A mirrored to cluster B.
> > > >> > > > > >
> > > >> > > > > > 3) Now if we delete topic T in A and immediately proceed to
> > > >> > > > > > delete the topic in cluster B, some of the Mirror-maker machines
> > > >> > > > > > die because at least one of the batches in the RecordAccumulator
> > > >> > > > > > for topic T fails to be produced to cluster B. We have seen this
> > > >> > > > > > happening in our clusters.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > If we know that there are no more messages left in the
> > > >> > > > > > RecordAccumulator to be produced to cluster B, we can safely
> > > >> > > > > > delete the topic in cluster B without disturbing the pipeline.
> > > >> > > > > >
> > > >> > > > > > Thanks,
> > > >> > > > > >
> > > >> > > > > > Mayuresh
> > > >> > > > > >
> > > >> > > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > >
> > > >> > > > > > > Thanks Ismael and Todd for your feedback!
> > > >> > > > > > >
> > > >> > > > > > > I agree about coming up with lean but useful interfaces that
> > > >> > > > > > > will be easy to extend later.
> > > >> > > > > > >
> > > >> > > > > > > When we discuss the minimal set of producer and consumer
> > > >> > > > > > > interceptor APIs in today’s KIP meeting (discussion item #2 in
> > > >> > > > > > > my previous email), let's compare two options:
> > > >> > > > > > >
> > > >> > > > > > > *1. Minimal set of immutable APIs for producer and consumer interceptors*
> > > >> > > > > > >
> > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > > > public void onSend(ProducerRecord<K, V> record);
> > > >> > > > > > > public void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > > >> > > > > > >
> > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > > > public void onConsume(ConsumerRecords<K, V> records);
> > > >> > > > > > > public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > > >> > > > > > >
> > > >> > > > > > > Use-cases:
> > > >> > > > > > > — end-to-end monitoring; custom tracing and logging
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > *2. Minimal set of mutable APIs for producer and consumer interceptors*
> > > >> > > > > > >
> > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > >> > > > > > > void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > > >> > > > > > >
> > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > > > void onConsume(ConsumerRecords<K, V> records);
> > > >> > > > > > > void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > > >> > > > > > >
> > > >> > > > > > > Use-cases in addition to #1:
> > > >> > > > > > > — Ability to add metadata to a message or fill in standard
> > > >> > > > > > > fields for audit and routing.
> > > >> > > > > > >
> > > >> > > > > > > Implications
> > > >> > > > > > > — Partition assignment will be done based on the modified
> > > >> > > > > > > key/value instead of the original key/value. If the key/value
> > > >> > > > > > > transformation is not consistent (the same key and value do not
> > > >> > > > > > > always mutate to the same modified key/value), then log
> > > >> > > > > > > compaction would not work. However, the audit and routing
> > > >> > > > > > > use-cases from Todd will likely do consistent transformation.
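A small simulation of the log compaction point, assuming a hash-mod partitioner over `String.hashCode()` as a stand-in (Kafka's default partitioner hashes the serialized key with murmur2 instead). `CompactionSketch` and its methods are hypothetical. Compaction keeps the latest value per key, so a rewrite that always maps a key to the same new key still collapses repeated updates, while a rewrite that varies per send leaves every update behind.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class CompactionSketch {
    // Stand-in partitioner; not Kafka's murmur2-based default partitioner.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Applies the interceptor's key rewrite to each (key, value) update, then
    // keeps only the latest value per (partition, rewritten key), as a compacted
    // topic would. Returns how many entries survive compaction.
    static int survivingEntries(String[][] keyValues, UnaryOperator<String> rewriteKey, int numPartitions) {
        Map<String, String> latest = new HashMap<>();
        for (String[] kv : keyValues) {
            String key = rewriteKey.apply(kv[0]);
            latest.put(partitionFor(key, numPartitions) + "/" + key, kv[1]);
        }
        return latest.size();
    }
}
```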
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > *Additional callbacks (discussion item #3 in my previous email):*
> > > >> > > > > > >
> > > >> > > > > > > If we want to support encryption, we would want to be able to
> > > >> > > > > > > modify the serialized key/value, rather than the key and value
> > > >> > > > > > > objects. This would add the following APIs to producer and
> > > >> > > > > > > consumer interceptors:
> > > >> > > > > > >
> > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > > > SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> record, SerializedKeyValue serializedKeyValue);
> > > >> > > > > > >
> > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > > > SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > I am leaning towards implementing the minimal set of immutable
> > > >> > > > > > > or mutable interfaces, making sure that we have a compatibility
> > > >> > > > > > > plan that allows us to add more callbacks in the future (per
> > > >> > > > > > > Ismael's comment), and adding more APIs later. E.g., for the
> > > >> > > > > > > encryption use-case, there could be an argument for doing
> > > >> > > > > > > encryption after message compression vs. per-record encryption
> > > >> > > > > > > that could be done using the above additional API. There are
> > > >> > > > > > > also more implications for every API that modifies records:
> > > >> > > > > > > modifying the serialized key/value will again impact partition
> > > >> > > > > > > assignment (we will likely do that after partition assignment),
> > > >> > > > > > > which may impact log compaction and mirror maker partitioning.
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Anna
> > > >> > > > > > >
> > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Finally got a chance to take a look at this. I won’t be able to
> > > >> > > > > > > > make the KIP meeting due to a conflict.
> > > >> > > > > > > >
> > > >> > > > > > > > I’m somewhat disappointed in this proposal. I think that the
> > > >> > > > > > > > explicit exclusion of modification of the messages is
> > > >> > > > > > > > short-sighted, and not accounting for it now is going to bite
> > > >> > > > > > > > us later. Jay, aren’t you the one railing against public
> > > >> > > > > > > > interfaces and how difficult they are to work with when you
> > > >> > > > > > > > don’t get them right? The “simple” change to one of these
> > > >> > > > > > > > interfaces to make it able to return a record is going to be a
> > > >> > > > > > > > significant change and is going to require all clients to
> > > >> > > > > > > > rewrite their interceptors. If we’re not willing to put in the
> > > >> > > > > > > > time to think through manipulation now, then this KIP should be
> > > >> > > > > > > > shelved until we are. Implementing something halfway is going
> > > >> > > > > > > > to be worse than taking a little longer. In addition, I don’t
> > > >> > > > > > > > believe that manipulation requires anything more than
> > > >> > > > > > > > interceptors receiving the full record and then returning it.
> > > >> > > > > > > >
> > > >> > > > > > > > There are 3 use cases I can think of right now, without any
> > > >> > > > > > > > deep discussion, that can make use of interceptors with
> > > >> > > > > > > > modification:
> > > >> > > > > > > >
> > > >> > > > > > > > 1. Auditing. The ability to add metadata to a message for
> > > >> > > > > > > > auditing is critical. Hostname, service name, timestamps, etc.
> > > >> > > > > > > > are all pieces of data that can be used on the other side of
> > > >> > > > > > > > the pipeline to categorize messages, determine loss and
> > > >> > > > > > > > transport time, and pin down issues. You may say that these
> > > >> > > > > > > > things can just be part of the message schema, but anyone who
> > > >> > > > > > > > has worked with a multi-user data system (especially those who
> > > >> > > > > > > > have been involved with LinkedIn) knows how difficult it is to
> > > >> > > > > > > > maintain consistent message schemas and to get other people to
> > > >> > > > > > > > put in fields for your use.
> > > >> > > > > > > >
> > > >> > > > > > > > 2. Encryption. This is probably the most obvious case for
> > > >> > > > > > > > record manipulation on both sides. The ability to tie in
> > > >> > > > > > > > end-to-end encryption is important for data that requires
> > > >> > > > > > > > external compliance (PCI, HIPAA, etc.).
> > > >> > > > > > > >
> > > >> > > > > > > > 3. Routing. By being able to add a bit of information about the
> > > >> > > > > > > > source or destination of a message to the metadata, you can
> > > >> > > > > > > > easily construct an intelligent mirror maker that can prevent
> > > >> > > > > > > > loops. This has the opportunity to result in significant
> > > >> > > > > > > > operational savings, as you can get rid of the need for tiered
> > > >> > > > > > > > clusters in order to prevent loops in mirroring messages.
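A sketch of the loop-prevention idea behind the routing use case, assuming records can carry a small metadata map. Records in this thread have no such field, so `TaggedRecord` is a stand-in for the envelope Todd describes; `LoopAwareMirror`, `stampOrigin`, `toMirror`, and the `origin` key are hypothetical. Each record keeps the cluster it was first produced in, and a mirror never copies a record back into that cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record with a small immutable metadata map.
final class TaggedRecord {
    final String value;
    final Map<String, String> meta;

    TaggedRecord(String value, Map<String, String> meta) {
        this.value = value;
        this.meta = Collections.unmodifiableMap(new HashMap<>(meta));
    }

    // Returns a copy with one extra metadata entry (the original is unchanged).
    TaggedRecord withTag(String k, String v) {
        Map<String, String> m = new HashMap<>(meta);
        m.put(k, v);
        return new TaggedRecord(value, m);
    }
}

final class LoopAwareMirror {
    static final String ORIGIN = "origin";

    // Producer-side interceptor step: tag the home cluster once, never overwrite it.
    static TaggedRecord stampOrigin(TaggedRecord r, String cluster) {
        return r.meta.containsKey(ORIGIN) ? r : r.withTag(ORIGIN, cluster);
    }

    // Mirror step: keep only records that did not originate in the destination.
    static List<TaggedRecord> toMirror(List<TaggedRecord> batch, String destination) {
        List<TaggedRecord> out = new ArrayList<>();
        for (TaggedRecord r : batch) {
            if (!destination.equals(r.meta.get(ORIGIN))) {
                out.add(r);
            }
        }
        return out;
    }
}
```

With A and B mirrored in both directions, a record first produced in A is copied to B but never back to A. In topologies with more than two clusters, recording only the origin does not stop every cycle, so tracking the path of visited clusters may be needed.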
> > > >> > > > > > > >
> > > >> > > > > > > > All three of these share the feature that they add metadata to
> > > >> > > > > > > > messages. With the pushback on having arbitrary metadata as an
> > > >> > > > > > > > “envelope” to the message, this is a way to provide it and make
> > > >> > > > > > > > it the responsibility of the client, and not the Kafka broker
> > > >> > > > > > > > and system itself.
> > > >> > > > > > > >
> > > >> > > > > > > > -Todd
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Anna and Neha,
> > > >> > > > > > > > >
> > > >> > > > > > > > > I think it makes a lot of sense to try to keep the interface
> > > >> > > > > > > > > lean and to add more methods later when/if there is a need.
> > > >> > > > > > > > > What is the current thinking with regard to compatibility
> > > >> > > > > > > > > when/if we add new methods? A few options come to mind:
> > > >> > > > > > > > >
> > > >> > > > > > > > > 1. Change the interface to an abstract class with empty
> > > >> > > > > > > > > implementations for all the methods. This means that the path
> > > >> > > > > > > > > to adding new methods is clear.
> > > >> > > > > > > > > 2. Hope we have moved to Java 8 by the time we need to add
> > > >> > > > > > > > > new methods, and use default methods with an empty
> > > >> > > > > > > > > implementation for any new method (and potentially make
> > > >> > > > > > > > > existing methods default methods too at that point for
> > > >> > > > > > > > > consistency).
> > > >> > > > > > > > > 3. Introduce a new interface that inherits from the existing
> > > >> > > > > > > > > Interceptor interface when we need to add new methods.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Option 1 is the easiest, and it also means that interceptor
> > > >> > > > > > > > > users only need to override the methods that they are
> > > >> > > > > > > > > interested in (more useful if the number of methods grows).
> > > >> > > > > > > > > The downside is that interceptor implementations cannot
> > > >> > > > > > > > > inherit from another class (a straightforward workaround is
> > > >> > > > > > > > > to make the interceptor a forwarder that calls another
> > > >> > > > > > > > > class). Also, our existing callbacks are interfaces, so it
> > > >> > > > > > > > > seems a bit inconsistent.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Option 2 may be the most appealing one, as both users and
> > > >> > > > > > > > > ourselves retain flexibility. The main downside is that it
> > > >> > > > > > > > > relies on us moving to Java 8, which may potentially be more
> > > >> > > > > > > > > than a year away (if we support the last 2 Java releases).
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thoughts?
> > > >> > > > > > > > >
> > > >> > > > > > > > > Ismael
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Anna,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > I'm also in favor of including just the APIs for which we
> > > >> > > > > > > > > > have a clear use case. If more use cases for finer
> > > >> > > > > > > > > > monitoring show up in the future, we can always update the
> > > >> > > > > > > > > > interface. Would you please highlight in the KIP the APIs
> > > >> > > > > > > > > > that you think we have an immediate use for?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Joel,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Broker-side monitoring makes a lot of sense in the long
> > > >> > > > > > > > > > term, though I don't think it is a requirement for
> > > >> > > > > > > > > > end-to-end monitoring. With the producer and consumer
> > > >> > > > > > > > > > interceptors, you have the ability to get full
> > > >> > > > > > > > > > publish-to-subscribe end-to-end monitoring. The broker
> > > >> > > > > > > > > > interceptor certainly improves the resolution of
> > > >> > > > > > > > > > monitoring, but it is also a riskier change. I prefer an
> > > >> > > > > > > > > > incremental approach over a big-bang and recommend taking
> > > >> > > > > > > > > > baby-steps. Let's first make sure the producer/consumer
> > > >> > > > > > > > > > interceptors are successful, and then come back and add the
> > > >> > > > > > > > > > broker interceptor carefully.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Having said that, it would be great to understand your
> > > >> > > > > > > > > > proposal for the broker interceptor independently. We can
> > > >> > > > > > > > > > add an interceptor either on-append or on-commit. If people
> > > >> > > > > > > > > > want to use this for monitoring, then possibly on-commit
> > > >> > > > > > > > > > might be more useful?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thanks,
> > > >> > > > > > > > > > Neha
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Hey Joel,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > What is the interface you are thinking of? Something like
> > > >> > > > > > > > > > > this:
> > > >> > > > > > > > > > >     onAppend(String topic, int partition, Records records, long time)
> > > >> > > > > > > > > > > ?
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > One challenge right now is that we are still using the
> > > >> > > > > > > > > > > old Message/MessageSet classes on the broker, which I'm
> > > >> > > > > > > > > > > not sure we'd want to support over the long haul, but it
> > > >> > > > > > > > > > > might be okay just to create the records instance for
> > > >> > > > > > > > > > > this interface.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > -Jay
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > > I'm definitely in favor of having such hooks in the produce/consume
> > > >> > > > > > > > > > > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > > >> > > > > > > > > > > > pretty much how it was:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > i.e., we had something similar to the interceptor proposal for various
> > > >> > > > > > > > > > > > stages of the producer request. The producer provided call-backs for
> > > >> > > > > > > > > > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > > >> > > > > > > > > > > > LinkedIn we in fact did auditing within these call-backs (and not
> > > >> > > > > > > > > > > > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> > > >> > > > > > > > > > > > the wrapper libraries.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > On a side note, while audit and other monitoring can be done internally
> > > >> > > > > > > > > > > > in a convenient way, I think it should be clarified that having a wrapper
> > > >> > > > > > > > > > > > is in general not a bad idea, and I would even consider it a best
> > > >> > > > > > > > > > > > practice. Even with 0.7 we still had a wrapper library, and that API has
> > > >> > > > > > > > > > > > largely stayed the same and has helped protect against (sometimes
> > > >> > > > > > > > > > > > backwards-incompatible) changes in open source.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > While we are on this topic, I have one comment. Anna, you may have
> > > >> > > > > > > > > > > > already considered this, but I don't see it mentioned in the KIP:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Add a custom message interceptor/validator on the broker on message
> > > >> > > > > > > > > > > > arrival.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > We decompress and do basic validation of messages on arrival. I think
> > > >> > > > > > > > > > > > there is value in supporting custom validation and expanding it to
> > > >> > > > > > > > > > > > support custom on-arrival processing. Here is a specific use case I have
> > > >> > > > > > > > > > > > in mind. The blog post that James referenced describes our auditing
> > > >> > > > > > > > > > > > infrastructure. In order to audit the Kafka cluster itself, we need to
> > > >> > > > > > > > > > > > run a "console auditor" service that consumes everything and spits out
> > > >> > > > > > > > > > > > audit events back to the cluster. I would prefer not having to run this
> > > >> > > > > > > > > > > > service because:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >    - Well, it is one more service that we have to run and monitor
> > > >> > > > > > > > > > > >    - Consuming everything takes up bandwidth which can be avoided
> > > >> > > > > > > > > > > >    - The console auditor consumer itself can lag and cause temporary audit
> > > >> > > > > > > > > > > >      discrepancies
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > One way we can mitigate this is by having mirror-makers in between
> > > >> > > > > > > > > > > > clusters emit audit events. The problem is that the very last cluster in
> > > >> > > > > > > > > > > > the pipeline will not have any audit, which is why we need to have
> > > >> > > > > > > > > > > > something to audit the cluster.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > If we had a custom message validator, then the audit could be done on
> > > >> > > > > > > > > > > > arrival and we wouldn't need a console auditor.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > One potential issue with this approach (and with any elaborate on-arrival
> > > >> > > > > > > > > > > > processing, for that matter) is that you may need to deserialize the
> > > >> > > > > > > > > > > > message as well, which can drive up produce request handling times.
> > > >> > > > > > > > > > > > However, I'm not terribly concerned about that, especially if the audit
> > > >> > > > > > > > > > > > header can be separated out easily or even deserialized partially, as
> > > >> > > > > > > > > > > > this Avro thread touches on:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Joel
> > > >> > > > > > > > > > > >
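A rough sketch of the on-arrival audit Joel describes, assuming a hypothetical hook that the broker would call once per decompressed message; no such broker API exists in this thread, and OnArrivalAuditor and its methods are illustrative names. Rejecting empty payloads stands in for custom validation; accepted messages are counted per topic and fixed time bucket, then drained as audit events that would be produced back to the cluster. The bucket size is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OnArrivalAuditDemo {
    public static void main(String[] args) {
        OnArrivalAuditor auditor = new OnArrivalAuditor(600_000L); // 10-minute buckets
        auditor.onArrival("page-views", "v1".getBytes(), 1_000L);
        auditor.onArrival("page-views", "v2".getBytes(), 599_999L);
        auditor.onArrival("page-views", "v3".getBytes(), 600_000L);
        try {
            auditor.onArrival("page-views", new byte[0], 5L); // fails validation
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // prints page-views|0|2 then page-views|600000|1
        for (String event : auditor.drainAuditEvents()) {
            System.out.println(event);
        }
    }
}

// Hypothetical on-arrival hook: validate each message, then count it per (topic, bucket).
class OnArrivalAuditor {
    private final long bucketMs;
    // "topic|bucketStart" -> count; TreeMap only makes the output order deterministic
    private final Map<String, Long> counts = new TreeMap<>();

    OnArrivalAuditor(long bucketMs) { this.bucketMs = bucketMs; }

    void onArrival(String topic, byte[] payload, long timestampMs) {
        if (payload == null || payload.length == 0) {
            throw new IllegalArgumentException("empty payload on " + topic);
        }
        long bucketStart = timestampMs - (timestampMs % bucketMs);
        counts.merge(topic + "|" + bucketStart, 1L, Long::sum);
    }

    // Returns "topic|bucketStart|count" events and resets the counters.
    List<String> drainAuditEvents() {
        List<String> events = new ArrayList<>();
        counts.forEach((key, count) -> events.add(key + "|" + count));
        counts.clear();
        return events;
    }
}
```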
> > > >> > > > > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Nice KIP. Excellent idea.
> > > >> > > > > > > > > > > > > I was just wondering if we could add onDequeued() to the
> > > >> > > > > > > > > > > > > ProducerInterceptor interface. Since we have onEnqueued(), it would help
> > > >> > > > > > > > > > > > > the client or the tools know how much time the message spent in the
> > > >> > > > > > > > > > > > > RecordAccumulator.
> > > >> > > > > > > > > > > > > Also, an API to check whether there are any messages left for a
> > > >> > > > > > > > > > > > > particular topic in the RecordAccumulator would help.
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Mayuresh
> > > >> > > > > > > > > > > > >
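A sketch of what Mayuresh's suggestion could measure, assuming hypothetical onEnqueued/onDequeued hooks that receive a record id, the topic, and an explicit timestamp; the real hook signatures were not settled in this thread, and AccumulatorHooks and AccumulatorTimer are illustrative names. The tracker records the longest time a record spent in the accumulator and how many records per topic are still waiting.

```java
import java.util.HashMap;
import java.util.Map;

class AccumulatorTimingDemo {
    public static void main(String[] args) {
        AccumulatorTimer timer = new AccumulatorTimer();
        timer.onEnqueued(1L, "orders", 100L);
        timer.onEnqueued(2L, "orders", 105L);
        System.out.println(timer.pending("orders")); // prints 2
        timer.onDequeued(1L, 130L);
        System.out.println(timer.pending("orders")); // prints 1
        System.out.println(timer.maxQueueTimeMs());  // prints 30
    }
}

// Hypothetical producer-side hooks; ids and explicit timestamps simplify the sketch.
interface AccumulatorHooks {
    void onEnqueued(long recordId, String topic, long nowMs);
    void onDequeued(long recordId, long nowMs);
}

class AccumulatorTimer implements AccumulatorHooks {
    private final Map<Long, Long> enqueuedAt = new HashMap<>();
    private final Map<Long, String> topicOf = new HashMap<>();
    private final Map<String, Integer> pendingPerTopic = new HashMap<>();
    private long maxQueueTimeMs = 0;

    @Override
    public void onEnqueued(long recordId, String topic, long nowMs) {
        enqueuedAt.put(recordId, nowMs);
        topicOf.put(recordId, topic);
        pendingPerTopic.merge(topic, 1, Integer::sum);
    }

    @Override
    public void onDequeued(long recordId, long nowMs) {
        Long start = enqueuedAt.remove(recordId);
        String topic = topicOf.remove(recordId);
        if (start == null) {
            return; // never saw this record enqueued: ignore it
        }
        maxQueueTimeMs = Math.max(maxQueueTimeMs, nowMs - start);
        pendingPerTopic.merge(topic, -1, Integer::sum);
    }

    // Records of this topic still waiting in the accumulator.
    int pending(String topic) { return pendingPerTopic.getOrDefault(topic, 0); }

    long maxQueueTimeMs() { return maxQueueTimeMs; }
}
```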
> > > >> > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > > >> > > > > > > > > > > > > > someone is finally picking it up. Will take a look at the KIP at some
> > > >> > > > > > > > > > > > > > point shortly.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > -Todd
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > Hey Becket,
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > Yeah, this is really similar to the callback. The difference is really in
> > > >> > > > > > > > > > > > > > > who sets the behavior. The idea of the interceptor is that it doesn't
> > > >> > > > > > > > > > > > > > > require any code change in apps, so you can globally add behavior to your
> > > >> > > > > > > > > > > > > > > Kafka usage without changing app code, whereas the callback is added by
> > > >> > > > > > > > > > > > > > > the app. The idea is to kind of obviate the need for the wrapper code
> > > >> > > > > > > > > > > > > > > that, e.g., LinkedIn maintains to hold this kind of stuff.
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > -Jay
> > > >> > > > > > > > > > > > > > >
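To illustrate the distinction Jay draws (and Becket's per-record versus producer-level point), here is a toy producer; it is hypothetical and not the real KafkaProducer. Acknowledgement interceptors are supplied once at construction, as they would be through configuration, and apply to every send, while a callback is passed by application code for an individual record. In this sketch the interceptors run before the per-record callback, and each send is "acknowledged" immediately with a fake offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class InterceptorVsCallbackDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Installed once (e.g. via configuration); applies to every send.
        List<AckInterceptor> interceptors = List.of(ack -> log.add("interceptor:" + ack));
        ToyProducer producer = new ToyProducer(interceptors);
        producer.send("r1", ack -> log.add("callback:" + ack)); // app-supplied, per record
        producer.send("r2", null);                           // no callback for this record
        System.out.println(log); // prints [interceptor:r1@0, callback:r1@0, interceptor:r2@1]
    }
}

// Hypothetical acknowledgement hook configured at the producer level.
interface AckInterceptor {
    void onAcknowledgement(String ack);
}

// Toy producer that acknowledges each send immediately with a fake offset.
class ToyProducer {
    private final List<AckInterceptor> interceptors;
    private long nextOffset = 0;

    ToyProducer(List<AckInterceptor> interceptors) { this.interceptors = interceptors; }

    void send(String value, Consumer<String> callback) {
        String ack = value + "@" + nextOffset++;
        for (AckInterceptor i : interceptors) {
            i.onAcknowledgement(ack); // producer-level: sees every record
        }
        if (callback != null) {
            callback.accept(ack);     // per-record: only where the app passed one
        }
    }
}
```

The application code calling send() is identical whether or not interceptors are configured, which is the point of the interceptor approach.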
> > > >> > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > This could be a useful feature. And I think there are some use cases for
> > > >> > > > > > > > > > > > > > > > mutating the data, like the one mentioned in the first rejected
> > > >> > > > > > > > > > > > > > > > alternative.
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > I am wondering if there is functional overlap between
> > > >> > > > > > > > > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer callback. I can
> > > >> > > > > > > > > > > > > > > > see that the Callback could be a per-record setting while
> > > >> > > > > > > > > > > > > > > > onAcknowledgement() is a producer-level setting. Other than that, is
> > > >> > > > > > > > > > > > > > > > there any difference between them?
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > James,
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor
> > > >> > > > > > > > > > > > > > > > > interface.
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > > > Neha
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > Anna,
> > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer
> > > >> > > > > > > > > > > > > > > > > > interceptors could be used to implement part of LinkedIn's Kafka Audit
> > > >> > > > > > > > > > > > > > > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > Part of that is done by a wrapper library around the Kafka producer that
> > > >> > > > > > > > > > > > > > > > > > keeps a count of the number of messages produced, and then sends that
> > > >> > > > > > > > > > > > > > > > > > count to a side-topic. It sounds like the producer interceptors could
> > > >> > > > > > > > > > > > > > > > > > possibly be used to implement that?
> > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > -James
> > > >> > > > > > > > > > > > > > > > > >
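A sketch of the wrapper-style counting James mentions, recast as an interceptor. CountingInterceptor, its onSend(topic, value) method, and AuditSink are hypothetical simplifications (a real producer hook would receive whole records); the sink stands in for a producer writing the counts to a side topic. Records pass through unchanged, and flushing emits one count per topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AuditCountDemo {
    public static void main(String[] args) {
        List<String> auditTopic = new ArrayList<>(); // stands in for the side topic
        CountingInterceptor counter = new CountingInterceptor();
        counter.onSend("clicks", "c1");
        counter.onSend("clicks", "c2");
        counter.onSend("views", "v1");
        counter.flushCounts(auditTopic::add);
        System.out.println(auditTopic); // prints [clicks=2, views=1]
    }
}

// Hypothetical sink for audit messages, e.g. a producer writing to a side topic.
interface AuditSink {
    void publish(String auditMessage);
}

class CountingInterceptor {
    private final Map<String, Long> countsByTopic = new TreeMap<>();

    // Called for every record before it is sent; returns the value unchanged.
    String onSend(String topic, String value) {
        countsByTopic.merge(topic, 1L, Long::sum);
        return value;
    }

    // Emits one "topic=count" audit message per topic, then resets the counters.
    void flushCounts(AuditSink sink) {
        countsByTopic.forEach((topic, n) -> sink.publish(topic + "=" + n));
        countsByTopic.clear();
    }
}
```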
> > > >> > > > > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > > Hi,
> > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > > I just created KIP-42 for adding producer and consumer interceptors for
> > > >> > > > > > > > > > > > > > > > > > > intercepting messages at different points on the producer and consumer.
> > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > > Comments and suggestions are
> > > welcome!
> > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > > > > > Anna
> > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > --
> > > >> > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > > > Neha
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > --
> > > >> > > > > > > > > > > > > > *—-*
> > > >> > > > > > > > > > > > > > *Todd Palino*
> > > >> > > > > > > > > > > > > > Staff Site Reliability Engineer
> > > >> > > > > > > > > > > > > > Data Infrastructure Streaming
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > linkedin.com/in/toddpalino
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > --
> > > >> > > > > > > > > > > > > -Regards,
> > > >> > > > > > > > > > > > > Mayuresh R. Gharat
> > > >> > > > > > > > > > > > > (862) 250-7125
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > --
> > > >> > > > > > > > > > Thanks,
> > > >> > > > > > > > > > Neha
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > --
> > > >> > > > > > > > *—-*
> > > >> > > > > > > > *Todd Palino*
> > > >> > > > > > > > Staff Site Reliability Engineer
> > > >> > > > > > > > Data Infrastructure Streaming
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > linkedin.com/in/toddpalino
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > --
> > > >> > > > > > -Regards,
> > > >> > > > > > Mayuresh R. Gharat
> > > >> > > > > > (862) 250-7125
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > --
> > > >> > > > *—-*
> > > >> > > > *Todd Palino*
> > > >> > > > Staff Site Reliability Engineer
> > > >> > > > Data Infrastructure Streaming
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > linkedin.com/in/toddpalino
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > -Regards,
> > > >> > > Mayuresh R. Gharat
> > > >> > > (862) 250-7125
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> -Regards,
> > > >> Mayuresh R. Gharat
> > > >> (862) 250-7125
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>
