Thanks Jason, the intuition behind defining a separate callback function is
that, with KIP-429, we no longer guarantee to call onPartitionsAssigned() or
onPartitionsRevoked() with each rebalance. Our requirement is to be
up-to-date with group metadata such as generation information, so a callback
like onGroupJoined() would make more sense, as it should be invoked after
every successful rebalance.
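
To make the idea concrete, here is a minimal sketch of what I have in mind.
It assumes a hypothetical `onGroupJoined` callback; nothing below is an
existing Kafka API, and `GroupMetadataSnapshot`, `GroupJoinListener` and
`LatestGroupMetadata` are stand-in names made up for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the group metadata discussed in this thread. */
final class GroupMetadataSnapshot {
    final String groupId;
    final int generationId;
    final String memberId;

    GroupMetadataSnapshot(String groupId, int generationId, String memberId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.memberId = memberId;
    }
}

/** Hypothetical callback shape; onGroupJoined is an assumption, not a real Kafka method. */
interface GroupJoinListener {
    /** Assumed to be invoked once after every successful rebalance. */
    void onGroupJoined(GroupMetadataSnapshot metadata);
}

/** Remembers the most recent metadata so a later offset commit can read it. */
final class LatestGroupMetadata implements GroupJoinListener {
    private final AtomicReference<GroupMetadataSnapshot> latest = new AtomicReference<>();

    @Override
    public void onGroupJoined(GroupMetadataSnapshot metadata) {
        // Overwrite unconditionally: the newest rebalance always wins.
        latest.set(metadata);
    }

    /** Returns the metadata from the last rebalance, or null before the first one. */
    GroupMetadataSnapshot current() {
        return latest.get();
    }
}
```

The caller of sendOffsetsToTransaction would then read `current()` right
before committing, so the generation it passes is the one from the latest
rebalance rather than a value captured at startup.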

Best,
Boyang

On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Boyang,
>
> I favor option 4 as well. It's a little more cumbersome than 3 for this use
> case, but it seems like a cleaner separation of concerns. The rebalance
> listener is already concerned with events affecting the assignment
> lifecycle and group membership. I think the only thing I'm wondering is
> whether it should be a separate callback as you've suggested, or if it
> would make sense to overload `onPartitionsAssigned`. If it's separate,
> maybe a name like `onGroupJoined` would be clearer?
>
> Thanks,
> Jason
>
>
>
> On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Thank you Jason. We had some offline discussion on properly keeping group
> > metadata up to date, and here are the options we brainstormed:
> > 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)`
> > maintain the ever-changing group metadata. This could be done on the
> > stream side, but for non-stream EOS the sample code becomes complicated,
> > as the user needs to implement the partition assignor interface to get
> > the update from `onAssignment`.
> >
> > 2. Add a new API on the producer like `refreshGroupMetadata(metadata)`.
> > This is similar to option 1, except that in the partition assignor
> > callback we could pass in the producer instance directly, which
> > simplifies the non-stream EOS case. However, this new API seems weird to
> > define on the producer.
> >
> > 3. Add an accessor interface for group metadata, or just expose the group
> > metadata through a consumer API like `consumer.groupMetadata()`. This is
> > the old way, which spares users the effort of implementing a partition
> > assignor directly.
> >
> > 4. Expose the group metadata through the rebalance listener, which is a
> > better-known and more widely adopted callback interface. We could do
> > something like `onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`.
> >
> > To simplify the code logic, we believe options 3 and 4 are the better
> > solutions, and of those I slightly prefer option 4 as it is the cleanest
> > solution, with the least intrusion into both consumer and producer APIs.
> >
> > WDYT?
> >
> > Boyang
> >
> >
> >
> >
> > On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson <ja...@confluent.io>
> wrote:
> >
> > > Hi Boyang,
> > >
> > > > We already persist member.id, instance.id and generation.id in the
> > > > offset topic, what extra fields do we need to store?
> > >
> > > Yeah, you're right. I was a little confused and thought this information
> > > was needed by the transaction coordinator.
> > >
> > > > This should be easily done on the stream side as we have
> > > > StreamsPartitionAssignor to reflect metadata changes upon
> > > > #onAssignment(), but non-stream users have to code the callback by
> > > > hand. Do you think the convenience we sacrifice here is worth the
> > > > simplification benefit?
> > >
> > > Either way, you need a reference to the consumer. I was mostly just
> > > thinking it would be better to reduce the integration point to its
> > > minimum. Have you thought through the implications of needing to keep
> > > around a reference to the consumer in the producer? What if it gets
> > > closed? It seems better not to have to think about these cases.
> > >
> > > -Jason
> > >
> > > On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen <reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you for the suggestions Jason. And a side note for Guozhang, I
> > > > updated the KIP to reflect the dependency on 447.
> > > >
> > > > On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Boyang, thanks for the updates. I have a few more comments:
> > > > >
> > > > > 1. We are adding some new fields to TxnOffsetCommit to support
> > > > > group-based fencing. Do we need these fields to be persisted in the
> > > > > offsets topic to ensure that the fencing still works after a
> > > > > coordinator failover?
> > > >
> > > > We already persist member.id, instance.id and generation.id in the
> > > > offset topic, what extra fields do we need to store?
> > > >
> > > > > 2. Since you are proposing a new `groupMetadata` API, have you
> > > > > considered whether we still need the `initTransactions` overload?
> > > > > Another way would be to pass it through the
> > > > > `sendOffsetsToTransaction` API:
> > > > >
> > > > > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > > > > offsets, GroupMetadata groupMetadata) throws
> > > > > ProducerFencedException, IllegalGenerationException;
> > > > >
> > > > > This seems a little more consistent with the current API and avoids
> > > > > the direct dependence on the Consumer in the producer.
> > > >
> > > > Note that although we avoid one dependency on the consumer, the
> > > > producer needs to periodically update its group metadata, or in this
> > > > case the caller of
> > > > sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > > > offsets, GroupMetadata groupMetadata) is responsible for getting the
> > > > latest value of group metadata.
> > > > This should be easily done on the stream side as we have
> > > > StreamsPartitionAssignor to reflect metadata changes upon
> > > > #onAssignment(), but non-stream users have to code the callback by
> > > > hand. Do you think the convenience we sacrifice here is worth the
> > > > simplification benefit?
> > > >
> > > > > 3. Can you clarify the behavior of the clients when the brokers do
> > > > > not support the latest API versions? This is both for the new
> > > > > TxnOffsetCommit and the OffsetFetch APIs. I guess the high level
> > > > > idea in streams is to detect broker support before instantiating the
> > > > > producer and consumer. I think that's reasonable, but we might need
> > > > > some approach for non-streams use cases. One option I was
> > > > > considering is enforcing the latest version through the new
> > > > > `sendOffsetsToTransaction` API. Basically when you use the new API,
> > > > > we require support for the latest TxnOffsetCommit version. This puts
> > > > > some burden on users, but it avoids breaking correctness assumptions
> > > > > when the new APIs are in use. What do you think?
> > > >
> > > > Yes, I think we haven't covered this case, so the plan is to crash the
> > > > non-stream application when the job is using the new sendOffsets API.
> > > > >
> > > > > -Jason
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Yep, Guozhang I think that would be best as passing in an entire
> > > > > > consumer instance is indeed cumbersome.
> > > > > >
> > > > > > Just saw you updated KIP-429, I will follow up to change 447 as
> > > > > > well.
> > > > > >
> > > > > > Best,
> > > > > > Boyang
> > > > > >
> > > > > > On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang <wangg...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > okay I think I understand your concerns about
> > > > > > > ConsumerGroupMetadata now: if we still want to only call initTxns
> > > > > > > once, then we should allow whatever parameter is passed in to
> > > > > > > reflect the latest value of generation id whenever sending the
> > > > > > > offset fetch request.
> > > > > > >
> > > > > > > Whereas the current ConsumerGroupMetadata is a static object.
> > > > > > >
> > > > > > > Maybe we can consider having an extended class of
> > > > > > > ConsumerGroupMetadata whose values are updated from the
> > > > > > > consumer's rebalance callback?
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thank you Guozhang for the reply! I'm curious whether KIP-429
> > > > > > > > has reflected the latest change on ConsumerGroupMetadata? Also
> > > > > > > > regarding question one, the group metadata needs to be accessed
> > > > > > > > via callback, does that mean we need a separate producer API
> > > > > > > > such as "producer.refreshMetadata(groupMetadata)" to be able to
> > > > > > > > access it instead of passing in the consumer instance?
> > > > > > > >
> > > > > > > > Boyang
> > > > > > > >
> > > > > > > > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang <wangg...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Boyang,
> > > > > > > > >
> > > > > > > > > I've made another pass on KIP-447 as well as
> > > > > > > > > https://github.com/apache/kafka/pull/7078, and have some minor
> > > > > > > > > comments about the proposed API:
> > > > > > > > >
> > > > > > > > > 1. it seems instead of needing the whole KafkaConsumer object,
> > > > > > > > > you'd only need the "ConsumerGroupMetadata", in that case can
> > > > > > > > > we just pass that object into the initTxns call?
> > > > > > > > >
> > > > > > > > > 2. the current trunk already has a public class named
> > > > > > > > > (ConsumerGroupMetadata) under o.a.k.clients.consumer created
> > > > > > > > > by KIP-429. If we want to just use that then maybe it makes
> > > > > > > > > less sense to declare a base GroupMetadata as we are already
> > > > > > > > > leaking such information on the assignor anyways.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thank you Guozhang for the reply. We will consider the
> > > > > > > > > > interface change from 429 as a backup plan for 447.
> > > > > > > > > >
> > > > > > > > > > And bumping this thread for more discussion.
> > > > > > > > > >
> > > > > > > > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang <wangg...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thank you Guozhang for the suggestion! I would normally
> > > > > > > > > > > > prefer naming a flag corresponding to its functionality.
> > > > > > > > > > > > Seems to me `isolation_level` adds one more hop of
> > > > > > > > > > > > indirection.
> > > > > > > > > > >
> > > > > > > > > > > Fair enough, let's use a separate flag name then :)
> > > > > > > > > > >
> > > > > > > > > > > > As for the generation.id exposure, I'm fine leveraging
> > > > > > > > > > > > the new API from 429, but is that design finalized yet,
> > > > > > > > > > > > and will the API be added on the generic Consumer<K, V>
> > > > > > > > > > > > interface?
> > > > > > > > > > >
> > > > > > > > > > > The current PartitionAssignor is inside the `internals`
> > > > > > > > > > > package, and in KIP-429 we are going to create a new
> > > > > > > > > > > interface out of `internals` to really make it a public
> > > > > > > > > > > API, and as part of that we are refactoring some of its
> > > > > > > > > > > method signatures. I just feel some of the newly introduced
> > > > > > > > > > > classes can be reused in your KIP as well, i.e. just for
> > > > > > > > > > > code succinctness, with no semantic implications.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > Boyang
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <wangg...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Boyang, thanks for the updated proposal!
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3.a. As Jason mentioned, with EOS enabled we still need
> > > > > > > > > > > > > to augment the offset fetch request with a boolean to
> > > > > > > > > > > > > indicate "give me a retriable error code if there's a
> > > > > > > > > > > > > pending offset, rather than sending me the committed
> > > > > > > > > > > > > offset immediately". Personally I still feel it is okay
> > > > > > > > > > > > > to piggy-back on the ISOLATION_LEVEL boolean, but I'm
> > > > > > > > > > > > > also fine with another `await_transaction` boolean if
> > > > > > > > > > > > > you feel strongly about it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 10. About the exposure of generation id, there may be
> > > > > > > > > > > > > some refactoring work coming from KIP-429 that can
> > > > > > > > > > > > > benefit KIP-447 as well since we are wrapping the
> > > > > > > > > > > > > consumer subscription / assignment data in new classes.
> > > > > > > > > > > > > Note that the current proposal does not include
> > > > > > > > > > > > > `generationId` since with the cooperative sticky
> > > > > > > > > > > > > assignor we think it is not necessary for correctness,
> > > > > > > > > > > > > but if we agree it is okay to expose it we can
> > > > > > > > > > > > > potentially include it in `ConsumerAssignmentData` as
> > > > > > > > > > > > > well.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thank you Jason for the ideas.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Boyang,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the updates. A few comments below:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. The KIP mentions that `transaction.timeout.ms`
> > > > > > > > > > > > > > > should be reduced to 10s. I think this makes sense
> > > > > > > > > > > > > > > for Kafka Streams, which is tied to the consumer
> > > > > > > > > > > > > > > group semantics and uses a default 10s session
> > > > > > > > > > > > > > > timeout. However, it seems a bit dangerous to make
> > > > > > > > > > > > > > > this change for the producer generally. Could we
> > > > > > > > > > > > > > > just change it for streams?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > That sounds good to me.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2. The new `initTransactions` API takes a `Consumer`
> > > > > > > > > > > > > > > instance. I think the idea is to basically put in a
> > > > > > > > > > > > > > > backdoor to give the producer access to the group
> > > > > > > > > > > > > > > generationId. It's not clear to me how this would
> > > > > > > > > > > > > > > work given package restrictions. I wonder if it
> > > > > > > > > > > > > > > would be better to just expose the state we need
> > > > > > > > > > > > > > > from the consumer. I know we have been reluctant to
> > > > > > > > > > > > > > > do this so far because we treat the generationId as
> > > > > > > > > > > > > > > an implementation detail. However, I think we might
> > > > > > > > > > > > > > > just bite the bullet and expose it rather than
> > > > > > > > > > > > > > > coming up with a messy hack. Concepts such as
> > > > > > > > > > > > > > > memberIds have already been exposed in the
> > > > > > > > > > > > > > > AdminClient, so maybe it is not too bad.
> > > > > > > > > > > > > > > Alternatively, we could use an opaque type. For
> > > > > > > > > > > > > > > example:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > // public
> > > > > > > > > > > > > > > interface GroupMetadata {}
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > // private
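> > > > > > > > > > > > > > > class ConsumerGroupMetadata implements GroupMetadata {
> > > > > > > > > > > > > > >   final int generationId;
> > > > > > > > > > > > > > >   final String memberId;
> > > > > > > > > > > > > > >   ConsumerGroupMetadata(int generationId, String memberId) {
> > > > > > > > > > > > > > >     this.generationId = generationId;
> > > > > > > > > > > > > > >     this.memberId = memberId;
> > > > > > > > > > > > > > >   }
> > > > > > > > > > > > > > > }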
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > // Consumer API
> > > > > > > > > > > > > > > public GroupMetadata groupMetadata();
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I am probably leaning toward just exposing the state
> > > > > > > > > > > > > > > we need.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Yes, also to mention that Kafka Streams uses the
> > > > > > > > > > > > > > generic Consumer API, which doesn't have rich state
> > > > > > > > > > > > > > like a full `KafkaConsumer`. The hack will not work as
> > > > > > > > > > > > > > expected.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Instead, just exposing the consumer generation.id
> > > > > > > > > > > > > > seems way easier. We could consolidate the API and
> > > > > > > > > > > > > > make it
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 3. Given that we are already providing a way to
> > > > > > > > > > > > > > > propagate group state from the consumer to the
> > > > > > > > > > > > > > > producer, I wonder if we may as well include the
> > > > > > > > > > > > > > > memberId and groupInstanceId. This would make the
> > > > > > > > > > > > > > > validation we do for TxnOffsetCommit consistent with
> > > > > > > > > > > > > > > OffsetCommit. If for no other benefit, at least this
> > > > > > > > > > > > > > > may help with debugging.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Yes, we could put them into the GroupMetadata struct.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 4. I like the addition of isolation_level to the
> > > > > > > > > > > > > > > offset fetch. At the same time, its behavior is a
> > > > > > > > > > > > > > > bit inconsistent with how it is used in the consumer
> > > > > > > > > > > > > > > generally. There is no reason for the group
> > > > > > > > > > > > > > > coordinator to ever expose aborted data, so this is
> > > > > > > > > > > > > > > mostly about awaiting pending offset commits, not
> > > > > > > > > > > > > > > reading uncommitted data. Perhaps instead of calling
> > > > > > > > > > > > > > > this "isolation level," it should be more like
> > > > > > > > > > > > > > > "await_pending_transaction" or something like that?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Also, just to be clear, the consumer would treat
> > > > > > > > > > > > > > > this as an optional field, right? So if the broker
> > > > > > > > > > > > > > > does not support the latest OffsetFetch API, it
> > > > > > > > > > > > > > > would silently revert to reading the old data.
> > > > > > > > > > > > > > > Basically it would be up to the streams version
> > > > > > > > > > > > > > > probing logic to ensure that the expectation on this
> > > > > > > > > > > > > > > API fits with the usage of `transactional.id`.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Sounds like a better naming to me, while I think it
> > > > > > > > > > > > > > could be shortened to `await_transaction`.
> > > > > > > > > > > > > > I think the field should be optional, too.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Jason
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hey Guozhang,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I will correct my statement from last email. I
> > > > > > > > > > > > > > > > don't think the read_committed (3.a) is necessary
> > > > > > > > > > > > > > > > to be added to the OffsetFetch request, as if we
> > > > > > > > > > > > > > > > are using an EOS application, the underlying
> > > > > > > > > > > > > > > > consumers within the group should always back off
> > > > > > > > > > > > > > > > when there are pending offsets.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Let me know if you think this is correct.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thank you Guozhang for the questions, inline
> > > > > > > > > > > > > > > > > answers are below.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >> Hey all,
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> I have done a fundamental polish of KIP-447
> > > > > > > > > > > > > > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> > > > > > > > > > > > > > > > >> and written a design doc
> > > > > > > > > > > > > > > > >> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> > > > > > > > > > > > > > > > >> depicting internal changes. We stripped off
> > > > > > > > > > > > > > > > >> many implementation details from the KIP, and
> > > > > > > > > > > > > > > > >> simplified the public changes by a lot. For
> > > > > > > > > > > > > > > > >> reviewers, it is highly recommended to fully
> > > > > > > > > > > > > > > > >> understand EOS design in KIP-98 and read its
> > > > > > > > > > > > > > > > >> corresponding design doc
> > > > > > > > > > > > > > > > >> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> > > > > > > > > > > > > > > > >> if you haven't done so already.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> Let me know if you found anything confusing
> > > > > > > > > > > > > > > > >> around the KIP or the design.
> > > > > > > > > > > > > > > > >> Would be happy to discuss in depth.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> Best,
> > > > > > > > > > > > > > > > >> Boyang
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>> 2. The reason we did not expose generation.id
> > > > > > > > > > > > > > > > >>> from KafkaConsumer public APIs directly is to
> > > > > > > > > > > > > > > > >>> abstract this notion from users (since it is
> > > > > > > > > > > > > > > > >>> an implementation detail of the rebalance
> > > > > > > > > > > > > > > > >>> protocol itself, e.g. if user calls
> > > > > > > > > > > > > > > > >>> consumer.assign() they do not need to invoke
> > > > > > > > > > > > > > > > >>> ConsumerCoordinator and no need to be aware of
> > > > > > > > > > > > > > > > >>> generation.id at all).
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > >>> On the other hand, with the current proposal
> > > > > > > > > > > > > > > > >>> the txn.coordinator did not know about the
> > > > > > > > > > > > > > > > >>> latest generation from the source-of-truth
> > > > > > > > > > > > > > > > >>> group.coordinator; instead, it will bump up
> > > > > > > > > > > > > > > > >>> the generation from the producer's
> > > > > > > > > > > > > > > > >>> InitProducerIdRequest only.
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > >>> The key here is that GroupCoordinator,
> when
> > > > > > handling
> > > > > > > > > > > > > > > > >>> `InitProducerIdRequest
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > >> In the new design, we just pass the entire
> > > > > consumer
> > > > > > > > > instance
> > > > > > > > > > > > into
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > producer through
> > > > > > > > > > > > > > > > > #initTransaction, so no public API will be
> > > > created.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >> 3. I agree that if we rely on the group
> > > > > coordinator
> > > > > > to
> > > > > > > > > block
> > > > > > > > > > > on
> > > > > > > > > > > > > > > > returning
> > > > > > > > > > > > > > > > >>> offset-fetch-response if read-committed
> is
> > > > > enabled,
> > > > > > > > then
> > > > > > > > > we
> > > > > > > > > > > do
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > > need
> > > > > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > > > > >>> store partition assignment on txn
> > coordinator
> > > > and
> > > > > > > > > therefore
> > > > > > > > > > > > it's
> > > > > > > > > > > > > > > better
> > > > > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > > > > >>> still decouple them. For that case we
> still
> > > > need
> > > > > to
> > > > > > > > > update
> > > > > > > > > > > the
> > > > > > > > > > > > > KIP
> > > > > > > > > > > > > > > wiki
> > > > > > > > > > > > > > > > >>> page that includes:
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > >>> 3.a. Augment OffsetFetchRequest with the
> > > > > > > > ISOLATION_LEVEL
> > > > > > > > > as
> > > > > > > > > > > > well.
> > > > > > > > > > > > > > > > >>> 3.b. Add new error code in
> > > OffsetFetchResponse
> > > > to
> > > > > > let
> > > > > > > > > > client
> > > > > > > > > > > > > > backoff
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > >>> retry if there are pending txns including
> > the
> > > > > > > > interested
> > > > > > > > > > > > > > partitions.
> > > > > > > > > > > > > > > > >>> 3.c. Also in the worst case we would let
> > the
> > > > > client
> > > > > > > be
> > > > > > > > > > > blocked
> > > > > > > > > > > > > for
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > >>> txn.timeout period, and for that
> rationale
> > we
> > > > may
> > > > > > > need
> > > > > > > > to
> > > > > > > > > > > > > consider
> > > > > > > > > > > > > > > > >>> reducing
> > > > > > > > > > > > > > > > >>> our default txn.timeout value as well.
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > >>> Addressed 3.b and 3.c, will do 3.a.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >>> 4. According to Colin it seems we do not need to create another KIP
> > > > > > > > > > > > > > > > > >>> and we can just complete it as part of KIP-117 / KAFKA-5214; and we
> > > > > > > > > > > > > > > > > >>> need to do some cleanup to have BrokerApiVersion exposed from
> > > > > > > > > > > > > > > > > >>> AdminClient (@Colin please let us know if you have any concerns
> > > > > > > > > > > > > > > > > >>> exposing it).
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > > > I think we no longer need to rely on api version for initialization,
> > > > > > > > > > > > > > > > > > since we will be using the upgrade.from config anyway.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>> Guozhang
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > > > > > > >>> wrote:
> > > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > > > >>> > For reference, we have BrokerApiVersionCommand already as a public
> > > > > > > > > > > > > > > > > >>> > interface. We have a bit of tech debt at the moment because it uses a
> > > > > > > > > > > > > > > > > >>> > custom AdminClient. It would be nice to clean that up. In general, I
> > > > > > > > > > > > > > > > > >>> > think it is reasonable to expose from AdminClient. It can be used by
> > > > > > > > > > > > > > > > > >>> > management tools to inspect running Kafka versions for example.
> > > > > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > > > > >>> > -Jason
> > > > > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > > > > > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > > > >>> > wrote:
> > > > > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > > > > > >>> > > Thank you for the context Colin. The groupId was indeed a copy-paste
> > > > > > > > > > > > > > > > > >>> > > error. Our use case here for 447 is (Quoted from Guozhang):
> > > > > > > > > > > > > > > > > >>> > > '''
> > > > > > > > > > > > > > > > > >>> > > I think if we can do something else to avoid this config though, for
> > > > > > > > > > > > > > > > > >>> > > example we can use the embedded AdminClient to send the APIVersion
> > > > > > > > > > > > > > > > > >>> > > request upon starting up, and based on the returned value decides
> > > > > > > > > > > > > > > > > >>> > > whether to go to the old code path or the new behavior.
> > > > > > > > > > > > > > > > > >>> > > '''
> > > > > > > > > > > > > > > > > >>> > > The benefit we get is to avoid adding a new configuration to make a
> > > > > > > > > > > > > > > > > >>> > > decision simply based on broker version. If you have concerns with
> > > > > > > > > > > > > > > > > >>> > > exposing ApiVersion for client, we could try to think of alternative
> > > > > > > > > > > > > > > > > >>> > > solutions too.
> > > > > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > > > >>> > > Boyang
> > > > > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > > > > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org>
> > > > > > > > > > > > > > > > > >>> > > wrote:
> > > > > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > > > > >>> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > > > > > > > > > > > > > > > > >>> > > > through AdminClient.  That class is not even accessible without having
> > > > > > > > > > > > > > > > > >>> > > > the broker jars on your CLASSPATH.
> > > > > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > > > > >>> > > > Another question is, what is the groupId parameter doing in the call?
> > > > > > > > > > > > > > > > > >>> > > > The API versions are the same no matter what consumer group we use,
> > > > > > > > > > > > > > > > > >>> > > > right?  Perhaps this was a copy and paste error?
> > > > > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > > > > >>> > > > This is not the first time we have discussed having a method in
> > > > > > > > > > > > > > > > > >>> > > > AdminClient to retrieve API version information.  In fact, the
> > > > > > > > > > > > > > > > > >>> > > > original KIP which created KafkaAdminClient specified an API for
> > > > > > > > > > > > > > > > > >>> > > > fetching version information.  It was called apiVersions and it is
> > > > > > > > > > > > > > > > > >>> > > > still there on the wiki.
> > > > > > > > > > > > > > > > > >>> > > > See
> > > > > > > > > > > > > > > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > > > > >>> > > > However, this API wasn't ready in time for 0.11.0 so we shipped
> > > > > > > > > > > > > > > > > >>> > > > without it.  There was a JIRA to implement it for later versions,
> > > > > > > > > > > > > > > > > >>> > > > https://issues.apache.org/jira/browse/KAFKA-5214, as well as a PR,
> > > > > > > > > > > > > > > > > >>> > > > https://github.com/apache/kafka/pull/3012.  However, we started to
> > > > > > > > > > > > > > > > > >>> > > > rethink whether this AdminClient function was even necessary.  Most of
> > > > > > > > > > > > > > > > > >>> > > > the use-cases we could think of seemed like horrible hacks.  So it has
> > > > > > > > > > > > > > > > > >>> > > > never really been implemented (yet?).
> > > > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > > > >>> > > > best,
> > > > > > > > > > > > > > > > >>> > > > Colin
> > > > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > > > > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > > > > > > > > > > > > > > > >>> > > > > Actually, after a second thought, I think it actually makes sense to
> > > > > > > > > > > > > > > > > >>> > > > > support auto upgrade through admin client to help us get api version
> > > > > > > > > > > > > > > > > >>> > > > > from broker.
> > > > > > > > > > > > > > > > > >>> > > > > A draft KIP is here:
> > > > > > > > > > > > > > > > > >>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > > > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > > > > >>> > > > > Boyang
> > > > > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > > > > > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > > > >>> > > > > wrote:
> > > > > > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > > > > > >>> > > > > > Thank you Guozhang, some of my understandings are inline below.
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > > > > > > >>> > > > > > wrote:
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > I think co-locating does have some merits here, i.e. letting the
> > > > > > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the source-of-truth of assignment to act
> > > > > > > > > > > > > > > > > >>> > > > > >> > as the TxnCoordinator as well; but I agree there's also some cons of
> > > > > > > > > > > > > > > > > >>> > > > > >> > coupling them together. I'm still a bit inclining towards colocation
> > > > > > > > > > > > > > > > > >>> > > > > >> > but if there are good rationales not to do so I can be convinced as
> > > > > > > > > > > > > > > > > >>> > > > > >> > well.
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > >> The good rationale is that we have no mechanism to colocate
> > > > > > > > > > > > > > > > > >>> > > > > >> partitions ;). Are you suggesting we store the group and transaction
> > > > > > > > > > > > > > > > > >>> > > > > >> state in the same log? Can you be more concrete about the benefit?
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > >>> > > > > >> -Jason
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > > > > > > > > > > > > >>> > > > > >> wrote:
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > >>> > > > > >> > Hi Boyang,
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > 1. One advantage of retry against on-hold is that it will not tie-up a
> > > > > > > > > > > > > > > > > >>> > > > > >> > handler thread (of course the latter could do the same but that
> > > > > > > > > > > > > > > > > >>> > > > > >> > involves using a purgatory which is more complicated), and also it is
> > > > > > > > > > > > > > > > > >>> > > > > >> > less likely to violate request timeout. So I think there are some
> > > > > > > > > > > > > > > > > >>> > > > > >> > rationales to prefer retries.
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > > That sounds fair to me, also we are avoiding usage of another
> > > > > > > > > > > > > > > > > >>> > > > > > purgatory instance. Usually for one back-off we are only delaying 50ms
> > > > > > > > > > > > > > > > > >>> > > > > > during startup which is trivial cost. This behavior shouldn't be
> > > > > > > > > > > > > > > > > >>> > > > > > changed.
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > 2. Regarding "ConsumerRebalanceListener": both
> > > > > > > > > > > > > > > > > >>> > > > > >> > ConsumerRebalanceListener and PartitionAssignors are user-customizable
> > > > > > > > > > > > > > > > > >>> > > > > >> > modules, and the only difference is that the former is specified via
> > > > > > > > > > > > > > > > > >>> > > > > >> > code and the latter is specified via config.
> > > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> > > > > > > > > > > > > > > > > >>> > > > > >> > though: with KIP-429 the onPartitionsAssigned may not be called if the
> > > > > > > > > > > > > > > > > >>> > > > > >> > assignment does not change, whereas onAssignment would always be
> > > > > > > > > > > > > > > > > >>> > > > > >> > called at the end of sync-group response. My proposed semantics is
> > > > > > > > > > > > > > > > > >>> > > > > >> > that `RebalanceListener#onPartitionsXXX` are used for notifications to
> > > > > > > > > > > > > > > > > >>> > > > > >> > user, and hence if there's no changes these will not be called,
> > > > > > > > > > > > > > > > > >>> > > > > >> > whereas `PartitionAssignor` is used for assignor logic, whose callback
> > > > > > > > > > > > > > > > > >>> > > > > >> > would always be called no matter if the partitions have changed or
> > > > > > > > > > > > > > > > > >>> > > > > >> > not.
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > > I think a third option is to gracefully expose generation id as part
> > > > > > > > > > > > > > > > > >>> > > > > > of consumer API, so that we don't need to bother overloading various
> > > > > > > > > > > > > > > > > >>> > > > > > callbacks. Of course, this builds upon the assumption that topic
> > > > > > > > > > > > > > > > > >>> > > > > > partitions will not be included in new initTransaction API.
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > 3. I feel it is a bit awkward to let the TxnCoordinator keep partition
> > > > > > > > > > > > > > > > > >>> > > > > >> > assignments since it is sort of taking over the job of the
> > > > > > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator, and may likely cause a split-brain problem as two
> > > > > > > > > > > > > > > > > >>> > > > > >> > coordinators keep a copy of this assignment which may be different.
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > I think co-locating does have some merits here, i.e. letting the
> > > > > > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the source-of-truth of assignment to act
> > > > > > > > > > > > > > > > > >>> > > > > >> > as the TxnCoordinator as well; but I agree there's also some cons of
> > > > > > > > > > > > > > > > > >>> > > > > >> > coupling them together. I'm still a bit inclining towards colocation
> > > > > > > > > > > > > > > > > >>> > > > > >> > but if there are good rationales not to do so I can be convinced as
> > > > > > > > > > > > > > > > > >>> > > > > >> > well.
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > > The purpose of co-location is to let txn coordinator see the group
> > > > > > > > > > > > > > > > > >>> > > > > > assignment. This priority is weakened when we already have defense on
> > > > > > > > > > > > > > > > > >>> > > > > > the consumer offset fetch, so I guess it's not super important
> > > > > > > > > > > > > > > > > >>> > > > > > anymore.
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > 4. I guess I'm preferring the philosophy of "only add configs if
> > > > > > > > > > > > > > > > > >>> > > > > >> > there's no other ways", since more and more configs would make it less
> > > > > > > > > > > > > > > > > >>> > > > > >> > and less intuitive out of the box to use.
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > I think it's a valid point that checks upon starting up do not cope
> > > > > > > > > > > > > > > > > >>> > > > > >> > with brokers downgrading, but even with a config it is still hard for
> > > > > > > > > > > > > > > > > >>> > > > > >> > users to determine when they can be ensured the broker would never
> > > > > > > > > > > > > > > > > >>> > > > > >> > downgrade anymore and hence can safely switch the config. So my
> > > > > > > > > > > > > > > > > >>> > > > > >> > feeling is that this config would not be helping too much still. If we
> > > > > > > > > > > > > > > > > >>> > > > > >> > want to be at the safer side, then I'd suggest we modify the
> > > > > > > > > > > > > > > > > >>> > > > > >> > Coordinator -> NetworkClient hierarchy to allow the NetworkClient
> > > > > > > > > > > > > > > > > >>> > > > > >> > being able to pass the APIVersion metadata to Coordinator, so that
> > > > > > > > > > > > > > > > > >>> > > > > >> > Coordinator can rely on that logic to change its behavior dynamically.
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > > The stream thread init could not be supported by a client coordinator
> > > > > > > > > > > > > > > > > >>> > > > > > behavior change on the fly; we are only losing possibilities after we
> > > > > > > > > > > > > > > > > >>> > > > > > initialized (the main thread exits and no thread has the global
> > > > > > > > > > > > > > > > > >>> > > > > > picture anymore). If we do want to support auto version detection,
> > > > > > > > > > > > > > > > > >>> > > > > > admin client request in this sense shall be easier.
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > 5. I do not have a concrete idea about what the impact on Connect
> > > > > > > > > > > > > > > > > >>> > > > > >> > would be, maybe Randall or Konstantine can help here?
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > > > > > >>> > > > > > Sounds good, let's see their thoughts.
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > Guozhang
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > > > >>> > > > > >> > wrote:
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > >>> > > > > >> > > Hey Jason,
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > thank you for the proposal here. Some of my thoughts below.
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > > > > > > >>> > > > > >> > > wrote:
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > Hi Boyang,
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > Thanks for picking this up! Still reading through the updates, but
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > here are a couple initial comments on the APIs:
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > trying to encapsulate state from the current group assignment. Maybe
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > something like `ConsumerAssignment` would be clearer? If we make the
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > usage consistent across the consumer and producer, then we can avoid
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > exposing internal state like the generationId.
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > For example:
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > // Public API
> > > > > > > > > > > > > > > > >>> > > > > >> > > > interface ConsumerAssignment {
> > > > > > > > > > > > > > > > >>> > > > > >> > > >   Set<TopicPartition> partitions();
> > > > > > > > > > > > > > > > >>> > > > > >> > > > }
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > // Not a public API
> > > > > > > > > > > > > > > > >>> > > > > >> > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > > > > > > > > > > > > > > >>> > > > > >> > > >   Set<TopicPartition> partitions;
> > > > > > > > > > > > > > > > >>> > > > > >> > > >   int generationId;
> > > > > > > > > > > > > > > > >>> > > > > >> > > > }
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > Then we can change the
> > > > rebalance
> > > > > > > > > listener
> > > > > > > > > > to
> > > > > > > > > > > > > > > something
> > > > > > > > > > > > > > > > >>> like
> > > > > > > > > > > > > > > > >>> > > > this:
> > > > > > > > > > > > > > > > >>> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > And on the producer:
> > > > > > > > > > > > > > > > >>> > > > > >> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > 2. Another bit of
> > > awkwardness
> > > > is
> > > > > > the
> > > > > > > > > fact
> > > > > > > > > > > that
> > > > > > > > > > > > > we
> > > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > > > > >>> > pass
> > > > > > > > > > > > > > > > >>> > > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > groupId
> > > > > > > > > > > > > > > > >>> > > > > >> > > > through both
> > > > initTransactions()
> > > > > > and
> > > > > > > > > > > > > > > > >>> > > sendOffsetsToTransaction().
> > > > > > > > > > > > > > > > >>> > > > We
> > > > > > > > > > > > > > > > >>> > > > > >> > could
> > > > > > > > > > > > > > > > >>> > > > > >> > > > consider a config
> instead.
> > > > Maybe
> > > > > > > > > something
> > > > > > > > > > > > like
> > > > > > > > > > > > > `
> > > > > > > > > > > > > > > > >>> > > > > >> > transactional.group.id
> > > > > > > > > > > > > > > > >>> > > > > >> > > `?
> > > > > > > > > > > > > > > > >>> > > > > >> > > > Then we could simplify
> the
> > > > > > producer
> > > > > > > > > APIs,
> > > > > > > > > > > > > > > potentially
> > > > > > > > > > > > > > > > >>> even
> > > > > > > > > > > > > > > > >>> > > > > >> deprecating
> > > > > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > current
> > > > > sendOffsetsToTransaction.
> > > > > > In
> > > > > > > > > fact,
> > > > > > > > > > > for
> > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > new
> > > > > > > > > > > > > > > > >>> > usage,
> > > > > > > > > > > > > > > > >>> > > > the `
> > > > > > > > > > > > > > > > >>> > > > > >> > > > transactional.id` config
> > is
> > > > not
> > > > > > > > needed.
> > > > > > > > > It
> > > > > > > > > > > > would
> > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > >>> nice if
> > > > > > > > > > > > > > > > >>> > we
> > > > > > > > > > > > > > > > >>> > > > don't
> > > > > > > > > > > > > > > > >>> > > > > >> > have
> > > > > > > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > > > > > > >>> > > > > >> > > > provide it.
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > I like the idea of
> > > combining 1
> > > > > and
> > > > > > > 2.
> > > > > > > > We
> > > > > > > > > > > could
> > > > > > > > > > > > > > > > >>> definitely
> > > > > > > > > > > > > > > > >>> > pass
> > > > > > > > > > > > > > > > >>> > > > in a
> > > > > > > > > > > > > > > > >>> > > > > >> > > group.id config
> > > > > > > > > > > > > > > > >>> > > > > >> > > so that we could avoid
> > > exposing
> > > > > that
> > > > > > > > > > > information
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > >>> public
> > > > > > > > > > > > > > > > >>> > > API.
> > > > > > > > > > > > > > > > >>> > > > The
> > > > > > > > > > > > > > > > >>> > > > > >> > > question I have
> > > > > > > > > > > > > > > > >>> > > > > >> > > is whether we should
> > name
> > > > the
> > > > > > > > > interface
> > > > > > > > > > > > > > > > >>> `GroupAssignment`
> > > > > > > > > > > > > > > > >>> > > > > >> instead,
> > > > > > > > > > > > > > > > >>> > > > > >> > so
> > > > > > > > > > > > > > > > >>> > > > > >> > > that Connect later
> > > > > > > > > > > > > > > > >>> > > > > >> > > could also extend on the
> > same
> > > > > > > interface,
> > > > > > > > > > just
> > > > > > > > > > > to
> > > > > > > > > > > > > > echo
> > > > > > > > > > > > > > > > >>> > Guozhang's
> > > > > > > > > > > > > > > > >>> > > > point
> > > > > > > > > > > > > > > > >>> > > > > >> > > here. Also, the base
> > interface
> > > > > > > > > > > > > > > > >>> > > > > >> > > is better to be defined
> > empty
> > > > for
> > > > > > easy
> > > > > > > > > > > > extension,
> > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > >>> define an
> > > > > > > > > > > > > > > > >>> > > > > >> abstract
> > > > > > > > > > > > > > > > >>> > > > > >> > > type called `Resource` to
> be
> > > > > > shareable
> > > > > > > > > > > > > > > > >>> > > > > >> > > later IMHO.
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > By the way, I'm a bit
> > > confused
> > > > > > about
> > > > > > > > > > > > discussion
> > > > > > > > > > > > > > > above
> > > > > > > > > > > > > > > > >>> about
> > > > > > > > > > > > > > > > >>> > > > > >> colocating
> > > > > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > txn and group
> > coordinators.
> > > > That
> > > > > > is
> > > > > > > > not
> > > > > > > > > > > > actually
> > > > > > > > > > > > > > > > >>> necessary,
> > > > > > > > > > > > > > > > >>> > is
> > > > > > > > > > > > > > > > >>> > > > it?
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > Yes, this is not a
> > > requirement
> > > > > for
> > > > > > > > this
> > > > > > > > > > KIP,
> > > > > > > > > > > > > > because
> > > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > >>> is
> > > > > > > > > > > > > > > > >>> > > > > >> inherently
> > > > > > > > > > > > > > > > >>> > > > > >> > > impossible to
> > > > > > > > > > > > > > > > >>> > > > > >> > > achieve co-locating topic
> > > > > partition
> > > > > > > of
> > > > > > > > > > > > > transaction
> > > > > > > > > > > > > > > log
> > > > > > > > > > > > > > > > >>> and
> > > > > > > > > > > > > > > > >>> > > > consumed
> > > > > > > > > > > > > > > > >>> > > > > >> > offset
> > > > > > > > > > > > > > > > >>> > > > > >> > > topics.
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > Thanks,
> > > > > > > > > > > > > > > > >>> > > > > >> > > > Jason
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at
> > 10:07
> > > AM
> > > > > > > Boyang
> > > > > > > > > > Chen <
> > > > > > > > > > > > > > > > >>> > > > > >> reluctanthero...@gmail.com
> > > > > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > Thank you Ismael for
> the
> > > > > > > suggestion.
> > > > > > > > > We
> > > > > > > > > > > will
> > > > > > > > > > > > > > > attempt
> > > > > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > > > > >>> > > > address
> > > > > > > > > > > > > > > > >>> > > > > >> it by
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > giving more details to
> > > > > rejected
> > > > > > > > > > > alternative
> > > > > > > > > > > > > > > section.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > Thank you for the
> > comment
> > > > > > > Guozhang!
> > > > > > > > > > > Answers
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > inline
> > > > > > > > > > > > > > > > >>> > > below.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > On Sun, Jun 23, 2019
> at
> > > 6:33
> > > > > PM
> > > > > > > > > Guozhang
> > > > > > > > > > > > Wang
> > > > > > > > > > > > > <
> > > > > > > > > > > > > > > > >>> > > > wangg...@gmail.com
> > > > > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > Hello Boyang,
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > Thanks for the KIP,
> I
> > > have
> > > > > > some
> > > > > > > > > > comments
> > > > > > > > > > > > > > below:
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > 1. "Once
> transactions
> > > are
> > > > > > > > complete,
> > > > > > > > > > the
> > > > > > > > > > > > call
> > > > > > > > > > > > > > > will
> > > > > > > > > > > > > > > > >>> > return."
> > > > > > > > > > > > > > > > >>> > > > This
> > > > > > > > > > > > > > > > >>> > > > > >> > seems
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > different from the
> > > > existing
> > > > > > > > > behavior,
> > > > > > > > > > in
> > > > > > > > > > > > > which
> > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > >>> would
> > > > > > > > > > > > > > > > >>> > > > return a
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > retriable
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > CONCURRENT_TRANSACTIONS
> > > > and
> > > > > > let
> > > > > > > > the
> > > > > > > > > > > client
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > >>> retry, is
> > > > > > > > > > > > > > > > >>> > > this
> > > > > > > > > > > > > > > > >>> > > > > >> > > > intentional?
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > I don’t think it is
> > > > > intentional,
> > > > > > > > and I
> > > > > > > > > > > will
> > > > > > > > > > > > > > defer
> > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > >>> > > > question to
> > > > > > > > > > > > > > > > >>> > > > > >> > > Jason
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > when he gets time to
> > answer
> > > > > since
> > > > > > > > from
> > > > > > > > > > > what I
> > > > > > > > > > > > > > > > >>> understood
> > > > > > > > > > > > > > > > >>> > > retry
> > > > > > > > > > > > > > > > >>> > > > and
> > > > > > > > > > > > > > > > >>> > > > > >> on
> > > > > > > > > > > > > > > > >>> > > > > >> > > hold
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > seem both valid
> > > approaches.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > 2. "an overload to
> > > > > > > > > > onPartitionsAssigned
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > >>> > consumer's
> > > > > > > > > > > > > > > > >>> > > > > >> rebalance
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > listener interface":
> > as
> > > > part
> > > > > > of
> > > > > > > > > > KIP-341
> > > > > > > > > > > > > we've
> > > > > > > > > > > > > > > > >>> already
> > > > > > > > > > > > > > > > >>> > added
> > > > > > > > > > > > > > > > >>> > > > this
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > information
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > to the onAssignment
> > > > > callback.
> > > > > > > > Would
> > > > > > > > > > this
> > > > > > > > > > > > be
> > > > > > > > > > > > > > > > >>> sufficient?
> > > > > > > > > > > > > > > > >>> > Or
> > > > > > > > > > > > > > > > >>> > > > more
> > > > > > > > > > > > > > > > >>> > > > > >> > > > generally
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > speaking, which
> > > > information
> > > > > > > has
> > > > > > > > to
> > > > > > > > > be
> > > > > > > > > > > > > passed
> > > > > > > > > > > > > > > > >>> around in
> > > > > > > > > > > > > > > > >>> > > > > >> rebalance
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > callback
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > while others can be
> > > passed
> > > > > > > around
> > > > > > > > in
> > > > > > > > > > > > > > > > >>> PartitionAssignor
> > > > > > > > > > > > > > > > >>> > > > > >> callback? In
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > Streams
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > for example both
> > > callbacks
> > > > > are
> > > > > > > > used
> > > > > > > > > > but
> > > > > > > > > > > > most
> > > > > > > > > > > > > > > > >>> critical
> > > > > > > > > > > > > > > > >>> > > > > >> information
> > > > > > > > > > > > > > > > >>> > > > > >> > is
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > passed
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > via onAssignment.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > We still need to
> extend
> > > > > > > > > > > > > > ConsumerRebalanceListener
> > > > > > > > > > > > > > > > >>> because
> > > > > > > > > > > > > > > > >>> > > > it’s the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > interface we could
> have
> > > > public
> > > > > > > > access
> > > > > > > > > > to.
> > > > > > > > > > > > The
> > > > > > > > > > > > > > > > >>> > #onAssignment
> > > > > > > > > > > > > > > > >>> > > > call
> > > > > > > > > > > > > > > > >>> > > > > >> is
> > > > > > > > > > > > > > > > >>> > > > > >> > > > defined
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > on PartitionAssignor
> > level
> > > > > which
> > > > > > > is
> > > > > > > > > not
> > > > > > > > > > > easy
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > > work
> > > > > > > > > > > > > > > > >>> with
> > > > > > > > > > > > > > > > >>> > > > external
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > producers.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > 3. "We propose to
> use
> > a
> > > > > > separate
> > > > > > > > > > record
> > > > > > > > > > > > type
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > >>> order to
> > > > > > > > > > > > > > > > >>> > > > store
> > > > > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > group
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > assignment.": hmm, I
> > > > thought
> > > > > > > with
> > > > > > > > > the
> > > > > > > > > > > > third
> > > > > > > > > > > > > > > typed
> > > > > > > > > > > > > > > > >>> > > > > >> FindCoordinator,
> > > > > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > same
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > broker that acts as
> the
> > > > > > consumer
> > > > > > > > > > > > coordinator
> > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > >>> > always
> > > > > > > > > > > > > > > > >>> > > be
> > > > > > > > > > > > > > > > >>> > > > > >> > selected
> > > > > > > > > > > > > > > > >>> > > > > >> > > > as
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > the txn coordinator,
> > in
> > > > > which
> > > > > > > case
> > > > > > > > > it
> > > > > > > > > > > can
> > > > > > > > > > > > > > access
> > > > > > > > > > > > > > > > its
> > > > > > > > > > > > > > > > >>> > local
> > > > > > > > > > > > > > > > >>> > > > cache
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > metadata /
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > offset topic to get
> > this
> > > > > > > > information
> > > > > > > > > > > > > already?
> > > > > > > > > > > > > > We
> > > > > > > > > > > > > > > > >>> just
> > > > > > > > > > > > > > > > >>> > need
> > > > > > > > > > > > > > > > >>> > > > to
> > > > > > > > > > > > > > > > >>> > > > > >> think
> > > > > > > > > > > > > > > > >>> > > > > >> > > > about
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > how to make these
> two
> > > > > modules
> > > > > > > > > directly
> > > > > > > > > > > > > > exchange
> > > > > > > > > > > > > > > > >>> > > information
> > > > > > > > > > > > > > > > >>> > > > > >> without
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > messing
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > up the code
> hierarchy.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > These two coordinators
> > > will
> > > > be
> > > > > > on
> > > > > > > > the
> > > > > > > > > > same
> > > > > > > > > > > > > > broker
> > > > > > > > > > > > > > > > only
> > > > > > > > > > > > > > > > >>> > when
> > > > > > > > > > > > > > > > >>> > > > > >> number of
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > partitions for
> > transaction
> > > > > state
> > > > > > > > topic
> > > > > > > > > > and
> > > > > > > > > > > > > > > consumer
> > > > > > > > > > > > > > > > >>> offset
> > > > > > > > > > > > > > > > >>> > > > topic
> > > > > > > > > > > > > > > > >>> > > > > >> are
> > > > > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > same. This normally
> > holds
> > > > > true,
> > > > > > > but
> > > > > > > > > I'm
> > > > > > > > > > > > afraid
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > we couldn't make this
> > > > > > assumption?
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > 4. The config of
> > > > > > > > > > > > > > > "CONSUMER_GROUP_AWARE_TRANSACTION":
> > > > > > > > > > > > > > > > >>> it
> > > > > > > > > > > > > > > > >>> > > seems
> > > > > > > > > > > > > > > > >>> > > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > goal
> > > > > > > > > > > > > > > > >>> > > > > >> > > of
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > this config is just
> to
> > > > avoid
> > > > > > > > > > > old-versioned
> > > > > > > > > > > > > > > broker
> > > > > > > > > > > > > > > > >>> to not
> > > > > > > > > > > > > > > > >>> > > be
> > > > > > > > > > > > > > > > >>> > > > > >> able to
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > recognize newer
> > > versioned
> > > > > > > client.
> > > > > > > > I
> > > > > > > > > > > think
> > > > > > > > > > > > if
> > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > >>> do
> > > > > > > > > > > > > > > > >>> > > > something
> > > > > > > > > > > > > > > > >>> > > > > >> > else
> > > > > > > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > avoid this config
> > > though,
> > > > > for
> > > > > > > > > example
> > > > > > > > > > we
> > > > > > > > > > > > can
> > > > > > > > > > > > > > use
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > >>> > > > embedded
> > > > > > > > > > > > > > > > >>> > > > > >> > > > AdminClient
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > to send the
> APIVersion
> > > > > request
> > > > > > > > upon
> > > > > > > > > > > > starting
> > > > > > > > > > > > > > up,
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > >>> > based
> > > > > > > > > > > > > > > > >>> > > > on
> > > > > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > returned
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > value decides
> whether
> > to
> > > > go
> > > > > to
> > > > > > > the
> > > > > > > > > old
> > > > > > > > > > > > code
> > > > > > > > > > > > > > path
> > > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > >>> the
> > > > > > > > > > > > > > > > >>> > > new
> > > > > > > > > > > > > > > > >>> > > > > >> > behavior.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > Admittedly asking a
> > > random
> > > > > > > broker
> > > > > > > > > > about
> > > > > > > > > > > > > > > APIVersion
> > > > > > > > > > > > > > > > >>> does
> > > > > > > > > > > > > > > > >>> > > not
> > > > > > > > > > > > > > > > >>> > > > > >> > guarantee
> > > > > > > > > > > > > > > > >>> > > > > >> > > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > whole cluster's
> > > versions,
> > > > > but
> > > > > > > what
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > > do
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > >>> first
> > > > > > > > > > > > > > > > >>> > > 1)
> > > > > > > > > > > > > > > > >>> > > > find
> > > > > > > > > > > > > > > > >>> > > > > >> > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > coordinator (and if
> > the
> > > > > random
> > > > > > > > > broker
> > > > > > > > > > > does
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > even
> > > > > > > > > > > > > > > > >>> > > > recognize
> > > > > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > > > > >>> > > > > >> > new
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > discover type, fall
> > back
> > > > to
> > > > > > old
> > > > > > > > path
> > > > > > > > > > > > > > directly),
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > >>> then
> > > > > > > > > > > > > > > > >>> > > 2)
> > > > > > > > > > > > > > > > >>> > > > ask
> > > > > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > discovered
> coordinator
> > > > about
> > > > > > its
> > > > > > > > > > > supported
> > > > > > > > > > > > > > > > >>> APIVersion.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > The caveat here is
> that
> > we
> > > > > have
> > > > > > to
> > > > > > > > > make
> > > > > > > > > > > sure
> > > > > > > > > > > > > > both
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > >>> > group
> > > > > > > > > > > > > > > > >>> > > > > >> > coordinator
> > > > > > > > > > > > > > > > >>> > > > > >> > > > and
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > transaction
> coordinator
> > > are
> > > > on
> > > > > > the
> > > > > > > > > > latest
> > > > > > > > > > > > > > version
> > > > > > > > > > > > > > > > >>> during
> > > > > > > > > > > > > > > > >>> > > init
> > > > > > > > > > > > > > > > >>> > > > > >> stage.
> > > > > > > > > > > > > > > > >>> > > > > >> > > This
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > is potentially doable
> as
> > > we
> > > > > only
> > > > > > > > need
> > > > > > > > > a
> > > > > > > > > > > > > consumer
> > > > > > > > > > > > > > > > >>> group.id
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > to check that. In the
> > > > > meantime,
> > > > > > a
> > > > > > > > > > > hard-coded
> > > > > > > > > > > > > > > config
> > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > >>> > > still a
> > > > > > > > > > > > > > > > >>> > > > > >> > > favorable
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > backup in case the
> > server
> > > > has
> > > > > > > > > > downgraded,
> > > > > > > > > > > so
> > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > will
> > > > > > > > > > > > > > > > >>> want
> > > > > > > > > > > > > > > > >>> > > to
> > > > > > > > > > > > > > > > >>> > > > use
> > > > > > > > > > > > > > > > >>> > > > > >> a
> > > > > > > > > > > > > > > > >>> > > > > >> > new
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > version client without
> > > > > `consumer
> > > > > > > > > group`
> > > > > > > > > > > > > > > > transactional
> > > > > > > > > > > > > > > > >>> > > support.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > 5. This is a meta
> > > question:
> > > > > have
> > > > > > > you
> > > > > > > > > > > > > considered
> > > > > > > > > > > > > > > how
> > > > > > > > > > > > > > > > >>> this
> > > > > > > > > > > > > > > > >>> > can
> > > > > > > > > > > > > > > > >>> > > > be
> > > > > > > > > > > > > > > > >>> > > > > >> > applied
> > > > > > > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > Kafka Connect as
> well?
> > > For
> > > > > > > > example,
> > > > > > > > > > for
> > > > > > > > > > > > > source
> > > > > > > > > > > > > > > > >>> > connectors,
> > > > > > > > > > > > > > > > >>> > > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > assignment
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > is not by
> > "partitions",
> > > > but
> > > > > by
> > > > > > > > some
> > > > > > > > > > > other
> > > > > > > > > > > > > sort
> > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > >>> > > > "resources"
> > > > > > > > > > > > > > > > >>> > > > > >> based
> > > > > > > > > > > > > > > > >>> > > > > >> > > on
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > source systems, how
> > > > KIP-447
> > > > > > > would
> > > > > > > > > > affect
> > > > > > > > > > > > > Kafka
> > > > > > > > > > > > > > > > >>> > Connectors
> > > > > > > > > > > > > > > > >>> > > > that
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > implemented
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > EOS as well?
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > No, it's not currently
> > > > > included
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > > scope.
> > > > > > > > > > > > > > > Could
> > > > > > > > > > > > > > > > >>> you
> > > > > > > > > > > > > > > > >>> > > point
> > > > > > > > > > > > > > > > >>> > > > me
> > > > > > > > > > > > > > > > >>> > > > > >> to a
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > sample source
> connector
> > > who
> > > > > uses
> > > > > > > > EOS?
> > > > > > > > > > > Could
> > > > > > > > > > > > > > always
> > > > > > > > > > > > > > > > >>> > > piggy-back
> > > > > > > > > > > > > > > > >>> > > > into
> > > > > > > > > > > > > > > > >>> > > > > >> > the
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > TxnProducerIdentity
> > struct
> > > > > with
> > > > > > > more
> > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > such
> > > > > > > > > > > > > > > > >>> as
> > > > > > > > > > > > > > > > >>> > > > tasks. If
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > this is something to
> > > support
> > > > > in
> > > > > > > near
> > > > > > > > > > term,
> > > > > > > > > > > > an
> > > > > > > > > > > > > > > > abstract
> > > > > > > > > > > > > > > > >>> > type
> > > > > > > > > > > > > > > > >>> > > > called
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > "Resource" could be
> > > provided
> > > > > and
> > > > > > > let
> > > > > > > > > > topic
> > > > > > > > > > > > > > > partition
> > > > > > > > > > > > > > > > >>> and
> > > > > > > > > > > > > > > > >>> > > > connect
> > > > > > > > > > > > > > > > >>> > > > > >> task
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > implement it.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > Guozhang
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > On Sat, Jun 22, 2019
> > at
> > > > 8:40
> > > > > > PM
> > > > > > > > > Ismael
> > > > > > > > > > > > Juma
> > > > > > > > > > > > > <
> > > > > > > > > > > > > > > > >>> > > > ism...@juma.me.uk>
> > > > > > > > > > > > > > > > >>> > > > > >> > > wrote:
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > Hi Boyang,
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > Thanks for the
> KIP.
> > > It's
> > > > > > good
> > > > > > > > that
> > > > > > > > > > we
> > > > > > > > > > > > > > listed a
> > > > > > > > > > > > > > > > >>> number
> > > > > > > > > > > > > > > > >>> > of
> > > > > > > > > > > > > > > > >>> > > > > >> rejected
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > alternatives. It
> > would
> > > > be
> > > > > > > > helpful
> > > > > > > > > to
> > > > > > > > > > > > have
> > > > > > > > > > > > > an
> > > > > > > > > > > > > > > > >>> > explanation
> > > > > > > > > > > > > > > > >>> > > > of
> > > > > > > > > > > > > > > > >>> > > > > >> why
> > > > > > > > > > > > > > > > >>> > > > > >> > > they
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > were
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > rejected.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > Ismael
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > Hey all,
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > I would like to start a discussion for KIP-447:
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > This work originated with Jason Gustafson, and we would like to
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > move it into the discussion stage.
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > Let me know your thoughts, thanks!
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > > Boyang
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > --
> > > > > > > > > > > > > > > > >>> > > > > >> > > > > > -- Guozhang
