Hi Henry, Jun and Ismael,

A few things make me wonder if building this into the existing Producer and
Consumer APIs is really the right thing to do:

1. Type safety. The existing Producer and Consumer are both generic in K
and V, but those type parameters are meaningless in the batch case. For
example, the apparent type safety of a Producer<Foo, Bar> would be violated
by using the batch method to actually send a <Baz, Quux>. Another example:
What happens if I pass a producer configured for records to someone that
requires one configured for batches (and vice versa)?

2. Configs. The existing Producer and Consumer would both accept a number of
configs which don't apply in the batch case (e.g. the serializers and
max.poll.records, as Jun noted).
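
To make point 1 concrete, here is a minimal, self-contained Java sketch.
Everything in it is hypothetical: RecordOrBatchProducer, LoggingProducer and
sendBatch are stand-ins made up for illustration, not the real client
interfaces or the methods proposed in the KIP. It only shows that K and V
constrain the record path and say nothing about what goes through a raw
batch method.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a producer that is generic in K and V but also
// exposes a raw batch method. Not the real Kafka Producer interface.
interface RecordOrBatchProducer<K, V> {
    void send(String topic, K key, V value);

    // K and V play no role here: any bytes are accepted.
    void sendBatch(String topic, ByteBuffer rawBatch);
}

// Toy implementation that records what it was asked to send.
final class LoggingProducer<K, V> implements RecordOrBatchProducer<K, V> {
    final List<String> log = new ArrayList<>();

    @Override
    public void send(String topic, K key, V value) {
        log.add("record:" + topic + ":" + key + "=" + value);
    }

    @Override
    public void sendBatch(String topic, ByteBuffer rawBatch) {
        log.add("batch:" + topic + ":" + rawBatch.remaining() + " bytes");
    }
}

final class TypeSafetyDemo {
    static List<String> run() {
        LoggingProducer<String, Integer> producer = new LoggingProducer<>();

        // Record path: the compiler checks the key and value types.
        producer.send("t", "k", 1);
        // producer.send("t", 1L, "v"); // would not compile

        // Batch path: the bytes could encode any key/value types at all,
        // and the <String, Integer> parameters cannot object.
        ByteBuffer raw = ByteBuffer.wrap("anything".getBytes(StandardCharsets.UTF_8));
        producer.sendBatch("t", raw);
        return producer.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The commented-out call is the kind of mistake the compiler catches on the
record path; nothing comparable is possible on the batch path.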

In the discussion for KIP-706 Jason was imagining a more abstracted set of
client APIs which separated the data from the topic destination/origin, and
he mentioned basically this exact use case. This got me thinking, and
although I don't want to derail this conversation, I thought I'd sketch what
I came up with.

On the Consumer side:

// Abstraction over where messages come from
interface ReceiveSource {}
class TopicPartition implements SendTarget, ReceiveSource {}
class Topic implements SendTarget, ReceiveSource {}
class TopicId implements SendTarget, ReceiveSource {}
class TopicPattern implements ReceiveSource {}

// New abstraction for consumer-like things
interface Receiver<X> {
  void assign(ReceiveSource source);
  void subscribe(ReceiveSource source);
  // etc
  X poll(Duration timeout);
}

// Consumer doesn't change, except for the extends clause
interface Consumer<K, V> extends Receiver<ConsumerRecords<K, V>> {
  void assign(ReceiveSource source);
  void subscribe(ReceiveSource source);
  ConsumerRecords<K, V> poll(Duration timeout);
}

// KafkaConsumer doesn't change at all
class KafkaConsumer<K, V> implements Consumer<K, V> {
}

// Specialise Receiver for batch-based consumption.
interface BatchConsumer extends Receiver<ConsumerBatch> {
}

// Implementation
class KafkaBatchConsumer implements BatchConsumer {
}

class ConsumerBatch {
  // For KIP-712, a way to convert batches without exposing low-level
  // details like ByteBuffer
  ProducerBatchPayload toProducerBatch();
}

On the producer side:

// Abstraction over targets (see the ReceiveSource for the impls)
interface SendTarget {}

// Abstraction over data that can be sent to a target
interface Payload {}
class ProducerRecordPayload<K, V> implements Payload {
  // Like ProducerRecord, but without the topic and partition
}
class ProducerBatchPayload implements Payload {
  // For the KIP-712 case
}

// A new abstraction over producer-like things
// (T is the result type of a send; this sketch doesn't pin it down)
interface Transmitter<P extends Payload> {
  CompletionStage<T> send(SendTarget target, P payload);
}

// Producer gains an extends clause
interface Producer<K, V> extends Transmitter<ProducerRecordPayload<K, V>> {
}

class KafkaProducer<K, V> implements Producer<K, V> {
  // Unchanged, included for completeness
}

interface BatchProducer extends Transmitter<ProducerBatchPayload> {
  CompletionStage<T> send(SendTarget target, ProducerBatchPayload payload);
}

class KafkaBatchProducer implements BatchProducer {
  // New. In practice a lot of common code between this and KafkaProducer
  // could be factored into an abstract class.
}

Really I'm just re-stating Jason's KIP-706 idea in the context of this KIP,
but it would address the type safety issue and also enable a batch consumer
to have its own set of configs. It also allows the new Producer.send return
type to be CompletionStage, which is KIP-706's objective. And, of course,
it's compatible with possible future work on producing to/consuming from a
topic id.
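
A compilable reduction of the producer-side idea might look like the
following. This is a sketch under assumptions, not a proposed
implementation: the String acknowledgement result, the in-memory producer
and the shortened names (RecordPayload, BatchPayload, RecordProducer,
InMemoryBatchProducer, TransmitterSketch) are all made up here, and nothing
talks to a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Where a payload is sent; Topic is the only target in this sketch.
interface SendTarget {
    String name();
}

final class Topic implements SendTarget {
    private final String name;
    Topic(String name) { this.name = name; }
    @Override public String name() { return name; }
}

// What can be sent to a target.
interface Payload {}

// Record-shaped payload: generic in K and V, no topic or partition.
final class RecordPayload<K, V> implements Payload {
    final K key;
    final V value;
    RecordPayload(K key, V value) { this.key = key; this.value = value; }
}

// Batch-shaped payload: opaque bytes, no generics. The field is private so
// callers can pass the batch along without reading its contents.
final class BatchPayload implements Payload {
    private final byte[] bytes;
    BatchPayload(byte[] bytes) { this.bytes = bytes.clone(); }
    int sizeInBytes() { return bytes.length; }
}

// Producer-like things, parameterised by the payload they accept.
// The String result is a placeholder acknowledgement (an assumption).
interface Transmitter<P extends Payload> {
    CompletionStage<String> send(SendTarget target, P payload);
}

interface RecordProducer<K, V> extends Transmitter<RecordPayload<K, V>> {}

interface BatchProducer extends Transmitter<BatchPayload> {}

// Toy implementation that completes immediately instead of using a broker.
final class InMemoryBatchProducer implements BatchProducer {
    @Override
    public CompletionStage<String> send(SendTarget target, BatchPayload payload) {
        return CompletableFuture.completedFuture(
                target.name() + ":" + payload.sizeInBytes() + " bytes");
    }
}

final class TransmitterSketch {
    // Code that needs batches asks for a BatchProducer; a RecordProducer<K, V>
    // is a different type and is rejected at compile time.
    static String mirror(BatchProducer producer, SendTarget target, BatchPayload batch) {
        return producer.send(target, batch).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        BatchPayload batch = new BatchPayload(new byte[] {1, 2, 3});
        System.out.println(mirror(new InMemoryBatchProducer(), new Topic("mirrored"), batch));
        // Passing a RecordPayload to a BatchProducer's send would not compile.
    }
}
```

The point of the design is that "configured for records" and "configured
for batches" become different static types, so passing one where the other
is required fails at compile time rather than at runtime.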

Kind regards,


On Thu, Apr 1, 2021 at 9:11 AM Henry Cai <h...@pinterest.com.invalid> wrote:

> Jun,
>
> Thanks for your insight looking into this KIP, we do believe the shallow
> iteration will give quite a significant performance boost.
>
> On your concerns:
>
> 1. Cleaner API.  One alternative is to create new batch APIs.  On consumer,
> it would become Consumer.pollBatch returns a ConsumerBatch object which
> contains topic/partition/firstOffsetOfBatch/pointerToByteBuffer, similarly
> Producer.sendBatch(ProducerBatch).  Both ConsumerBatch and ProducerBatch
> objects are fixed types (no generics), serializer is gone, interceptors are
> probably not needed initially (unless people see the need to intercept on
> the batch level).  On MM2 side, the current flow is ConsumerRecord ->
> Connect's SourceRecord -> ProducerRecord, we would need to enhance Connect
> framework to add SourceTask.pollBatch() method which returns a SourceBatch
> object, so the object conversion flow becomes ConsumerBatch -> SourceBatch
> -> ProducerBatch, we probably won't support any transformers on Batch
> objects.
>
> 2. PID/ProducerEpoch/SeqNo passing through RecordBatch.  I think those
> transaction fields are only meaningful in the original source kafka
> cluster, producer id/seqNo are not the same for the target kafka cluster.
> So if MM is not going to support transactions at the moment, we can clear
> those fields when they are going through MM.  Once MM starts to support
> transactions in the future, it probably will start its own PID/SeqNo etc.
>
> 3. For EOS and read_committed/read_uncommitted support, we can do phased
> support.  Phase 1, don't support transactional messages in the source
> cluster (i.e. abort if it sees control batch records).  Phase 2: applying
> commit/abort on the batch boundary level.  I am not too familiar with the
> isolation level and abort transaction code path, but it seems the control
> unit is currently on the batch boundary (commit/abort the whole batch), if
> so, this should also be doable.
>
> 4. MessageHandler in MM1 or SMT in MM2, initially we don't need to support
> them.  Since now the object is a ConsumerBatch and the existing handler is
> written for the individual object.  Deserialize the batch into individual
> objects would defeat the purpose of performance optimization.
>
> 5. Multiple batch performances, will do some testing on this.
>
> On Wed, Mar 31, 2021 at 10:14 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Henry,
> >
> > Thanks for the KIP. Sorry for the late reply. A few comments below.
> >
> > 1. The 'shallow' feature is potentially useful. I do agree with Tom that
> > the proposed API changes seem unclean. Quite a few existing stuff don't
> > really work together with this (e.g., generics, serializer, interceptors,
> > configs like max.poll.records, etc). It's also hard to explain this change
> > to the common users of the consumer/producer API. I think it would be
> > useful to explore if there is another cleaner way of adding this. For
> > example, you mentioned that creating a new set of APIs doesn't work for
> > MM2. However, we could potentially change the connect interface to allow
> > MM2 to use the new API. If this doesn't work, it would be useful to explain
> > that in the rejected alternative section.
> >
> > 2. I am not sure that we could pass through all fields in RecordBatch. For
> > example, a MM instance could be receiving RecordBatch from different source
> > partitions. Mixing the PID/ProducerEpoch/FirstSequence fields across them
> > in a single producer will be weird. So, it would be useful to document this
> > part clearer.
> >
> > 3. EOS. While MM itself doesn't support mirroring data in an
> > exactly-once way, it needs to support reading from a topic with EOS data.
> > So, it would be useful to document whether both read_committed and
> > read_uncommitted mode are supported and what kind of RecordBatch the
> > consumer returns in each case.
> >
> > 4. With the 'shallow' feature, it seems that some existing features in MM
> > won't work. For example, I am not sure if SMT works in MM2
> > and MirrorMakerMessageHandler works in MM1. It would be useful to document
> > this kind of impact in the KIP.
> >
> > 5. Multiple batches per partition in the produce request. This seems not
> > strictly required in KIP-98. However, changing this will probably add a bit
> > more complexity in the producer. So, it would be useful to understand its
> > benefits, especially since it doesn't seem to directly help reduce the
> > cost in MM. For example, do you have performance numbers with and without
> > this enabled in your MM tests?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 30, 2021 at 1:27 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> >
> > > Tom,
> > >
> > > Thanks for your comments.  Yes it's a bit clumsy to use the existing
> > > consumer and producer API to carry the underlying record batch, but
> > > creating a new set of API would also mean other use cases (e.g. MM2)
> > > wouldn't be able to use that feature easily.  We can throw exceptions if
> > > we see clients are setting serializer/compression in the consumer config
> > > option.
> > >
> > > The consumer is essentially getting back a collection of
> > > RecordBatchByteBuffer records and passing them to the producer.  Most of
> > > the internal APIs inside consumer and producer code paths are actually
> > > taking on ByteBuffer as the argument so it's not too much work to get the
> > > byte buffer through.
> > >
> > > For the worry that the client might see the inside of that byte buffer,
> > > we can create a RecordBatchByteBufferRecord class to wrap the underlying
> > > byte buffer so hopefully they will not drill too deep into that object.
> > > Java's ByteBuffer does have a asReadOnlyBuffer() method to return a
> > > read-only buffer, that can be explored as well.
> > >
> > > On Tue, Mar 30, 2021 at 4:24 AM Tom Bentley <tbent...@redhat.com> wrote:
> > >
> > > > Hi Henry and Ryanne,
> > > >
> > > > Related to Ismael's point about the producer & consumer configs being
> > > > dangerous, I can see two parts to this:
> > > >
> > > > 2a. Both the proposed configs seem to be fundamentally incompatible with
> > > > the Producer's existing key.serializer, value.serializer and
> > > > compression.type configs, likewise the consumers key.deserializer and
> > > > value.deserializer. I don't see a way to avoid this, since those existing
> > > > configs are already separate things. (I did consider whether using
> > > > special-case Deserializer and Serializer could be used instead, but that
> > > > doesn't work nicely; in this use case they're necessarily all configured
> > > > together). I think all we could do would be to reject configs which tried
> > > > to set those existing client configs in conjunction with fetch.raw.bytes
> > > > and send.raw.bytes.
> > > >
> > > > 2b. That still leaves a public Java API which would allow access to the
> > > > raw byte buffers. AFAICS we don't actually need user code to have access
> > > > to the raw buffers. It would be enough to get an opaque object that
> > > > wrapped the ByteBuffer from the consumer and pass it to the producer.
> > > > It's only the consumer and producer code which needs to be able to obtain
> > > > the wrapped buffer.
> > > >
> > > > Kind regards,
> > > >
> > > > Tom
> > > >
> > > > On Tue, Mar 30, 2021 at 8:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > >
> > > > > Hi Henry,
> > > > >
> > > > > Can you clarify why this "network performance" issue is only related to
> > > > > shallow mirroring? Generally, we want the protocol to be generic and not
> > > > > have a number of special cases. The more special cases you have, the
> > > > > tougher it becomes to test all the edge cases.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Mon, Mar 29, 2021 at 9:51 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> > > > >
> > > > > > It's interesting this VOTE thread finally becomes a DISCUSS thread.
> > > > > >
> > > > > > For MM2 concern, I will take a look to see whether I can add the
> > > > > > support for MM2.
> > > > > >
> > > > > > For Ismael's concern on multiple batches in the ProduceRequest
> > > > > > (conflicting with KIP-98), here is my take:
> > > > > >
> > > > > > 1. We do need to group multiple batches in the same request otherwise
> > > > > > the network performance will suffer.
> > > > > > 2. For the concern on transactional message support as in KIP-98,
> > > > > > since MM1 and MM2 currently don't support transactional messages,
> > > > > > KIP-712 will not attempt to support transactions either.  I will add a
> > > > > > config option on producer config: allowMultipleBatches.  By default
> > > > > > this option will be off and the user needs to explicitly turn on this
> > > > > > option to use the shallow mirror feature.  And if we detect both this
> > > > > > option and transaction is turned on we will throw an exception to
> > > > > > protect current transaction processing.
> > > > > > 3. In the future, when MM2 starts to support exact-once and
> > > > > > transactional messages (is that KIP-656?), we can revisit this code.
> > > > > > The current transactional message already makes the compromise that
> > > > > > the messages in the same RecordBatch (MessageSet) are sharing the same
> > > > > > sequence-id/transaction-id, so those messages need to be committed all
> > > > > > together.  I think when we support the shallow mirror with
> > > > > > transactional semantics, we will group all batches in the same
> > > > > > ProduceRequest in the same transaction boundary, they need to be
> > > > > > committed all together.  On the broker side, all batches coming from
> > > > > > ProduceRequest (or FetchResponse) are committed in the same log
> > > > > > segment file as one unit (current behavior).
> > > > > >
> > > > > > On Mon, Mar 29, 2021 at 8:46 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > >
> > > > > > > Ah, I see, thanks Ismael. Now I understand your concern.
> > > > > > >
> > > > > > > From KIP-98, re this change in v3:
> > > > > > >
> > > > > > > "This allows us to remove the message set size since each message set
> > > > > > > already contains a field for the size. More importantly, since there
> > > > > > > is only one message set to be written to the log, partial produce
> > > > > > > failures are no longer possible. The full message set is either
> > > > > > > successfully written to the log (and replicated) or it is not."
> > > > > > >
> > > > > > > The schema and size field don't seem to be an issue, as KIP-712
> > > > > > > already addresses.
> > > > > > >
> > > > > > > The partial produce failure issue is something I don't understand. I
> > > > > > > can't tell if this was done out of convenience at the time or if
> > > > > > > there is something incompatible with partial produce success/failure
> > > > > > > and EOS. Does anyone know?
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Mon, Mar 29, 2021, 1:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >
> > > > > > > > Ryanne,
> > > > > > > >
> > > > > > > > You misunderstood the referenced comment. It is about the produce
> > > > > > > > request change to have multiple batches:
> > > > > > > >
> > > > > > > > "Up to ProduceRequest V2, a ProduceRequest can contain multiple
> > > > > > > > batches of messages stored in the record_set field, but this was
> > > > > > > > disabled in V3. We are proposing to bring the multiple batches
> > > > > > > > feature back to improve the network throughput of the mirror maker
> > > > > > > > producer when the original batch size from source broker is too
> > > > > > > > small."
> > > > > > > >
> > > > > > > > This is unrelated to shallow iteration.
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Sun, Mar 28, 2021, 10:15 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Ismael, I don't think KIP-98 is related. Shallow iteration was
> > > > > > > > > removed in KAFKA-732, which predates KIP-98 by a few years.
> > > > > > > > >
> > > > > > > > > Ryanne
> > > > > > > > >
> > > > > > > > > On Sun, Mar 28, 2021, 11:25 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP. I have a few high level comments:
> > > > > > > > > >
> > > > > > > > > > 1. Like Tom, I'm not convinced about the proposal to make this
> > > > > > > > > > change to MirrorMaker 1 if we intend to deprecate it and remove
> > > > > > > > > > it. I would rather us focus our efforts on the implementation we
> > > > > > > > > > intend to support going forward.
> > > > > > > > > > 2. The producer/consumer configs seem pretty dangerous for general
> > > > > > > > > > usage, but the KIP doesn't address the potential downsides.
> > > > > > > > > > 3. How does the ProducerRequest change impact exactly-once (if at
> > > > > > > > > > all)? The change we are reverting was done as part of KIP-98. Have
> > > > > > > > > > we considered the original reasons for the change?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Ismael
> > > > > > > > > >
> > > > > > > > > > On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <vahid.hashem...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Retitled the thread to conform to the common format.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello Henry,
> > > > > > > > > > > >
> > > > > > > > > > > > This is a very interesting proposal.
> > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-10728 reflects the
> > > > > > > > > > > > similar concern of re-compressing data in mirror maker.
> > > > > > > > > > > >
> > > > > > > > > > > > Probably one thing may need to clarify is: how "shallow"
> > > > > > > > > > > > mirroring is only applied to mirrormaker use case, if the
> > > > > > > > > > > > changes need to be made on generic consumer and producer (e.g.
> > > > > > > > > > > > by adding `fetch.raw.bytes` and `send.raw.bytes` to producer and
> > > > > > > > > > > > consumer config)
> > > > > > > > > > > >
> > > > > > > > > > > > On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID> wrote:
> > > > > > > > > > > > > Dear Community members,
> > > > > > > > > > > > >
> > > > > > > > > > > > > We are proposing a new feature to improve the performance of
> > > > > > > > > > > > > Kafka mirror maker:
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
> > > > > > > > > > > > >
> > > > > > > > > > > > > The current Kafka MirrorMaker process (with the underlying
> > > > > > > > > > > > > Consumer and Producer library) uses significant CPU cycles and
> > > > > > > > > > > > > memory to decompress/recompress, deserialize/re-serialize
> > > > > > > > > > > > > messages and copy multiple times of messages bytes along the
> > > > > > > > > > > > > mirroring/replicating stages.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The KIP proposes a *shallow mirror* feature which brings back
> > > > > > > > > > > > > the shallow iterator concept to the mirror process and also
> > > > > > > > > > > > > proposes to skip the unnecessary message decompression and
> > > > > > > > > > > > > recompression steps.  We argue in many cases users just want a
> > > > > > > > > > > > > simple replication pipeline to replicate the message as it is
> > > > > > > > > > > > > from the source cluster to the destination cluster.  In many
> > > > > > > > > > > > > cases the messages in the source cluster are already compressed
> > > > > > > > > > > > > and properly batched, users just need an identical copy of the
> > > > > > > > > > > > > message bytes through the mirroring without any transformation
> > > > > > > > > > > > > or repartitioning.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We have a prototype implementation in house with MirrorMaker v1
> > > > > > > > > > > > > and observed *CPU usage dropped from 50% to 15%* for some mirror
> > > > > > > > > > > > > pipelines.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We name this feature: *shallow mirroring* since it has some
> > > > > > > > > > > > > resemblance to the old Kafka 0.7 namesake feature but the
> > > > > > > > > > > > > implementations are not quite the same.  ‘*Shallow*’ means 1. we
> > > > > > > > > > > > > *shallowly* iterate RecordBatches inside MemoryRecords structure
> > > > > > > > > > > > > instead of deep iterating records inside RecordBatch; 2. We
> > > > > > > > > > > > > *shallowly* copy (share) pointers inside ByteBuffer instead of
> > > > > > > > > > > > > deep copying and deserializing bytes into objects.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please share discussions/feedback along this email thread.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > >
> > > > > > > > > > > Thanks!
> > > > > > > > > > > --Vahid
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >

