Anna,

Using CRCs for end-to-end auditing might be very costly, because you would need
to collect all the CRCs from both the producer and the consumer. It also relies
on the assumption that the broker does not modify the record.
Can you shed some light on how end-to-end auditing would use the CRC
before we decide to expose such a low-level detail to end users? It would
also be helpful if you could compare it with something like sequence number
based auditing.
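To make the cost comparison concrete, here is a minimal sketch (plain Java, no Kafka dependencies) of the two audit styles. `AuditSketch` and its methods are hypothetical names for illustration only. The CRC approach needs the full set of CRCs from both sides, while the sequence-number approach, assuming each producer stamps its records with consecutive numbers starting at 0, only needs a last-sent number and a received count per producer.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.zip.CRC32;

// Hypothetical audit helpers for illustration only; not part of any Kafka API.
final class AuditSketch {

    private AuditSketch() {
    }

    // CRC-based audit: producer and consumer must both report every CRC, and
    // the auditor holds sets whose size grows with the number of records.
    static Set<Long> missingByCrc(Set<Long> producedCrcs, Set<Long> consumedCrcs) {
        Set<Long> missing = new HashSet<>(producedCrcs);
        missing.removeAll(consumedCrcs);
        return missing;
    }

    // Sequence-number audit: if a producer numbers its records 0, 1, 2, ...,
    // the auditor only needs the last sequence sent and a count of records
    // received (assumes duplicates were already filtered out).
    static long lostBySequence(long lastSequenceSent, long receivedCount) {
        return (lastSequenceSent + 1) - receivedCount;
    }

    static long crcOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }
}
```

One caveat of the CRC variant as written: identical payloads share a CRC, so a set-based comparison cannot tell a lost duplicate from a delivered one.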

About the record size, one thing worth noting is that the size of a Record is
not the actual number of bytes sent over the wire if we use compression, so it
does not really tell users how much bandwidth they are using. Personally, I
think two kinds of size may be useful:
1. The record size after serialization, i.e. application bytes. (The
uncompressed record size can easily be derived from this as well.)
2. The actual bytes sent over the wire.
We can get (1) easily, but (2) is difficult to get at the Record level when we
use compression.
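As a rough illustration of why (2) is hard to attribute to a single record, the sketch below compares the serialized size of a batch of values with the size of the same batch after compression. It assumes gzip as the codec and treats each value as already-serialized UTF-8 text; `SizeSketch` is a hypothetical name, not a Kafka class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPOutputStream;

// Illustration only: gzip stands in for whatever codec the producer uses.
final class SizeSketch {

    private SizeSketch() {
    }

    // (1) Application bytes: serialized values, before compression.
    static long serializedSize(List<String> values) {
        long total = 0;
        for (String value : values) {
            total += value.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    // (2) Approximate wire bytes: the whole batch compressed together.
    static long compressedBatchSize(List<String> values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            for (String value : values) {
                gzip.write(value.getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }
}
```

Because the compressed size is only known for the batch as a whole, any per-record wire size would have to be an apportioned estimate.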

Thanks,

Jiangjie (Becket) Qin

On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner <a...@confluent.io> wrote:

> Hi Becket,
>
> The use-case for CRC is end-to-end audit, rather than checking whether a
> single message is corrupt or not.
>
> Regarding record size, I was thinking of extracting the record size from
> Record. That will include the header overhead as well. I think the total
> record size will tell users how much bandwidth their messages take. Since
> the header is relatively small and constant, users will also get an idea
> of their key/value sizes.
>
> Thanks,
> Anna
>
> On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > I am +1 on #1.2 and #3.
> >
> > #2: Regarding CRC, I am not sure users care about the CRC. Is there any
> > specific use case? Currently we validate messages by calling
> > ensureValid() to verify the checksum, and throw an exception if it does
> > not match.
> >
> > Message size would be useful. We can add that to ConsumerRecord. Can you
> > clarify which message size you are referring to? Does it include the
> > message header overhead or not? From the user's point of view, they
> > probably don't care about the header size.
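For reference, the checksum validation that ensureValid() performs boils down to recomputing a CRC over the record bytes and comparing it with the stored value. The sketch below shows that idea with `java.util.zip.CRC32`; `ChecksummedRecord` is a hypothetical stand-in and does not reproduce Kafka's actual record layout or which bytes the CRC covers.

```java
import java.util.zip.CRC32;

// Hypothetical stand-in for a stored record; Kafka's own Record class differs.
final class ChecksummedRecord {
    final byte[] payload;
    final long storedCrc;

    ChecksummedRecord(byte[] payload, long storedCrc) {
        this.payload = payload;
        this.storedCrc = storedCrc;
    }

    static long computeCrc(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Same idea as ensureValid(): recompute, compare, and throw on mismatch.
    void ensureValid() {
        long actual = computeCrc(payload);
        if (actual != storedCrc) {
            throw new IllegalStateException("Record is corrupt (stored crc = "
                + storedCrc + ", computed crc = " + actual + ")");
        }
    }
}
```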
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <n...@confluent.io> wrote:
> >
> > > Anna,
> > >
> > > Thanks for being diligent.
> > >
> > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size
> > > fields to RecordMetadata and ConsumerRecord instead of exposing metadata
> > > piecemeal in the interceptor APIs.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <a...@confluent.io> wrote:
> > >
> > > > Hi All,
> > > >
> > > > The KIP wiki page is now up-to-date with the scope we have agreed on:
> > > > Producer and Consumer Interceptors with a minimal set of mutable APIs
> > > > that are not dependent on the producer and consumer internal
> > > > implementation.
> > > >
> > > > I have a few more API details that I would like to bring to your
> > > > attention and/or discuss:
> > > >
> > > > 1. Handling exceptions
> > > >
> > > > Exceptions can provide an additional level of control. For example,
> > > > we can filter messages on the consumer side, or stop messages on the
> > > > producer side if they don’t have the right field.
> > > >
> > > > I see two options:
> > > > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > > > propagate exceptions through the original calls (KafkaProducer.send()
> > > > and KafkaConsumer.poll(), respectively). For other callbacks, catch
> > > > the exception, log it, and ignore it.
> > > > 1.2. Catch exceptions from all the interceptor callbacks and ignore
> > > > them.
> > > >
> > > > The issue with 1.1 is that it effectively changes the
> > > > KafkaProducer.send() and KafkaConsumer.poll() APIs, since they may now
> > > > throw exceptions that are not documented in the KafkaProducer/Consumer
> > > > API. Another option is to allow some exceptions to propagate and
> > > > ignore others.
> > > >
> > > > I think our use-cases do not require propagating exceptions, so I
> > > > propose option 1.2, unless someone has suggestions or use-cases for
> > > > propagating exceptions. Please let me know.
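A minimal sketch of option 1.2, assuming a simplified interceptor interface (`SimpleInterceptor` and `InterceptorChain` are hypothetical names, not the KIP's API). Each interceptor's output feeds the next one, and any runtime exception is logged and swallowed so the caller of send() never sees it. What to do with the record when an interceptor fails is an assumption here: the sketch keeps the record produced by the previous step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified interceptor interface for illustration; the KIP's
// ProducerInterceptor<K, V> works with ProducerRecord and RecordMetadata.
interface SimpleInterceptor<R> {
    R onSend(R record);
}

// Option 1.2: every interceptor exception is caught, logged and ignored, so
// send() never surfaces an exception type its API does not document.
final class InterceptorChain<R> {
    private static final Logger LOG = Logger.getLogger(InterceptorChain.class.getName());
    private final List<SimpleInterceptor<R>> interceptors;

    InterceptorChain(List<SimpleInterceptor<R>> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    R onSend(R record) {
        R current = record;
        for (SimpleInterceptor<R> interceptor : interceptors) {
            try {
                // The record returned by one interceptor is the input to the next.
                current = interceptor.onSend(current);
            } catch (RuntimeException e) {
                // Assumed policy: keep the record from the previous step and continue.
                LOG.log(Level.WARNING, "Interceptor onSend failed; ignoring", e);
            }
        }
        return current;
    }
}
```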
> > > >
> > > > 2. Intercepting record CRC and record size
> > > >
> > > > Since we decided not to add any intermediate callbacks (such as
> > > > onEnqueue or onReceive) to interceptors, I think it is still valuable
> > > > to intercept the record CRC and the record size in bytes for
> > > > monitoring and audit use-cases.
> > > >
> > > > I propose to add checksum and size fields to RecordMetadata and
> > > > ConsumerRecord. Another option would be to add them as parameters to
> > > > the onAcknowledgement() and onConsume() callbacks.
> > > >
> > > > 3. The callbacks that can modify records look as follows:
> > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
> > > >
> > > > This means that interceptors can potentially modify the
> > > > topic/partition in ProducerRecord and the topic/partition/offset in
> > > > ConsumerRecord. I propose that it is up to the interceptor
> > > > implementation to ensure that the topic/partition, etc. are correct.
> > > > KafkaProducer.send() will use the topic, partition, key, and value
> > > > from the ProducerRecord returned from onSend(). Similarly, the
> > > > ConsumerRecords returned from KafkaConsumer.poll() would be the ones
> > > > returned from the interceptor.
> > > >
> > > >
> > > >
> > > > Please let me know if you have any suggestions or objections to the
> > > > above.
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > >
> > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >
> > > > > Hi Mayuresh,
> > > > >
> > > > > I see why you would want to check for messages left in the
> > > > > RecordAccumulator. However, I don't think this will completely solve
> > > > > the problem. Messages could be in flight somewhere else, like in the
> > > > > socket, or there may be in-flight messages on the consumer side of
> > > > > the MirrorMaker. So, if we go the route of checking whether there are
> > > > > any in-flight messages for the topic deletion use-case, maybe it is
> > > > > better to count them with onSend() and onAcknowledgement(), i.e.
> > > > > check whether all messages sent were acknowledged. I also think that
> > > > > it would be better to solve this without interceptors, for example by
> > > > > fixing error handling in this scenario. However, I do not have any
> > > > > good proposal right now, so these are just general thoughts.
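The counting approach could look roughly like the sketch below. `InFlightTracker` is a hypothetical helper; it assumes the topic name is available in both callbacks, and that the acknowledgement callback fires exactly once per sent record, on success or failure. As noted above, it only covers the producer side, not messages still sitting in MirrorMaker's consumer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: recordSent(topic) would be called from onSend() and
// recordAcknowledged(topic) from onAcknowledgement().
final class InFlightTracker {
    private final Map<String, AtomicLong> inFlight = new ConcurrentHashMap<>();

    void recordSent(String topic) {
        inFlight.computeIfAbsent(topic, t -> new AtomicLong()).incrementAndGet();
    }

    void recordAcknowledged(String topic) {
        AtomicLong count = inFlight.get(topic);
        if (count != null) {
            count.decrementAndGet();
        }
    }

    // True when every record sent for the topic has been acknowledged.
    boolean isDrained(String topic) {
        AtomicLong count = inFlight.get(topic);
        return count == null || count.get() == 0;
    }
}
```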
> > > > >
> > > > > Thanks,
> > > > > Anna
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > >
> > > > >> Calling producer.flush() flushes all the data, so this is OK. But
> > > > >> when you are running MirrorMaker, I am not sure there is a way to
> > > > >> call flush() from outside.
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Mayuresh
> > > > >>
> > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <becket....@gmail.com> wrote:
> > > > >>
> > > > >> > Mayuresh,
> > > > >> >
> > > > >> > Regarding your use case about MirrorMaker: is it good enough as
> > > > >> > long as we know there are no messages for the topic in the
> > > > >> > producer anymore? If that is the case, calling producer.flush() is
> > > > >> > sufficient.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Jiangjie (Becket) Qin
> > > > >> >
> > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Hi Anna,
> > > > >> > >
> > > > >> > > Thanks a lot for summarizing the discussion on this KIP.
> > > > >> > >
> > > > >> > > It LGTM.
> > > > >> > > This is really nice:
> > > > >> > > We decided not to add any callbacks to producer and consumer
> > > > >> > > interceptors that will depend on internal implementation as
> > > > >> > > part of this KIP. *However, it is possible to add them later as
> > > > >> > > part of another KIP if there are good use-cases.*
> > > > >> > >
> > > > >> > > Do you agree with the use case I explained earlier for knowing
> > > > >> > > the number of records left in the RecordAccumulator for a
> > > > >> > > particular topic? It might be orthogonal to this KIP, but it
> > > > >> > > will be helpful. What do you think?
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > >
> > > > >> > > Mayuresh
> > > > >> > >
> > > > >> > >
> > > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <tpal...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > > This looks good. As noted, having one mutable interceptor on
> > > > >> > > > each side allows for the use cases we can envision right now,
> > > > >> > > > and I think that’s going to provide a great deal of
> > > > >> > > > opportunity for implementing things like audit, especially
> > > > >> > > > within a multi-tenant environment. Looking forward to getting
> > > > >> > > > this available in the clients.
> > > > >> > > >
> > > > >> > > > Thanks!
> > > > >> > > >
> > > > >> > > > -Todd
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > >> > > >
> > > > >> > > > > Hi All,
> > > > >> > > > >
> > > > >> > > > > Here are the meeting notes from today’s KIP meeting:
> > > > >> > > > >
> > > > >> > > > > 1. We agreed to keep the scope of this KIP to producer and
> > > > >> > > > > consumer interceptors only. A broker-side interceptor will
> > > > >> > > > > be added later as a separate KIP. The reasons were already
> > > > >> > > > > mentioned in this thread, but the summary is:
> > > > >> > > > >  * A broker interceptor is riskier and requires careful
> > > > >> > > > > consideration of overheads, whether to intercept leaders
> > > > >> > > > > vs. leaders/replicas, what to do on leader failover, and so
> > > > >> > > > > on.
> > > > >> > > > >  * Broker interceptors increase monitoring resolution, but
> > > > >> > > > > not including them in this KIP does not reduce the
> > > > >> > > > > usefulness of producer and consumer interceptors, which
> > > > >> > > > > enable end-to-end monitoring.
> > > > >> > > > >
> > > > >> > > > > 2. We agreed to scope the ProducerInterceptor and
> > > > >> > > > > ConsumerInterceptor callbacks to a minimal set of mutable
> > > > >> > > > > APIs that are not dependent on the producer and consumer
> > > > >> > > > > internal implementation.
> > > > >> > > > >
> > > > >> > > > > ProducerInterceptor:
> > > > >> > > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > > >> > > > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> > > > >> > > > >
> > > > >> > > > > ConsumerInterceptor:
> > > > >> > > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > > >> > > > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > > > >> > > > >
> > > > >> > > > > We will allow interceptors to modify ProducerRecord on the
> > > > >> > > > > producer side and ConsumerRecords on the consumer side.
> > > > >> > > > > This will support end-to-end monitoring and auditing, and
> > > > >> > > > > the ability to add metadata to a message. This will support
> > > > >> > > > > Todd’s Auditing and Routing use-cases.
> > > > >> > > > >
> > > > >> > > > > We did not find any use-case for modifying records in the
> > > > >> > > > > onConsume() callback, but decided to enable modification of
> > > > >> > > > > consumer records for symmetry with onSend().
> > > > >> > > > >
> > > > >> > > > > 3. We agreed to ensure compatibility when/if we add new
> > > > >> > > > > methods to ProducerInterceptor and ConsumerInterceptor by
> > > > >> > > > > using default methods with an empty implementation. It is
> > > > >> > > > > OK to assume Java 8. (This is Ismael’s method #2.)
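A short sketch of what the default-method approach means for compatibility. `AuditInterceptor` and its `onClose()` method are hypothetical; the point is only that a method added later with an empty default body does not break implementations written against the earlier interface.

```java
// Hypothetical evolution of an interceptor interface; names are illustrative.
interface AuditInterceptor<R> {
    // Method present in the first release.
    R onSend(R record);

    // Method added in a later release as a Java 8 default method with an
    // empty body, so implementations written for the first release still compile.
    default void onClose() {
    }
}

// An implementation written before onClose() existed: no changes required.
final class PassThroughInterceptor implements AuditInterceptor<String> {
    @Override
    public String onSend(String record) {
        return record;
    }
}
```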
> > > > >> > > > >
> > > > >> > > > > 4. We decided not to add any callbacks to producer and
> > > > >> > > > > consumer interceptors that will depend on internal
> > > > >> > > > > implementation as part of this KIP. However, it is possible
> > > > >> > > > > to add them later as part of another KIP if there are good
> > > > >> > > > > use-cases.
> > > > >> > > > >
> > > > >> > > > > *Reasoning.* We did not have concrete use-cases that
> > > > >> > > > > justified more methods at this point. Some of the use-cases
> > > > >> > > > > were for more fine-grained latency collection, which could
> > > > >> > > > > be done with Kafka Metrics. Another use-case was
> > > > >> > > > > encryption. However, there are several design options for
> > > > >> > > > > encryption. One is to do per-record encryption, which would
> > > > >> > > > > require adding ProducerInterceptor.onEnqueued() and
> > > > >> > > > > ConsumerInterceptor.onReceive(). One could argue that in
> > > > >> > > > > that case encryption could be done by adding a custom
> > > > >> > > > > serializer/deserializer. Another option is to do encryption
> > > > >> > > > > after the message gets compressed, but issues arise
> > > > >> > > > > regarding the broker doing re-compression. We decided that
> > > > >> > > > > it is better to have that discussion in a separate KIP and
> > > > >> > > > > decide whether this is something we want to do with
> > > > >> > > > > interceptors or by other means.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > Todd, Mayuresh and others who missed the KIP meeting,
> > > > >> > > > > please let me know your thoughts on the scope we agreed on
> > > > >> > > > > during the meeting.
> > > > >> > > > >
> > > > >> > > > > I will update the KIP proposal with the current decision by
> > > > >> > > > > the end of today.
> > > > >> > > > >
> > > > >> > > > > Thanks,
> > > > >> > > > > Anna
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > >> > > > >
> > > > >> > > > > > Hi,
> > > > >> > > > > >
> > > > >> > > > > > I won't be able to make it to the KIP hangout due to a
> > > > >> > > > > > conflict.
> > > > >> > > > > >
> > > > >> > > > > > Anna, here is the use case where knowing whether there
> > > > >> > > > > > are messages left in the RecordAccumulator to be sent to
> > > > >> > > > > > the Kafka cluster for a topic is useful.
> > > > >> > > > > >
> > > > >> > > > > > 1) Consider a pipeline:
> > > > >> > > > > > A ---> Mirror-maker -----> B
> > > > >> > > > > >
> > > > >> > > > > > 2) We have a topic T in cluster A mirrored to cluster B.
> > > > >> > > > > >
> > > > >> > > > > > 3) Now, if we delete topic T in A and immediately proceed
> > > > >> > > > > > to delete the topic in cluster B, some of the Mirror-maker
> > > > >> > > > > > machines die because at least one of the batches in the
> > > > >> > > > > > RecordAccumulator for topic T fails to be produced to
> > > > >> > > > > > cluster B. We have seen this happening in our clusters.
> > > > >> > > > > >
> > > > >> > > > > > If we know that there are no more messages left in the
> > > > >> > > > > > RecordAccumulator to be produced to cluster B, we can
> > > > >> > > > > > safely delete the topic in cluster B without disturbing
> > > > >> > > > > > the pipeline.
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > >
> > > > >> > > > > > Mayuresh
> > > > >> > > > > >
> > > > >> > > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Thanks Ismael and Todd for your feedback!
> > > > >> > > > > > >
> > > > >> > > > > > > I agree about coming up with lean but useful interfaces
> > > > >> > > > > > > that will be easy to extend later.
> > > > >> > > > > > >
> > > > >> > > > > > > When we discuss the minimal set of producer and
> > > > >> > > > > > > consumer interceptor APIs in today’s KIP meeting
> > > > >> > > > > > > (discussion item #2 in my previous email), let's
> > > > >> > > > > > > compare two options:
> > > > >> > > > > > >
> > > > >> > > > > > > *1. Minimal set of immutable APIs for producer and consumer interceptors*
> > > > >> > > > > > >
> > > > >> > > > > > > ProducerInterceptor:
> > > > >> > > > > > > public void onSend(ProducerRecord<K, V> record);
> > > > >> > > > > > > public void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > > > >> > > > > > >
> > > > >> > > > > > > ConsumerInterceptor:
> > > > >> > > > > > > public void onConsume(ConsumerRecords<K, V> records);
> > > > >> > > > > > > public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > > > >> > > > > > >
> > > > >> > > > > > > Use-cases:
> > > > >> > > > > > > - end-to-end monitoring; custom tracing and logging
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > *2. Minimal set of mutable APIs for producer and consumer interceptors*
> > > > >> > > > > > >
> > > > >> > > > > > > ProducerInterceptor:
> > > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > > >> > > > > > > void onAcknowledgement(RecordMetadata metadata, Exception exception);
> > > > >> > > > > > >
> > > > >> > > > > > > ConsumerInterceptor:
> > > > >> > > > > > > void onConsume(ConsumerRecords<K, V> records);
> > > > >> > > > > > > void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
> > > > >> > > > > > >
> > > > >> > > > > > > Use-cases in addition to #1:
> > > > >> > > > > > > - Ability to add metadata to a message or fill in
> > > > >> > > > > > > standard fields for audit and routing.
> > > > >> > > > > > >
> > > > >> > > > > > > Implications:
> > > > >> > > > > > > - Partition assignment will be done based on the
> > > > >> > > > > > > modified key/value instead of the original key/value. If
> > > > >> > > > > > > the key/value transformation is not consistent (i.e. the
> > > > >> > > > > > > same key and value do not always map to the same
> > > > >> > > > > > > modified key/value), then log compaction will not work.
> > > > >> > > > > > > However, the audit and routing use-cases from Todd will
> > > > >> > > > > > > likely do a consistent transformation.
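To illustrate what a consistent transformation means for partitioning and compaction, here is a small sketch. `PartitionSketch` is hypothetical, and `Arrays.hashCode` stands in for the producer's real partitioner hash, which is a different function; only the determinism matters here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: Arrays.hashCode stands in for the real partitioner hash.
final class PartitionSketch {

    private PartitionSketch() {
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Consistent: the same input key always maps to the same output key.
    static String consistentTransform(String key) {
        return "dc1:" + key;
    }

    // Inconsistent: the output key depends on when the record is sent.
    static String inconsistentTransform(String key, long nowMillis) {
        return key + "@" + nowMillis;
    }
}
```

With the consistent transformation, every update to the same key lands on the same partition under the same modified key, so compaction can keep only the latest one. With the time-based transformation, each update gets a distinct key, so compaction never collapses them.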
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > *Additional callbacks (discussion item #3 in my previous email):*
> > > > >> > > > > > >
> > > > >> > > > > > > If we want to support encryption, we would want to be
> > > > >> > > > > > > able to modify the serialized key/value rather than the
> > > > >> > > > > > > key and value objects. This will add the following APIs
> > > > >> > > > > > > to producer and consumer interceptors:
> > > > >> > > > > > >
> > > > >> > > > > > > ProducerInterceptor:
> > > > >> > > > > > > SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> record,
> > > > >> > > > > > >     SerializedKeyValue serializedKeyValue);
> > > > >> > > > > > >
> > > > >> > > > > > > ConsumerInterceptor:
> > > > >> > > > > > > SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);
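For the per-record encryption option, an onEnqueued()/onReceive() pair would transform serialized bytes. The sketch below shows only that byte transformation, using AES-GCM from the standard JCA (`javax.crypto`). `RecordCrypto` is hypothetical, the IV-prefix layout is an assumption of this sketch, and key management is deliberately left out.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical per-record encryption of serialized bytes. Key management is
// out of scope; the caller supplies the SecretKey.
final class RecordCrypto {
    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;
    private static final SecureRandom RANDOM = new SecureRandom();

    private RecordCrypto() {
    }

    // Output layout (an assumption of this sketch): 12-byte IV, then ciphertext.
    static byte[] encrypt(SecretKey key, byte[] serialized) throws GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        RANDOM.nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] ciphertext = cipher.doFinal(serialized);
        return ByteBuffer.allocate(IV_BYTES + ciphertext.length).put(iv).put(ciphertext).array();
    }

    static byte[] decrypt(SecretKey key, byte[] encrypted) throws GeneralSecurityException {
        ByteBuffer buffer = ByteBuffer.wrap(encrypted);
        byte[] iv = new byte[IV_BYTES];
        buffer.get(iv);
        byte[] ciphertext = new byte[buffer.remaining()];
        buffer.get(ciphertext);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        return cipher.doFinal(ciphertext);
    }
}
```

Because ciphertext looks random, encrypting each record before batch compression would also largely defeat compression, which is one argument for the encrypt-after-compression alternative.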
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > I am leaning towards implementing the minimal set of
> > > > >> > > > > > > immutable or mutable interfaces, making sure that we
> > > > >> > > > > > > have a compatibility plan that allows us to add more
> > > > >> > > > > > > callbacks in the future (per Ismael's comment), and
> > > > >> > > > > > > adding more APIs later. E.g., for the encryption
> > > > >> > > > > > > use-case, there could be an argument for doing
> > > > >> > > > > > > encryption after message compression vs. per-record
> > > > >> > > > > > > encryption that could be done using the above additional
> > > > >> > > > > > > API. There are also further implications for every API
> > > > >> > > > > > > that modifies records: modifying the serialized
> > > > >> > > > > > > key/value will again impact partition assignment (we
> > > > >> > > > > > > will likely do that after partition assignment), which
> > > > >> > > > > > > may impact log compaction and MirrorMaker partitioning.
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > > Anna
> > > > >> > > > > > >
> > > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Finally got a chance to take a look at this. I won’t be
> > > > >> > > > > > > > able to make the KIP meeting due to a conflict.
> > > > >> > > > > > > >
> > > > >> > > > > > > > I’m somewhat disappointed in this proposal. I think
> > > > >> > > > > > > > that the explicit exclusion of modification of the
> > > > >> > > > > > > > messages is short-sighted, and not accounting for it
> > > > >> > > > > > > > now is going to bite us later. Jay, aren’t you the one
> > > > >> > > > > > > > railing against public interfaces and how difficult
> > > > >> > > > > > > > they are to work with when you don’t get them right?
> > > > >> > > > > > > > The “simple” change to one of these interfaces to make
> > > > >> > > > > > > > it able to return a record is going to be a
> > > > >> > > > > > > > significant change and is going to require all
> > > > >> > > > > > > > clients to rewrite their interceptors. If we’re not
> > > > >> > > > > > > > willing to put in the time to think through
> > > > >> > > > > > > > manipulation now, then this KIP should be shelved
> > > > >> > > > > > > > until we are. Implementing something halfway is going
> > > > >> > > > > > > > to be worse than taking a little longer. In addition,
> > > > >> > > > > > > > I don’t believe that manipulation requires anything
> > > > >> > > > > > > > more than interceptors to receive the full record,
> > > > >> > > > > > > > and then to return it.
> > > > >> > > > > > > >
> > > > >> > > > > > > > There are 3 use cases I can think of right now, without
> > > > >> > > > > > > > any deep discussion, that can make use of interceptors
> > > > >> > > > > > > > with modification:
> > > > >> > > > > > > >
> > > > >> > > > > > > > 1. Auditing. The ability to add metadata to a message
> > > > >> > > > > > > > for auditing is critical. Hostname, service name,
> > > > >> > > > > > > > timestamps, etc. are all pieces of data that can be
> > > > >> > > > > > > > used on the other side of the pipeline to categorize
> > > > >> > > > > > > > messages, determine loss and transport time, and pin
> > > > >> > > > > > > > down issues. You may say that these things can just be
> > > > >> > > > > > > > part of the message schema, but anyone who has worked
> > > > >> > > > > > > > with a multi-user data system (especially those who
> > > > >> > > > > > > > have been involved with LinkedIn) knows how difficult
> > > > >> > > > > > > > it is to maintain consistent message schemas and to
> > > > >> > > > > > > > get other people to put in fields for your use.
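A sketch of the auditing case, assuming a mutating onSend() wraps the application payload in an envelope rather than relying on each team's schema. `AuditEnvelope` and its field names are hypothetical, a real design would also need a serialization format for the envelope, and the transport time computed here is subject to clock skew between hosts.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical envelope: audit fields travel alongside the application payload
// instead of being added to every team's message schema.
final class AuditEnvelope {
    final Map<String, String> auditFields;
    final byte[] payload;

    AuditEnvelope(Map<String, String> auditFields, byte[] payload) {
        this.auditFields = Collections.unmodifiableMap(new LinkedHashMap<>(auditFields));
        this.payload = payload;
    }

    // What a mutating onSend() might do: stamp host, service and send time.
    static AuditEnvelope stamp(byte[] payload, String host, String service, long sendTimeMs) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("host", host);
        fields.put("service", service);
        fields.put("sendTimeMs", Long.toString(sendTimeMs));
        return new AuditEnvelope(fields, payload);
    }

    // Consumer side: transport time derived from the stamped send time.
    long transportTimeMs(long receiveTimeMs) {
        return receiveTimeMs - Long.parseLong(auditFields.get("sendTimeMs"));
    }
}
```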
> > > > >> > > > > > > >
> > > > >> > > > > > > > 2. Encryption. This is probably the most obvious case
> > > > >> > > > > > > > for record manipulation on both sides. The ability to
> > > > >> > > > > > > > tie in end-to-end encryption is important for data
> > > > >> > > > > > > > that requires external compliance (PCI, HIPAA, etc.).
> > > > >> > > > > > > >
> > > > >> > > > > > > > 3. Routing. By being able to add a bit of information
> > > > >> > > > > > > > about the source or destination of a message to the
> > > > >> > > > > > > > metadata, you can easily construct an intelligent
> > > > >> > > > > > > > mirror maker that can prevent loops. This has the
> > > > >> > > > > > > > opportunity to result in significant operational
> > > > >> > > > > > > > savings, as you can get rid of the need for tiered
> > > > >> > > > > > > > clusters in order to prevent loops in mirroring
> > > > >> > > > > > > > messages.
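A sketch of the loop-prevention idea, assuming each record carries the list of clusters it has already passed through. `RoutedRecord` and the path representation are hypothetical; how the path is attached to the message is exactly the metadata question discussed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical routing metadata: each record carries the clusters it has
// already passed through, appended by a mirror maker's producer interceptor.
final class RoutedRecord {
    final List<String> path;
    final String value;

    RoutedRecord(List<String> path, String value) {
        this.path = Collections.unmodifiableList(new ArrayList<>(path));
        this.value = value;
    }

    // A mirror maker skips the record if the destination is already on the
    // path, which prevents A -> B -> A loops without tiered clusters.
    boolean shouldMirrorTo(String destinationCluster) {
        return !path.contains(destinationCluster);
    }

    // Returns a copy of the record with one more hop recorded.
    RoutedRecord withHop(String cluster) {
        List<String> extended = new ArrayList<>(path);
        extended.add(cluster);
        return new RoutedRecord(extended, value);
    }
}
```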
> > > > >> > > > > > > >
> > > > >> > > > > > > > All three of these share the feature that they add
> > > > >> > > > > > > > metadata to messages. With the pushback on having
> > > > >> > > > > > > > arbitrary metadata as an “envelope” to the message,
> > > > >> > > > > > > > this is a way to provide it and make it the
> > > > >> > > > > > > > responsibility of the client, and not the Kafka broker
> > > > >> > > > > > > > and system itself.
> > > > >> > > > > > > >
> > > > >> > > > > > > > -Todd
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Hi Anna and Neha,
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I think it makes a lot of sense to try and keep the
> > > > >> > > > > > > > > interface lean and to add more methods later when/if
> > > > >> > > > > > > > > there is a need. What is the current thinking with
> > > > >> > > > > > > > > regard to compatibility when/if we add new methods?
> > > > >> > > > > > > > > A few options come to mind:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > 1. Change the interface to an abstract class with
> > > > >> > > > > > > > > empty implementations for all the methods. This
> > > > >> > > > > > > > > means that the path to adding new methods is clear.
> > > > >> > > > > > > > > 2. Hope we have moved to Java 8 by the time we need
> > > > >> > > > > > > > > to add new methods, and use default methods with an
> > > > >> > > > > > > > > empty implementation for any new method (and
> > > > >> > > > > > > > > potentially make existing methods default methods
> > > > >> > > > > > > > > too at that point, for consistency).
> > > > >> > > > > > > > > 3. Introduce a new interface that inherits from the
> > > > >> > > > > > > > > existing Interceptor interface when we need to add
> > > > >> > > > > > > > > new methods.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Option 1 is the easiest, and it also means that
> > > > >> > > > > > > > > interceptor users only need to override the methods
> > > > >> > > > > > > > > that they are interested in (more useful if the
> > > > >> > > > > > > > > number of methods grows). The downside is that
> > > > >> > > > > > > > > interceptor implementations cannot inherit from
> > > > >> > > > > > > > > another class (a straightforward workaround is to
> > > > >> > > > > > > > > make the interceptor a forwarder that calls another
> > > > >> > > > > > > > > class). Also, our existing callbacks are interfaces,
> > > > >> > > > > > > > > so it seems a bit inconsistent.
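A sketch of option 1 together with the forwarder workaround. `InterceptorBase`, `MetricsReporter` and `ForwardingInterceptor` are hypothetical names: the base class supplies empty (pass-through) implementations, and an interceptor that needs logic from another class holds a reference to it instead of inheriting from it.

```java
// Hypothetical base class for option 1: every callback has an empty or
// pass-through body, so subclasses override only what they need.
abstract class InterceptorBase<R> {
    public R onSend(R record) {
        return record;
    }

    public void onAcknowledgement(R record, Exception exception) {
    }
}

// An existing class a user wants to reuse; the interceptor cannot also extend it.
class MetricsReporter {
    private long acknowledged;

    void reportAck() {
        acknowledged++;
    }

    long acknowledged() {
        return acknowledged;
    }
}

// Forwarder workaround: extend the base class and delegate to the other class.
final class ForwardingInterceptor<R> extends InterceptorBase<R> {
    private final MetricsReporter reporter;

    ForwardingInterceptor(MetricsReporter reporter) {
        this.reporter = reporter;
    }

    @Override
    public void onAcknowledgement(R record, Exception exception) {
        // Count only successful acknowledgements.
        if (exception == null) {
            reporter.reportAck();
        }
    }
}
```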
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Option 2 may be the most appealing one, as both users
> > > > >> > > > > > > > > and ourselves retain flexibility. The main downside
> > > > >> > > > > > > > > is that it relies on us moving to Java 8, which may
> > > > >> > > > > > > > > potentially be more than a year away (if we support
> > > > >> > > > > > > > > the last 2 Java releases).
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Thoughts?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Ismael
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > Anna,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > I'm also in favor of including just the APIs for
> > > > >> > > > > > > > > > which we have a clear use case. If more use cases
> > > > >> > > > > > > > > > for finer monitoring show up in the future, we can
> > > > >> > > > > > > > > > always update the interface. Would you please
> > > > >> > > > > > > > > > highlight in the KIP the APIs that you think we
> > > > >> > > > > > > > > > have an immediate use for?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Joel,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Broker-side monitoring makes a lot of sense in the
> > > > >> > > > > > > > > > long term, though I don't think it is a requirement
> > > > >> > > > > > > > > > for end-to-end monitoring. With the producer and
> > > > >> > > > > > > > > > consumer interceptors, you have the ability to get
> > > > >> > > > > > > > > > full publish-to-subscribe end-to-end monitoring.
> > > > >> > > > > > > > > > The broker interceptor certainly improves the
> > > > >> > > > > > > > > > resolution of monitoring, but it is also a riskier
> > > > >> > > > > > > > > > change. I prefer an incremental approach over a
> > > > >> > > > > > > > > > big bang, and recommend taking baby steps. Let's
> > > > >> > > > > > > > > > first make sure the producer/consumer interceptors
> > > > >> > > > > > > > > > are successful, and then come back and add the
> > > > >> > > > > > > > > > broker interceptor carefully.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Having said that, it would be great to
> understand
> > > your
> > > > >> > > proposal
> > > > >> > > > > for
> > > > >> > > > > > > the
> > > > >> > > > > > > > > > broker interceptor independently. We can either
> > add
> > > an
> > > > >> > > > > interceptor
> > > > >> > > > > > > > > > on-append or on-commit. If people want to use
> this
> > > for
> > > > >> > > > > monitoring,
> > > > >> > > > > > > then
> > > > >> > > > > > > > > > possibly on-commit might be more useful?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > Neha
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > Hey Joel,
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > What is the interface you are thinking of? Something like this:
> > > > >> > > > > > > > > > >     onAppend(String topic, int partition, Records records, long time)
> > > > >> > > > > > > > > > > ?
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > One challenge right now is that we are still using the old
> > > > >> > > > > > > > > > > Message/MessageSet classes on the broker, which I'm not sure we'd
> > > > >> > > > > > > > > > > want to support over the long haul, but it might be okay just to
> > > > >> > > > > > > > > > > create the Records instance for this interface.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > -Jay
> > > > >> > > > > > > > > > >
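A broker hook of the shape Jay proposes could carry the on-arrival audit Joel describes further down. The sketch below assumes a simplified hypothetical interface: List<byte[]> stands in for Kafka's Records class, and BrokerAppendInterceptor and AuditCountingInterceptor are illustrative names, not existing broker APIs. It counts appended messages per topic per fixed time bucket.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical broker-side hook; List<byte[]> stands in for Kafka's Records.
interface BrokerAppendInterceptor {
    void onAppend(String topic, int partition, List<byte[]> records, long timeMs);
}

// Counts appended messages per (topic, time bucket) for auditing.
class AuditCountingInterceptor implements BrokerAppendInterceptor {
    private final long bucketMs;
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    AuditCountingInterceptor(long bucketMs) {
        this.bucketMs = bucketMs;
    }

    @Override
    public void onAppend(String topic, int partition, List<byte[]> records, long timeMs) {
        // Align the append time to the start of its bucket.
        long bucketStart = timeMs - (timeMs % bucketMs);
        // Kafka topic names cannot contain '@', so it is a safe key separator.
        counts.computeIfAbsent(topic + "@" + bucketStart, k -> new LongAdder())
              .add(records.size());
    }

    // Number of messages appended to the topic in the bucket starting at bucketStart.
    long count(String topic, long bucketStart) {
        LongAdder adder = counts.get(topic + "@" + bucketStart);
        return adder == null ? 0L : adder.sum();
    }
}
```

With a 10-minute bucket (600_000 ms, an arbitrary choice here), appends at 1,000 ms and 599,999 ms fall in the bucket starting at 0, and an append at 600,000 ms opens the next one. This also touches on Neha's on-append versus on-commit question: an on-append hook like this one would count messages that are appended but never committed.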
> > > > >> > > > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > I'm definitely in favor of having such hooks in the produce/consume
> > > > >> > > > > > > > > > > > life-cycle. Not sure if people remember this, but in Kafka 0.7
> > > > >> > > > > > > > > > > > this was pretty much how it worked:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > i.e., we had something similar to the interceptor proposal for
> > > > >> > > > > > > > > > > > various stages of the producer request. The producer provided
> > > > >> > > > > > > > > > > > callbacks for beforeEnqueue, afterEnqueue, afterDequeuing,
> > > > >> > > > > > > > > > > > beforeSending, etc. So at LinkedIn we in fact did auditing within
> > > > >> > > > > > > > > > > > these callbacks (and not explicitly in the wrapper). Over time,
> > > > >> > > > > > > > > > > > and with 0.8, we moved that out to the wrapper libraries.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On a side note: while audit and other monitoring can be done
> > > > >> > > > > > > > > > > > internally in a convenient way, I think it should be clarified
> > > > >> > > > > > > > > > > > that having a wrapper is in general not a bad idea, and I would
> > > > >> > > > > > > > > > > > even consider it a best practice. Even with 0.7 we still had a
> > > > >> > > > > > > > > > > > wrapper library; that API has largely stayed the same and has
> > > > >> > > > > > > > > > > > helped protect against (sometimes backwards-incompatible) changes
> > > > >> > > > > > > > > > > > in open source.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > While we are on this topic, I have one comment. Anna, you may
> > > > >> > > > > > > > > > > > have already considered this, but I don't see it mentioned in the
> > > > >> > > > > > > > > > > > KIP:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Add a custom message interceptor/validator on the broker on
> > > > >> > > > > > > > > > > > message arrival.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > We decompress and do basic validation of messages on arrival. I
> > > > >> > > > > > > > > > > > think there is value in supporting custom validation and in
> > > > >> > > > > > > > > > > > expanding it to support custom on-arrival processing. Here is a
> > > > >> > > > > > > > > > > > specific use case I have in mind. The blog that James referenced
> > > > >> > > > > > > > > > > > describes our auditing infrastructure. In order to audit the
> > > > >> > > > > > > > > > > > Kafka cluster itself, we need to run a "console auditor" service
> > > > >> > > > > > > > > > > > that consumes everything and spits out audit events back to the
> > > > >> > > > > > > > > > > > cluster. I would prefer not to have to run this service because:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >    - Well, it is one more service that we have to run and monitor
> > > > >> > > > > > > > > > > >    - Consuming everything takes up bandwidth, which can be avoided
> > > > >> > > > > > > > > > > >    - The console auditor consumer itself can lag and cause
> > > > >> > > > > > > > > > > >    temporary audit discrepancies
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > One way we can mitigate this is by having the mirror-makers in
> > > > >> > > > > > > > > > > > between clusters emit audit events. The problem is that the very
> > > > >> > > > > > > > > > > > last cluster in the pipeline will not have any audit, which is
> > > > >> > > > > > > > > > > > why we need to have something to audit the cluster.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > If we had a custom message validator, then the audit could be
> > > > >> > > > > > > > > > > > done on arrival and we wouldn't need a console auditor.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > One potential issue with this approach (and with any elaborate
> > > > >> > > > > > > > > > > > on-arrival processing, for that matter) is that you may need to
> > > > >> > > > > > > > > > > > deserialize the message as well, which can drive up produce
> > > > >> > > > > > > > > > > > request handling times. However, I'm not terribly concerned about
> > > > >> > > > > > > > > > > > that, especially if the audit header can be separated out easily
> > > > >> > > > > > > > > > > > or even deserialized partially, as this Avro thread touches on:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Joel
> > > > >> > > > > > > > > > > >
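Joel's point about separating out the audit header can be illustrated by assuming a fixed binary prefix in front of the payload. The layout below (8-byte timestamp, 2-byte host length, host bytes, then the payload) and the AuditHeader class are hypothetical, not a Kafka or Avro format; the sketch only shows that an on-arrival validator could read such a header with a few ByteBuffer reads and never parse the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical wire layout (an assumption, not a Kafka or Avro format):
//   [8-byte producer timestamp][2-byte host length][host bytes][payload...]
final class AuditHeader {
    final long timestampMs;
    final String host;

    private AuditHeader(long timestampMs, String host) {
        this.timestampMs = timestampMs;
        this.host = host;
    }

    // Reads only the fixed-layout prefix; the payload bytes are never touched.
    static AuditHeader peek(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value); // big-endian by default
        long ts = buf.getLong();
        int hostLen = buf.getShort() & 0xFFFF;   // treat the length as unsigned
        byte[] hostBytes = new byte[hostLen];
        buf.get(hostBytes);
        return new AuditHeader(ts, new String(hostBytes, StandardCharsets.UTF_8));
    }

    // Builds a value in the same hypothetical layout (host name under 64 KiB).
    static byte[] encode(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 2 + hostBytes.length + payload.length);
        buf.putLong(timestampMs).putShort((short) hostBytes.length).put(hostBytes).put(payload);
        return buf.array();
    }
}
```

The cost of peek() depends only on the header size, not the payload size, which is why a separable header would keep produce request handling times in check.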
> > > > >> > > > > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Nice KIP. Excellent idea.
> > > > >> > > > > > > > > > > > > I was just wondering whether we could add onDequeued() to the
> > > > >> > > > > > > > > > > > > ProducerInterceptor interface. Since we have onEnqueued(), it
> > > > >> > > > > > > > > > > > > would help the client or tools know how much time a message spent
> > > > >> > > > > > > > > > > > > in the RecordAccumulator. An API to check whether there are any
> > > > >> > > > > > > > > > > > > messages left for a particular topic in the RecordAccumulator
> > > > >> > > > > > > > > > > > > would also help.
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Mayuresh
> > > > >> > > > > > > > > > > > >
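Mayuresh's suggestion can be sketched as a pair of hooks that timestamp a record when it enters the accumulator and compute the wait when it leaves. This assumes hypothetical onEnqueued()/onDequeued() hooks keyed by a String record id; AccumulatorTimer is an illustrative name, and the injectable clock exists only to make the timing deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical hooks mirroring the proposed onEnqueued()/onDequeued() pair;
// a String id stands in for whatever identifies a record in the accumulator.
class AccumulatorTimer {
    private final LongSupplier clockMs;
    private final Map<String, Long> enqueuedAt = new ConcurrentHashMap<>();
    private long totalWaitMs = 0;
    private long dequeuedCount = 0;

    AccumulatorTimer(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    void onEnqueued(String recordId) {
        enqueuedAt.put(recordId, clockMs.getAsLong());
    }

    // Returns the time this record spent queued, or -1 if it was never enqueued.
    synchronized long onDequeued(String recordId) {
        Long start = enqueuedAt.remove(recordId);
        if (start == null) {
            return -1L;
        }
        long waited = clockMs.getAsLong() - start;
        totalWaitMs += waited;
        dequeuedCount++;
        return waited;
    }

    synchronized double averageWaitMs() {
        return dequeuedCount == 0 ? 0.0 : (double) totalWaitMs / dequeuedCount;
    }

    // Records still waiting, across all topics (not per topic).
    int pending() {
        return enqueuedAt.size();
    }
}
```

A per-topic "anything left?" check, which Mayuresh also asks for, would additionally need a mapping from record id to topic.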
> > > > >> > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m
> > > > >> > > > > > > > > > > > > > glad someone is finally picking it up. I’ll take a look at the
> > > > >> > > > > > > > > > > > > > KIP shortly.
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > -Todd
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > Hey Becket,
> > > > >> > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > Yeah, this is really similar to the callback. The difference is
> > > > >> > > > > > > > > > > > > > > really in who sets the behavior. The idea of the interceptor is
> > > > >> > > > > > > > > > > > > > > that it doesn't require any code change in apps, so you can
> > > > >> > > > > > > > > > > > > > > globally add behavior to your Kafka usage without changing app
> > > > >> > > > > > > > > > > > > > > code, whereas the callback is added by the app. The idea is to
> > > > >> > > > > > > > > > > > > > > obviate the need for the wrapper code that, e.g., LinkedIn
> > > > >> > > > > > > > > > > > > > > maintains to hold this kind of stuff.
> > > > >> > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > -Jay
> > > > >> > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > >> > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > This could be a useful feature, and I think there are some use
> > > > >> > > > > > > > > > > > > > > > cases for mutating the data, like the one the first rejected
> > > > >> > > > > > > > > > > > > > > > alternative mentions.
> > > > >> > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > I am wondering whether there is functional overlap between
> > > > >> > > > > > > > > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer
> > > > >> > > > > > > > > > > > > > > > callback. I can see that the Callback is a per-record setting
> > > > >> > > > > > > > > > > > > > > > while onAcknowledgement() is a producer-level setting. Other than
> > > > >> > > > > > > > > > > > > > > > that, is there any difference between them?
> > > > >> > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > >> > > > > > > > > > > > > > > >
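The distinction Jay draws in reply to Becket, that the interceptor is configured once for the whole producer while the callback is chosen by the application per send, can be shown with toy stand-ins. ToyProducer, AckInterceptor and CountingAckInterceptor are hypothetical and much simpler than Kafka's Producer, Callback and ProducerInterceptor; the sketch only shows which hook fires when.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-ins; not Kafka's Producer, Callback or ProducerInterceptor.
interface AckInterceptor {
    void onAcknowledgement(String topic, long offset, Exception error);
}

class ToyProducer {
    private final List<AckInterceptor> interceptors; // fixed when the producer is built
    private long nextOffset = 0;

    ToyProducer(List<AckInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    // The callback is optional and chosen by the app for each send.
    void send(String topic, String value, BiConsumer<Long, Exception> callback) {
        long offset = nextOffset++;              // pretend the broker acked
        for (AckInterceptor i : interceptors) {  // runs for every record
            i.onAcknowledgement(topic, offset, null);
        }
        if (callback != null) {                  // runs only if the app passed one
            callback.accept(offset, null);
        }
    }
}

// Example interceptor: counts successful acknowledgements for all records.
class CountingAckInterceptor implements AckInterceptor {
    int acked = 0;

    @Override
    public void onAcknowledgement(String topic, long offset, Exception error) {
        if (error == null) {
            acked++;
        }
    }
}
```

Every acknowledged record reaches the interceptor, but app code only sees records sent with a callback, which is the per-record versus producer-level difference Becket points out.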
> > > > >> > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > > >> > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > James,
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor
> > > > >> > > > > > > > > > > > > > > > > interface.
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > > > > > > Neha
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > Anna,
> > > > >> > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like
> > > > >> > > > > > > > > > > > > > > > > > producer interceptors could be used to implement part of
> > > > >> > > > > > > > > > > > > > > > > > LinkedIn's Kafka Audit tool?
> > > > >> > > > > > > > > > > > > > > > > > https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > >> > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > Part of that is done by a wrapper library around the Kafka
> > > > >> > > > > > > > > > > > > > > > > > producer that keeps a count of the number of messages produced,
> > > > >> > > > > > > > > > > > > > > > > > and then sends that count to a side-topic. It sounds like the
> > > > >> > > > > > > > > > > > > > > > > > producer interceptors could possibly be used to implement that?
> > > > >> > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > -James
> > > > >> > > > > > > > > > > > > > > > > >
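James's audit use case, counting messages produced and periodically publishing the counts to a side topic, could look roughly like this inside a producer interceptor. ProducedCountAuditor is hypothetical; the BiConsumer sink stands in for producing to the audit topic, and the "topic,windowEnd,count" message format is an arbitrary choice for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical producer-side audit counter; the sink stands in for
// producing a message to a side topic (e.g. via a second producer).
class ProducedCountAuditor {
    private final Map<String, Long> countsByTopic = new TreeMap<>();
    private final BiConsumer<String, String> sideTopicSink; // (topic, message)
    private final String auditTopic;

    ProducedCountAuditor(String auditTopic, BiConsumer<String, String> sideTopicSink) {
        this.auditTopic = auditTopic;
        this.sideTopicSink = sideTopicSink;
    }

    // Called for every record the application sends.
    synchronized void onSend(String topic) {
        countsByTopic.merge(topic, 1L, Long::sum);
    }

    // Called periodically, e.g. from a timer: emit one message per topic, then reset.
    synchronized void flushCounts(long windowEndMs) {
        for (Map.Entry<String, Long> e : countsByTopic.entrySet()) {
            String msg = e.getKey() + "," + windowEndMs + "," + e.getValue();
            sideTopicSink.accept(auditTopic, msg);
        }
        countsByTopic.clear();
    }
}
```

Because counts reset after each flush, every audit message covers exactly one window, and a downstream auditor can compare it with the counts seen by consumers for the same window.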
> > > > >> > > > > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > >> > > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > > Hi,
> > > > >> > > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > > I just created KIP-42 for adding producer and consumer
> > > > >> > > > > > > > > > > > > > > > > > > interceptors, for intercepting messages at different points on
> > > > >> > > > > > > > > > > > > > > > > > > the producer and consumer.
> > > > >> > > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > >> > > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > > Comments and suggestions are welcome!
> > > > >> > > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > > > > > > > > Anna
> > > > >> > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > > > --
> > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > > > > > > Neha
> > > > >> > > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > --
> > > > >> > > > > > > > > > > > > > *—-*
> > > > >> > > > > > > > > > > > > > *Todd Palino*
> > > > >> > > > > > > > > > > > > > Staff Site Reliability Engineer
> > > > >> > > > > > > > > > > > > > Data Infrastructure Streaming
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > > linkedin.com/in/toddpalino
> > > > >> > > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > --
> > > > >> > > > > > > > > > > > > -Regards,
> > > > >> > > > > > > > > > > > > Mayuresh R. Gharat
> > > > >> > > > > > > > > > > > > (862) 250-7125
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > --
> > > > >> > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > Neha
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > > --
> > > > >> > > > > > > > *—-*
> > > > >> > > > > > > > *Todd Palino*
> > > > >> > > > > > > > Staff Site Reliability Engineer
> > > > >> > > > > > > > Data Infrastructure Streaming
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > > linkedin.com/in/toddpalino
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > > --
> > > > >> > > > > > -Regards,
> > > > >> > > > > > Mayuresh R. Gharat
> > > > >> > > > > > (862) 250-7125
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > --
> > > > >> > > > *—-*
> > > > >> > > > *Todd Palino*
> > > > >> > > > Staff Site Reliability Engineer
> > > > >> > > > Data Infrastructure Streaming
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > linkedin.com/in/toddpalino
> > > > >> > > >
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > > --
> > > > >> > > -Regards,
> > > > >> > > Mayuresh R. Gharat
> > > > >> > > (862) 250-7125
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -Regards,
> > > > >> Mayuresh R. Gharat
> > > > >> (862) 250-7125
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>
