Hi Boyang,

> We already persist member.id, instance.id and generation.id in the offset
> topic, what extra fields do we need to store?

Yeah, you're right. I was a little confused and thought this information
was needed by the transaction coordinator.

> This should be easily done on the stream side as we have
> StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
> but a non-stream user has to code the callback by hand. Do you think the
> convenience we sacrifice here is worth the simplification benefit?

Either way, you need a reference to the consumer. I was mostly just
thinking it would be better to reduce the integration point to its minimum.
Have you thought through the implications of needing to keep around a
reference to the consumer in the producer? What if it gets closed? It seems
better not to have to think about these cases.
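
To make that concrete, here is a rough sketch of the shape I have in mind, in plain Java with no Kafka dependency. Everything in it is a stand-in invented for illustration: `GroupMetadata` is a simplified placeholder for whatever type the KIP settles on, and `GroupMetadataHolder`, `FakeCoordinator` and `SketchProducer` are hypothetical names, not real client classes. The caller keeps the latest metadata (refreshed from its rebalance callback) and hands a snapshot to the producer on each commit, so the producer never holds the consumer:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class GroupMetadataSketch {

    /** Hypothetical immutable snapshot of group state; a stand-in, not Kafka's class. */
    public static final class GroupMetadata {
        public final String groupId;
        public final int generationId;
        public final String memberId;

        public GroupMetadata(String groupId, int generationId, String memberId) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    /** Caller-owned holder, refreshed from the consumer's rebalance callback. */
    public static final class GroupMetadataHolder {
        private final AtomicReference<GroupMetadata> latest = new AtomicReference<>();

        // Would be invoked from the application's rebalance/assignment callback.
        public void onRebalance(GroupMetadata metadata) {
            latest.set(metadata);
        }

        public GroupMetadata current() {
            return latest.get();
        }
    }

    /** Stand-in for the coordinator-side check: rejects commits from another generation. */
    public static final class FakeCoordinator {
        private int currentGeneration;

        public FakeCoordinator(int currentGeneration) {
            this.currentGeneration = currentGeneration;
        }

        public void bumpGeneration() {
            currentGeneration++;
        }

        public void commit(Map<String, Long> offsets, GroupMetadata metadata) {
            if (metadata.generationId != currentGeneration) {
                throw new IllegalStateException("stale generation " + metadata.generationId);
            }
        }
    }

    /** Stand-in producer: it only ever sees a snapshot, never the consumer itself. */
    public static final class SketchProducer {
        private final FakeCoordinator coordinator;

        public SketchProducer(FakeCoordinator coordinator) {
            this.coordinator = coordinator;
        }

        public void sendOffsetsToTransaction(Map<String, Long> offsets, GroupMetadata metadata) {
            coordinator.commit(offsets, metadata);
        }
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator(1);
        SketchProducer producer = new SketchProducer(coordinator);
        GroupMetadataHolder holder = new GroupMetadataHolder();

        holder.onRebalance(new GroupMetadata("my-group", 1, "member-a"));
        producer.sendOffsetsToTransaction(Collections.singletonMap("topic-0", 42L), holder.current());

        // A rebalance bumps the generation; a snapshot taken before it is now stale.
        coordinator.bumpGeneration();
        try {
            producer.sendOffsetsToTransaction(Collections.singletonMap("topic-0", 43L), holder.current());
        } catch (IllegalStateException e) {
            System.out.println("fenced: " + e.getMessage());
        }
    }
}
```

With this shape, closing the consumer has no effect on the producer, since it only ever saw snapshots. The cost is the one you point out: the caller has to refresh the holder itself, and a stale snapshot is rejected by the generation check.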

-Jason

On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thank you for the suggestions Jason. And a side note for Guozhang, I
> updated the KIP to reflect the dependency on 447.
>
> On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Boyang, thanks for the updates. I have a few more comments:
> >
> > 1. We are adding some new fields to TxnOffsetCommit to support group-based
> > fencing. Do we need these fields to be persisted in the offsets topic to
> > ensure that the fencing still works after a coordinator failover?
> >
> We already persist member.id, instance.id and generation.id in the offset
> topic, what extra fields do we need to store?
>
>
> > 2. Since you are proposing a new `groupMetadata` API, have you considered
> > whether we still need the `initTransactions` overload? Another way would be
> > to pass it through the `sendOffsetsToTransaction` API:
> >
> > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > offsets, GroupMetadata groupMetadata) throws
> > ProducerFencedException, IllegalGenerationException;
> >
> > This seems a little more consistent with the current API and avoids the
> > direct dependence on the Consumer in the producer.
> >
> Note that although we avoid one dependency on the consumer, the producer needs
> to periodically update its group metadata, or in this case the caller of
> sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
> GroupMetadata groupMetadata) is responsible for getting the latest value of
> group metadata.
> This should be easily done on the stream side as we have
> StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
> but a non-stream user has to code the callback by hand. Do you think the
> convenience we sacrifice here is worth the simplification benefit?
>
>
> > 3. Can you clarify the behavior of the clients when the brokers do not
> > support the latest API versions? This is both for the new TxnOffsetCommit
> > and the OffsetFetch APIs. I guess the high level idea in streams is to
> > detect broker support before instantiating the producer and consumer. I
> > think that's reasonable, but we might need some approach for non-streams
> > use cases. One option I was considering is enforcing the latest version
> > through the new `sendOffsetsToTransaction` API. Basically when you use the
> > new API, we require support for the latest TxnOffsetCommit version. This
> > puts some burden on users, but it avoids breaking correctness assumptions
> > when the new APIs are in use. What do you think?
> >
> Yes, I think we haven't covered this case, so the plan is to crash the
> non-stream application when the job is using the new sendOffsets API.
>
> >
> > -Jason
> >
> >
> >
> >
> > On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > > Yep, Guozhang I think that would be best as passing in an entire consumer
> > > > instance is indeed cumbersome.
> > >
> > > Just saw you updated KIP-429, I will follow-up to change 447 as well.
> > >
> > > Best,
> > > Boyang
> > >
> > > > On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > > okay I think I understand your concerns about ConsumerGroupMetadata now: if
> > > > > we still want to only call initTxns once, then we should allow whatever
> > > > > parameter is passed in to reflect the latest value of generation id whenever
> > > > > sending the offset fetch request.
> > > > >
> > > > > Whereas the current ConsumerGroupMetadata is a static object.
> > > > >
> > > > > Maybe we can consider having an extended class of ConsumerGroupMetadata
> > > > > whose values are updated from the consumer's rebalance callback?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has reflected
> > > > > > the latest change on ConsumerGroupMetadata? Also regarding question one,
> > > > > > the group metadata needs to be accessed via callback; does that mean we
> > > > > > need a separate producer API such as
> > > > > > "producer.refreshMetadata(groupMetadata)" to be able to access it instead
> > > > > > of passing in the consumer instance?
> > > > >
> > > > > Boyang
> > > > >
> > > > > > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks Boyang,
> > > > > > >
> > > > > > > I've made another pass on KIP-447 as well as
> > > > > > > https://github.com/apache/kafka/pull/7078, and have some minor comments
> > > > > > > about the proposed API:
> > > > > > >
> > > > > > > 1. it seems instead of needing the whole KafkaConsumer object, you'd only
> > > > > > > need the "ConsumerGroupMetadata", in that case can we just pass in that
> > > > > > > object into the initTxns call?
> > > > > > >
> > > > > > > 2. the current trunk already has a public class named ConsumerGroupMetadata
> > > > > > > under o.a.k.clients.consumer created by KIP-429. If we want to just use
> > > > > > > that then maybe it makes less sense to declare a base GroupMetadata as we
> > > > > > > are already leaking such information on the assignor anyways.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thank you Guozhang for the reply. We will consider the interface change
> > > > > > > > from 429 as a backup plan for 447.
> > > > > > > >
> > > > > > > > And bumping this thread for more discussion.
> > > > > > > >
> > > > > > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > > Thank you Guozhang for the suggestion! I would normally prefer naming a
> > > > > > > > > > flag corresponding to its functionality. Seems to me `isolation_level`
> > > > > > > > > > makes us another hop on information track.
> > > > > > > > > >
> > > > > > > > > Fair enough, let's use a separate flag name then :)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > As for the generation.id exposure, I'm fine leveraging the new API from
> > > > > > > > > > 429; however, is that design finalized yet, and will the API be
> > > > > > > > > > added on the generic Consumer<K, V> interface?
> > > > > > > > > >
> > > > > > > > > The current PartitionAssignor is inside `internals` package and in KIP-429
> > > > > > > > > we are going to create a new interface out of `internals` to really make it
> > > > > > > > > public APIs, and as part of that we are refactoring some of its method
> > > > > > > > > signatures. I just feel some of the newly introduced classes can be reused
> > > > > > > > > in your KIP as well, i.e. just for code succinctness, but no semantical
> > > > > > > > > indications.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Boyang
> > > > > > > > >
> > > > > > > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Boyang, thanks for the updated proposal!
> > > > > > > > > > >
> > > > > > > > > > > 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> > > > > > > > > > > offset fetch request with a boolean to indicate "give me a retriable error
> > > > > > > > > > > code if there's pending offset, rather than sending me the committed offset
> > > > > > > > > > > immediately". Personally I still feel it is okay to piggy-back on the
> > > > > > > > > > > ISOLATION_LEVEL boolean, but I'm also fine with another `await_transaction`
> > > > > > > > > > > boolean if you feel strongly about it.
> > > > > > > > > > >
> > > > > > > > > > > 10. About the exposure of generation id, there may be some refactoring work
> > > > > > > > > > > coming from KIP-429 that can benefit KIP-447 as well since we are wrapping
> > > > > > > > > > > the consumer subscription / assignment data in new classes. Note that the
> > > > > > > > > > > current proposal does not include `generationId` since with the cooperative
> > > > > > > > > > > sticky assignor we think it is not necessary for correctness, but also if we
> > > > > > > > > > > agree it is okay to expose it we can potentially include it in
> > > > > > > > > > > `ConsumerAssignmentData` as well.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thank you Jason for the ideas.
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Boyang,
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the updates. A few comments below:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to 10s.
> > > > > > > > > > > > > I think this makes sense for Kafka Streams which is tied to the consumer
> > > > > > > > > > > > > group semantics and uses a default 10s session timeout. However, it seems a
> > > > > > > > > > > > > bit dangerous to make this change for the producer generally. Could we just
> > > > > > > > > > > > > change it for streams?
> > > > > > > > > > > > >
> > > > > > > > > > > > That sounds good to me.
> > > > > > > > > > >
> > > > > > > > > > > > > 2. The new `initTransactions` API takes a `Consumer` instance. I think the
> > > > > > > > > > > > > idea is to basically put in a backdoor to give the producer access to the
> > > > > > > > > > > > > group generationId. It's not clear to me how this would work given package
> > > > > > > > > > > > > restrictions. I wonder if it would be better to just expose the state we
> > > > > > > > > > > > > need from the consumer. I know we have been reluctant to do this so far
> > > > > > > > > > > > > because we treat the generationId as an implementation detail. However, I
> > > > > > > > > > > > > think we might just bite the bullet and expose it rather than coming up
> > > > > > > > > > > > > with a messy hack. Concepts such as memberIds have already been exposed in
> > > > > > > > > > > > > the AdminClient, so maybe it is not too bad. Alternatively, we could use an
> > > > > > > > > > > > > opaque type. For example:
> > > > > > > > > > > >
> > > > > > > > > > > > // public
> > > > > > > > > > > > interface GroupMetadata {}
> > > > > > > > > > > >
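> > > > > > > > > > > > > // private: implementation behind the opaque public type
> > > > > > > > > > > > > class ConsumerGroupMetadata implements GroupMetadata {
> > > > > > > > > > > > >   final int generationId;
> > > > > > > > > > > > >   final String memberId;
> > > > > > > > > > > > >
> > > > > > > > > > > > >   ConsumerGroupMetadata(int generationId, String memberId) {
> > > > > > > > > > > > >     this.generationId = generationId;
> > > > > > > > > > > > >     this.memberId = memberId;
> > > > > > > > > > > > >   }
> > > > > > > > > > > > > }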
> > > > > > > > > > > >
> > > > > > > > > > > > // Consumer API
> > > > > > > > > > > > public GroupMetadata groupMetadata();
> > > > > > > > > > > >
> > > > > > > > > > > > > I am probably leaning toward just exposing the state we need.
> > > > > > > > > > > > >
> > > > > > > > > > > > Yes, also to mention that Kafka Streams uses the generic Consumer API, which
> > > > > > > > > > > > doesn't have rich states like a full `KafkaConsumer`. The hack will not work
> > > > > > > > > > > > as expected.
> > > > > > > > > > > >
> > > > > > > > > > > > Instead, just exposing the consumer generation.id seems like way easier work.
> > > > > > > > > > > > We could consolidate
> > > > > > > > > > > > the API and make it
> > > > > > > > > > >
> > > > > > > > > > > > > 3. Given that we are already providing a way to propagate group state from
> > > > > > > > > > > > > the consumer to the producer, I wonder if we may as well include the
> > > > > > > > > > > > > memberId and groupInstanceId. This would make the validation we do for
> > > > > > > > > > > > > TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
> > > > > > > > > > > > > least this may help with debugging.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Yes, we could put them into the GroupMetadata struct.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > 4. I like the addition of isolation_level to the offset fetch. At the same
> > > > > > > > > > > > > time, its behavior is a bit inconsistent with how it is used in the
> > > > > > > > > > > > > consumer generally. There is no reason for the group coordinator to ever
> > > > > > > > > > > > > expose aborted data, so this is mostly about awaiting pending offset
> > > > > > > > > > > > > commits, not reading uncommitted data. Perhaps instead of calling this
> > > > > > > > > > > > > "isolation level," it should be more like "await_pending_transaction" or
> > > > > > > > > > > > > something like that?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also, just to be clear, the consumer would treat this as an optional field,
> > > > > > > > > > > > > right? So if the broker does not support the latest OffsetFetch API, it
> > > > > > > > > > > > > would silently revert to reading the old data. Basically it would be up to
> > > > > > > > > > > > > the streams version probing logic to ensure that the expectation on this
> > > > > > > > > > > > > API fits with the usage of `transactional.id`.
> > > > > > > > > > > > >
> > > > > > > > > > > > Sounds like a better naming to me, while I think it could be shortened to
> > > > > > > > > > > > `await_transaction`.
> > > > > > > > > > > > I think the field should be optional, too.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Jason
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hey Guozhang,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I will correct my statement from last email. I don't think
> > > > > > > > > > > > > > read_committed (3.a) needs to be added to the OffsetFetch request,
> > > > > > > > > > > > > > because in an EOS application the underlying consumers within the
> > > > > > > > > > > > > > group should always back off when there are pending offsets.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Let me know if you think this is correct.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you Guozhang for the questions, inline answers are below.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hey all,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> I have done a fundamental polish of KIP-447
> > > > > > > > > > > > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> > > > > > > > > > > > > > >> and written a design doc
> > > > > > > > > > > > > > >> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> > > > > > > > > > > > > > >> depicting internal changes. We stripped off many implementation details
> > > > > > > > > > > > > > >> from the KIP, and simplified the public changes by a lot. For reviewers,
> > > > > > > > > > > > > > >> it is highly recommended to fully understand EOS design in KIP-98 and
> > > > > > > > > > > > > > >> read its corresponding design doc
> > > > > > > > > > > > > > >> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> > > > > > > > > > > > > > >> if you haven't done so already.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Let me know if you found anything confusing around the KIP or the design.
> > > > > > > > > > > > > > >> Would be happy to discuss in depth.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Best,
> > > > > > > > > > > > > >> Boyang
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>> 2. The reason we did not expose generation.id from KafkaConsumer public
> > > > > > > > > > > > > > >>> APIs directly is to abstract this notion from users (since it is an
> > > > > > > > > > > > > > >>> implementation detail of the rebalance protocol itself, e.g. if user calls
> > > > > > > > > > > > > > >>> consumer.assign() they do not need to invoke ConsumerCoordinator and no
> > > > > > > > > > > > > > >>> need to be aware of generation.id at all).
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> On the other hand, with the current proposal the txn.coordinator does not
> > > > > > > > > > > > > > >>> know about the latest generation from the source-of-truth
> > > > > > > > > > > > > > >>> group.coordinator; instead, it will only bump up the generation from the
> > > > > > > > > > > > > > >>> producer's InitProducerIdRequest.
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> The key here is that GroupCoordinator, when handling
> > > > > > > > > > > > > > >>> `InitProducerIdRequest
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > In the new design, we just pass the entire consumer instance into the
> > > > > > > > > > > > > > > producer through #initTransaction, so no public API will be created.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>> 3. I agree that if we rely on the group coordinator to block on returning
> > > > > > > > > > > > > > >>> offset-fetch-response if read-committed is enabled, then we do not need to
> > > > > > > > > > > > > > >>> store partition assignment on txn coordinator and therefore it's better to
> > > > > > > > > > > > > > >>> still decouple them. For that case we still need to update the KIP wiki
> > > > > > > > > > > > > > >>> page that includes:
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
> > > > > > > > > > > > > > >>> 3.b. Add new error code in OffsetFetchResponse to let client backoff and
> > > > > > > > > > > > > > >>> retry if there are pending txns including the interested partitions.
> > > > > > > > > > > > > > >>> 3.c. Also in the worst case we would let the client be blocked for the
> > > > > > > > > > > > > > >>> txn.timeout period, and for that rationale we may need to consider reducing
> > > > > > > > > > > > > > >>> our default txn.timeout value as well.
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > Addressed 3.b and 3.c, will do 3.a.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>> 4. According to Colin it seems we do not need to create another KIP and we
> > > > > > > > > > > > > > >>> can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
> > > > > > > > > > > > > > >>> some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
> > > > > > > > > > > > > > >>> please let us know if you have any concerns exposing it).
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > I think we no longer need to rely on api version for initialization,
> > > > > > > > > > > > > > > since we will be using the upgrade.from config anyway.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>> Guozhang
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > > > >>> wrote:
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> > For reference, we have BrokerApiVersionCommand already as a public
> > > > > > > > > > > > > > >>> > interface. We have a bit of tech debt at the moment because it uses a
> > > > > > > > > > > > > > >>> > custom AdminClient. It would be nice to clean that up. In general, I think
> > > > > > > > > > > > > > >>> > it is reasonable to expose from AdminClient. It can be used by management
> > > > > > > > > > > > > > >>> > tools to inspect running Kafka versions for example.
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > -Jason
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > > > > > > > > >>> > wrote:
> > > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > > >>> > > Thank you for the context Colin. The groupId was indeed a copy-paste error.
> > > > > > > > > > > > > > >>> > > Our use case here for 447 is (Quoted from Guozhang):
> > > > > > > > > > > > > > >>> > > '''
> > > > > > > > > > > > > > >>> > > I think if we can do something else to
> > > > > > > > > > > > > > >>> > > avoid this config though, for example we can use the embedded AdminClient
> > > > > > > > > > > > > > >>> > > to send the APIVersion request upon starting up, and based on the returned
> > > > > > > > > > > > > > >>> > > value decides whether to go to the old code path or the new behavior.
> > > > > > > > > > > > > > >>> > > '''
> > > > > > > > > > > > > > >>> > > The benefit we get is to avoid adding a new configuration to make a
> > > > > > > > > > > > > > >>> > > decision simply based on broker version. If you have concerns with exposing
> > > > > > > > > > > > > > >>> > > ApiVersion for client, we could
> > > > > > > > > > > > > > >>> > > try to think of alternative solutions too.
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > Boyang
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > > >>> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > > > > > > > > > > > > > >>> > > > through AdminClient.  That class is not even accessible without having the
> > > > > > > > > > > > > > >>> > > > broker jars on your CLASSPATH.
> > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > >>> > > > Another question is, what is the groupId parameter doing in the call? The
> > > > > > > > > > > > > > >>> > > > API versions are the same no matter what consumer group we use, right?
> > > > > > > > > > > > > > >>> > > > Perhaps this was a copy and paste error?
> > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > >>> > > > This is not the first time we have discussed having a method in
> > > > > > > > > > > > > > >>> > > > AdminClient to retrieve API version information.  In fact, the original KIP
> > > > > > > > > > > > > > >>> > > > which created KafkaAdminClient specified an API for fetching version
> > > > > > > > > > > > > > >>> > > > information.  It was called apiVersions and it is still there on the wiki.
> > > > > > > > > > > > > > >>> > > > See
> > > > > > > > > > > > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > > >>> > > > However, this API wasn't ready in time for 0.11.0 so we shipped without
> > > > > > > > > > > > > > >>> > > > it.  There was a JIRA to implement it for later versions,
> > > > > > > > > > > > > > >>> > > > https://issues.apache.org/jira/browse/KAFKA-5214, as well as a PR,
> > > > > > > > > > > > > > >>> > > > https://github.com/apache/kafka/pull/3012. However, we started to
> > > > > > > > > > > > > > >>> > > > rethink whether this AdminClient function was even necessary. Most of the
> > > > > > > > > > > > > > >>> > > > use-cases we could think of seemed like horrible hacks.  So it has never
> > > > > > > > > > > > > > >>> > > > really been implemented (yet?).
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > > best,
> > > > > > > > > > > > > >>> > > > Colin
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang
> Chen
> > > > > wrote:
> > > > > > > > > > > > > >>> > > > > Actually, after a second thought, I
> think
> > > it
> > > > > > > actually
> > > > > > > > > > makes
> > > > > > > > > > > > > >>> sense to
> > > > > > > > > > > > > >>> > > > > support auto upgrade through admin
> client
> > > to
> > > > > help
> > > > > > > use
> > > > > > > > > get
> > > > > > > > > > > api
> > > > > > > > > > > > > >>> version
> > > > > > > > > > > > > >>> > > > > from
> > > > > > > > > > > > > >>> > > > > broker.
> > > > > > > > > > > > > >>> > > > > A draft KIP is here:
> > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > >>> > > > > Boyang
> > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang
> > > Chen <
> > > > > > > > > > > > > >>> > > reluctanthero...@gmail.com>
> > > > > > > > > > > > > >>> > > > > wrote:
> > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > >>> > > > > > Thank you Guozhang, some of my
> > > > understandings
> > > > > > are
> > > > > > > > > > inline
> > > > > > > > > > > > > below.
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM
> Jason
> > > > > > Gustafson
> > > > > > > <
> > > > > > > > > > > > > >>> > ja...@confluent.io
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > > > > wrote:
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > I think co-locating does have some
> > > > merits
> > > > > > > here,
> > > > > > > > > i.e.
> > > > > > > > > > > > > >>> letting the
> > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the
> > > > > > > > source-of-truth
> > > > > > > > > of
> > > > > > > > > > > > > >>> assignment
> > > > > > > > > > > > > >>> > to
> > > > > > > > > > > > > >>> > > > act
> > > > > > > > > > > > > >>> > > > > >> as
> > > > > > > > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I
> > > agree
> > > > > > > there's
> > > > > > > > > also
> > > > > > > > > > > > some
> > > > > > > > > > > > > >>> cons
> > > > > > > > > > > > > >>> > of
> > > > > > > > > > > > > >>> > > > > >> coupling
> > > > > > > > > > > > > >>> > > > > >> > them together. I'm still a bit
> > > inclining
> > > > > > > towards
> > > > > > > > > > > > > colocation
> > > > > > > > > > > > > >>> but
> > > > > > > > > > > > > >>> > if
> > > > > > > > > > > > > >>> > > > there
> > > > > > > > > > > > > >>> > > > > >> > are good rationales not to do so I
> > can
> > > > be
> > > > > > > > > convinced
> > > > > > > > > > as
> > > > > > > > > > > > > well.
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >> The good rationale is that we have
> no
> > > > > > mechanism
> > > > > > > to
> > > > > > > > > > > > colocate
> > > > > > > > > > > > > >>> > > > partitions ;).
> > > > > > > > > > > > > >>> > > > > >> Are you suggesting we store the
> group
> > > and
> > > > > > > > > transaction
> > > > > > > > > > > > state
> > > > > > > > > > > > > >>> in the
> > > > > > > > > > > > > >>> > > > same
> > > > > > > > > > > > > >>> > > > > >> log? Can you be more concrete about
> > the
> > > > > > benefit?
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >> -Jason
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM
> > > Guozhang
> > > > > > Wang <
> > > > > > > > > > > > > >>> > wangg...@gmail.com>
> > > > > > > > > > > > > >>> > > > > >> wrote:
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >> > Hi Boyang,
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > 1. One advantage of retry against
> > > > on-hold
> > > > > is
> > > > > > > > that
> > > > > > > > > it
> > > > > > > > > > > > will
> > > > > > > > > > > > > >>> not
> > > > > > > > > > > > > >>> > > > tie-up a
> > > > > > > > > > > > > >>> > > > > >> > handler thread (of course the
> latter
> > > > could
> > > > > > do
> > > > > > > > the
> > > > > > > > > > same
> > > > > > > > > > > > but
> > > > > > > > > > > > > >>> that
> > > > > > > > > > > > > >>> > > > involves
> > > > > > > > > > > > > >>> > > > > >> > using a purgatory which is more
> > > > > > complicated),
> > > > > > > > and
> > > > > > > > > > also
> > > > > > > > > > > > it
> > > > > > > > > > > > > is
> > > > > > > > > > > > > >>> > less
> > > > > > > > > > > > > >>> > > > > >> likely to
> > > > > > > > > > > > > >>> > > > > >> > violate request timeout. So I
> think
> > > > there
> > > > > > are
> > > > > > > > some
> > > > > > > > > > > > > >>> rationales to
> > > > > > > > > > > > > >>> > > > prefer
> > > > > > > > > > > > > >>> > > > > >> > retries.
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >  That sounds fair to me, also we are
> > > > avoiding
> > > > > > > usage
> > > > > > > > > of
> > > > > > > > > > > > > another
> > > > > > > > > > > > > >>> > > > purgatory
> > > > > > > > > > > > > >>> > > > > > instance. Usually for one back-off
> > > > > > > > > > > > > >>> > > > > > we are only delaying 50ms during
> > startup
> > > > > which
> > > > > > is
> > > > > > > > > > trivial
> > > > > > > > > > > > > cost.
> > > > > > > > > > > > > >>> > This
> > > > > > > > > > > > > >>> > > > > > behavior shouldn't be changed.
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > > > 2. Regarding
> > > "ConsumerRebalanceListener":
> > > > > > both
> > > > > > > > > > > > > >>> > > > ConsumerRebalanceListener
> > > > > > > > > > > > > >>> > > > > >> > and PartitionAssignors are
> > > > > user-customizable
> > > > > > > > > > modules,
> > > > > > > > > > > > and
> > > > > > > > > > > > > >>> only
> > > > > > > > > > > > > >>> > > > > >> difference
> > > > > > > > > > > > > >>> > > > > >> > is that the former is specified
> via
> > > code
> > > > > and
> > > > > > > the
> > > > > > > > > > > latter
> > > > > > > > > > > > is
> > > > > > > > > > > > > >>> > > > specified via
> > > > > > > > > > > > > >>> > > > > >> > config.
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > Regarding Jason's proposal of
> > > > > > > > ConsumerAssignment,
> > > > > > > > > > one
> > > > > > > > > > > > > thing
> > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > >>> > > note
> > > > > > > > > > > > > >>> > > > > >> though
> > > > > > > > > > > > > >>> > > > > >> > with KIP-429 the
> onPartitionAssigned
> > > may
> > > > > not
> > > > > > > be
> > > > > > > > > > called
> > > > > > > > > > > > if
> > > > > > > > > > > > > >>> the
> > > > > > > > > > > > > >>> > > > assignment
> > > > > > > > > > > > > >>> > > > > >> > does not change, whereas
> > onAssignment
> > > > > would
> > > > > > > > always
> > > > > > > > > > be
> > > > > > > > > > > > > >>> called at
> > > > > > > > > > > > > >>> > > the
> > > > > > > > > > > > > >>> > > > end
> > > > > > > > > > > > > >>> > > > > >> of
> > > > > > > > > > > > > >>> > > > > >> > sync-group response. My proposed
> > > > semantics
> > > > > > is
> > > > > > > > that
> > > > > > > > > > > > > >>> > > > > >> >
> `RebalanceListener#onPartitionsXXX`
> > > are
> > > > > used
> > > > > > > for
> > > > > > > > > > > > > >>> notifications
> > > > > > > > > > > > > >>> > to
> > > > > > > > > > > > > >>> > > > user,
> > > > > > > > > > > > > >>> > > > > >> and
> > > > > > > > > > > > > >>> > > > > >> > hence if there's no changes these
> > will
> > > > not
> > > > > > be
> > > > > > > > > > called,
> > > > > > > > > > > > > >>> whereas
> > > > > > > > > > > > > >>> > > > > >> > `PartitionAssignor` is used for
> > > assignor
> > > > > > > logic,
> > > > > > > > > > whose
> > > > > > > > > > > > > >>> callback
> > > > > > > > > > > > > >>> > > would
> > > > > > > > > > > > > >>> > > > > >> always
> > > > > > > > > > > > > >>> > > > > >> > be called no matter if the
> > partitions
> > > > have
> > > > > > > > changed
> > > > > > > > > > or
> > > > > > > > > > > > not.
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >> I think a third option is to
> > gracefully
> > > > > expose
> > > > > > > > > > > generation
> > > > > > > > > > > > id
> > > > > > > > > > > > > >>> as
> > > > > > > > > > > > > >>> > part
> > > > > > > > > > > > > >>> > > > of
> > > > > > > > > > > > > >>> > > > > > consumer API, so that we don't need
> to
> > > > > > > > > > > > > >>> > > > > > bother overloading various callbacks.
> > Of
> > > > > > course,
> > > > > > > > this
> > > > > > > > > > > > builds
> > > > > > > > > > > > > >>> upon
> > > > > > > > > > > > > >>> > the
> > > > > > > > > > > > > >>> > > > > > assumption that topic partitions
> > > > > > > > > > > > > >>> > > > > > will not be included in new
> > > initTransaction
> > > > > > API.
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > > > 3. I feel it is a bit awkward to
> let
> > > the
> > > > > > > > > > TxnCoordinator
> > > > > > > > > > > > > >>> keeping
> > > > > > > > > > > > > >>> > > > partition
> > > > > > > > > > > > > >>> > > > > >> > assignments since it is sort of
> > taking
> > > > > over
> > > > > > > the
> > > > > > > > > job
> > > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator, and may
> likely
> > > > cause
> > > > > a
> > > > > > > > > > > split-brain
> > > > > > > > > > > > > >>> problem
> > > > > > > > > > > > > >>> > as
> > > > > > > > > > > > > >>> > > > two
> > > > > > > > > > > > > >>> > > > > >> > coordinators keep a copy of this
> > > > > assignment
> > > > > > > > which
> > > > > > > > > > may
> > > > > > > > > > > be
> > > > > > > > > > > > > >>> > > different.
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > I think co-locating does have some
> > > > merits
> > > > > > > here,
> > > > > > > > > i.e.
> > > > > > > > > > > > > >>> letting the
> > > > > > > > > > > > > >>> > > > > >> > ConsumerCoordinator which has the
> > > > > > > > source-of-truth
> > > > > > > > > of
> > > > > > > > > > > > > >>> assignment
> > > > > > > > > > > > > >>> > to
> > > > > > > > > > > > > >>> > > > act
> > > > > > > > > > > > > >>> > > > > >> as
> > > > > > > > > > > > > >>> > > > > >> > the TxnCoordinator as well; but I
> > > agree
> > > > > > > there's
> > > > > > > > > also
> > > > > > > > > > > > some
> > > > > > > > > > > > > >>> cons
> > > > > > > > > > > > > >>> > of
> > > > > > > > > > > > > >>> > > > > >> coupling
> > > > > > > > > > > > > >>> > > > > >> > them together. I'm still a bit
> > > inclining
> > > > > > > towards
> > > > > > > > > > > > > colocation
> > > > > > > > > > > > > >>> but
> > > > > > > > > > > > > >>> > if
> > > > > > > > > > > > > >>> > > > there
> > > > > > > > > > > > > >>> > > > > >> > are good rationales not to do so I
> > can
> > > > be
> > > > > > > > > convinced
> > > > > > > > > > as
> > > > > > > > > > > > > well.
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > > The purpose of co-location is to let
> > txn
> > > > > > > > coordinator
> > > > > > > > > > see
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>> group
> > > > > > > > > > > > > >>> > > > > > assignment. This priority is weakened
> > > > > > > > > > > > > >>> > > > > > when we already have defense on the
> > > > consumer
> > > > > > > offset
> > > > > > > > > > > fetch,
> > > > > > > > > > > > > so I
> > > > > > > > > > > > > >>> > guess
> > > > > > > > > > > > > >>> > > > it's
> > > > > > > > > > > > > >>> > > > > > not super important anymore.
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > 4. I guess I'm preferring the
> > > philosophy
> > > > > of
> > > > > > > > "only
> > > > > > > > > > add
> > > > > > > > > > > > > >>> configs if
> > > > > > > > > > > > > >>> > > > > >> there's no
> > > > > > > > > > > > > >>> > > > > >> > other ways", since more and more
> > > configs
> > > > > > would
> > > > > > > > > make
> > > > > > > > > > it
> > > > > > > > > > > > > less
> > > > > > > > > > > > > >>> and
> > > > > > > > > > > > > >>> > > less
> > > > > > > > > > > > > >>> > > > > >> > intuitive out of the box to use.
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > I think it's a valid point that
> > checks
> > > > > upon
> > > > > > > > > starting
> > > > > > > > > > > up
> > > > > > > > > > > > > >>> does not
> > > > > > > > > > > > > >>> > > > cope
> > > > > > > > > > > > > >>> > > > > >> with
> > > > > > > > > > > > > >>> > > > > >> > brokers downgrading but even with
> a
> > > > > config,
> > > > > > > but
> > > > > > > > it
> > > > > > > > > > is
> > > > > > > > > > > > > still
> > > > > > > > > > > > > >>> hard
> > > > > > > > > > > > > >>> > > for
> > > > > > > > > > > > > >>> > > > > >> users
> > > > > > > > > > > > > >>> > > > > >> > to determine when they can be
> > ensured
> > > > the
> > > > > > > broker
> > > > > > > > > > would
> > > > > > > > > > > > > never
> > > > > > > > > > > > > >>> > > > downgrade
> > > > > > > > > > > > > >>> > > > > >> > anymore and hence can safely
> switch
> > > the
> > > > > > > config.
> > > > > > > > So
> > > > > > > > > > my
> > > > > > > > > > > > > >>> feeling is
> > > > > > > > > > > > > >>> > > > that
> > > > > > > > > > > > > >>> > > > > >> this
> > > > > > > > > > > > > >>> > > > > >> > config would not be helping too
> much
> > > > > still.
> > > > > > If
> > > > > > > > we
> > > > > > > > > > want
> > > > > > > > > > > > to
> > > > > > > > > > > > > >>> be at
> > > > > > > > > > > > > >>> > > the
> > > > > > > > > > > > > >>> > > > > >> safer
> > > > > > > > > > > > > >>> > > > > >> > side, then I'd suggest we modify
> the
> > > > > > > Coordinator
> > > > > > > > > ->
> > > > > > > > > > > > > >>> > NetworkClient
> > > > > > > > > > > > > >>> > > > > >> hierarchy
> > > > > > > > > > > > > >>> > > > > >> > to allow the NetworkClient being
> > able
> > > to
> > > > > > pass
> > > > > > > > the
> > > > > > > > > > > > > APIVersion
> > > > > > > > > > > > > >>> > > > metadata to
> > > > > > > > > > > > > >>> > > > > >> > Coordinator, so that Coordinator
> can
> > > > rely
> > > > > on
> > > > > > > > that
> > > > > > > > > > > logic
> > > > > > > > > > > > to
> > > > > > > > > > > > > >>> > change
> > > > > > > > > > > > > >>> > > > its
> > > > > > > > > > > > > >>> > > > > >> > behavior dynamically.
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > > The stream thread init could not be
> > > > supported
> > > > > > by
> > > > > > > a
> > > > > > > > > > client
> > > > > > > > > > > > > >>> > coordinator
> > > > > > > > > > > > > >>> > > > > > behavior change on the fly,
> > > > > > > > > > > > > >>> > > > > > we are only losing possibilities
> after
> > we
> > > > > > > > > initialized.
> > > > > > > > > > > > (main
> > > > > > > > > > > > > >>> thread
> > > > > > > > > > > > > >>> > > > gets
> > > > > > > > > > > > > >>> > > > > > exit and no thread has global picture
> > > > > anymore)
> > > > > > > > > > > > > >>> > > > > > If we do want to support auto version
> > > > > > detection,
> > > > > > > > > admin
> > > > > > > > > > > > client
> > > > > > > > > > > > > >>> > request
> > > > > > > > > > > > > >>> > > > in
> > > > > > > > > > > > > >>> > > > > > this sense shall be easier.
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > 5. I do not have a concrete idea
> > about
> > > > how
> > > > > > the
> > > > > > > > > > impact
> > > > > > > > > > > on
> > > > > > > > > > > > > >>> Connect
> > > > > > > > > > > > > >>> > > > would
> > > > > > > > > > > > > >>> > > > > >> > make, maybe Randall or Konstantine
> > can
> > > > > help
> > > > > > > > here?
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > > Sounds good, let's see their
> thoughts.
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > Guozhang
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM
> > > Boyang
> > > > > > Chen <
> > > > > > > > > > > > > >>> > > > > >> reluctanthero...@gmail.com>
> > > > > > > > > > > > > >>> > > > > >> > wrote:
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > > Hey Jason,
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > thank you for the proposal here.
> > > Some
> > > > of
> > > > > > my
> > > > > > > > > > thoughts
> > > > > > > > > > > > > >>> below.
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM
> > > Jason
> > > > > > > > Gustafson
> > > > > > > > > <
> > > > > > > > > > > > > >>> > > > ja...@confluent.io>
> > > > > > > > > > > > > >>> > > > > >> > > wrote:
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > > Hi Boyang,
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > Thanks for picking this up!
> > Still
> > > > > > reading
> > > > > > > > > > through
> > > > > > > > > > > > the
> > > > > > > > > > > > > >>> > updates,
> > > > > > > > > > > > > >>> > > > but
> > > > > > > > > > > > > >>> > > > > >> here
> > > > > > > > > > > > > >>> > > > > >> > > are
> > > > > > > > > > > > > >>> > > > > >> > > > a couple initial comments on
> the
> > > > APIs:
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > 1. The `TxnProducerIdentity`
> > class
> > > > is
> > > > > a
> > > > > > > bit
> > > > > > > > > > > > awkward. I
> > > > > > > > > > > > > >>> think
> > > > > > > > > > > > > >>> > > we
> > > > > > > > > > > > > >>> > > > are
> > > > > > > > > > > > > >>> > > > > >> > > trying
> > > > > > > > > > > > > >>> > > > > >> > > > to encapsulate state from the
> > > > current
> > > > > > > group
> > > > > > > > > > > > > assignment.
> > > > > > > > > > > > > >>> > Maybe
> > > > > > > > > > > > > >>> > > > > >> something
> > > > > > > > > > > > > >>> > > > > >> > > > like `ConsumerAssignment`
> would
> > be
> > > > > > > clearer?
> > > > > > > > If
> > > > > > > > > > we
> > > > > > > > > > > > make
> > > > > > > > > > > > > >>> the
> > > > > > > > > > > > > >>> > > usage
> > > > > > > > > > > > > >>> > > > > >> > > consistent
> > > > > > > > > > > > > >>> > > > > >> > > > across the consumer and
> > producer,
> > > > then
> > > > > > we
> > > > > > > > can
> > > > > > > > > > > avoid
> > > > > > > > > > > > > >>> exposing
> > > > > > > > > > > > > >>> > > > > >> internal
> > > > > > > > > > > > > >>> > > > > >> > > state
> > > > > > > > > > > > > >>> > > > > >> > > > like the generationId.
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > For example:
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > // Public API
> > > > > > > > > > > > > >>> > > > > >> > > > interface ConsumerAssignment {
> > > > > > > > > > > > > >>> > > > > >> > > >   Set<TopicPartition>
> > > partitions();
> > > > > > > > > > > > > >>> > > > > >> > > > }
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > // Not a public API
> > > > > > > > > > > > > >>> > > > > >> > > > class
> InternalConsumerAssignment
> > > > > > > implements
> > > > > > > > > > > > > >>> > > ConsumerAssignment {
> > > > > > > > > > > > > >>> > > > > >> > > >   Set<TopicPartition>
> > partitions;
> > > > > > > > > > > > > >>> > > > > >> > > >   int generationId;
> > > > > > > > > > > > > >>> > > > > >> > > > }
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > Then we can change the
> rebalance
> > > > > > listener
> > > > > > > to
> > > > > > > > > > > > something
> > > > > > > > > > > > > >>> like
> > > > > > > > > > > > > >>> > > > this:
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > onPartitionsAssigned(ConsumerAssignment
> > > > > > > > > > > assignment)
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > And on the producer:
> > > > > > > > > > > > > >>> > > > > >> > > > void initTransactions(String
> > > > groupId,
> > > > > > > > > > > > > ConsumerAssignment
> > > > > > > > > > > > > >>> > > > > >> assignment);
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > 2. Another bit of awkwardness
> is
> > > the
> > > > > > fact
> > > > > > > > that
> > > > > > > > > > we
> > > > > > > > > > > > have
> > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > >>> > pass
> > > > > > > > > > > > > >>> > > > the
> > > > > > > > > > > > > >>> > > > > >> > > groupId
> > > > > > > > > > > > > >>> > > > > >> > > > through both
> initTransactions()
> > > and
> > > > > > > > > > > > > >>> > > sendOffsetsToTransaction().
> > > > > > > > > > > > > >>> > > > We
> > > > > > > > > > > > > >>> > > > > >> > could
> > > > > > > > > > > > > >>> > > > > >> > > > consider a config instead.
> Maybe
> > > > > > something
> > > > > > > > > like
> > > > > > > > > > `
> > > > > > > > > > > > > >>> > > > > >> > transactional.group.id
> > > > > > > > > > > > > >>> > > > > >> > > `?
> > > > > > > > > > > > > >>> > > > > >> > > > Then we could simplify the
> > > producer
> > > > > > APIs,
> > > > > > > > > > > > potentially
> > > > > > > > > > > > > >>> even
> > > > > > > > > > > > > >>> > > > > >> deprecating
> > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > >>> > > > > >> > > > current
> > sendOffsetsToTransaction.
> > > In
> > > > > > fact,
> > > > > > > > for
> > > > > > > > > > > this
> > > > > > > > > > > > > new
> > > > > > > > > > > > > >>> > usage,
> > > > > > > > > > > > > >>> > > > the `
> > > > > > > > > > > > > > >>> > > > > >> > > > transactional.id` config is
> not
> > > > > needed.
> > > > > > It
> > > > > > > > > would
> > > > > > > > > > > be
> > > > > > > > > > > > > >>> nice if
> > > > > > > > > > > > > >>> > we
> > > > > > > > > > > > > >>> > > > don't
> > > > > > > > > > > > > >>> > > > > >> > have
> > > > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > > > >>> > > > > >> > > > provide it.
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > I like the idea of combining 1
> > and
> > > > 2.
> > > > > We
> > > > > > > > could
> > > > > > > > > > > > > >>> definitely
> > > > > > > > > > > > > >>> > pass
> > > > > > > > > > > > > >>> > > > in a
> > > > > > > > > > > > > >>> > > > > >> > > group.id config
> > > > > > > > > > > > > >>> > > > > >> > > so that we could avoid exposing
> > that
> > > > > > > > information
> > > > > > > > > > in
> > > > > > > > > > > a
> > > > > > > > > > > > > >>> public
> > > > > > > > > > > > > >>> > > API.
> > > > > > > > > > > > > >>> > > > The
> > > > > > > > > > > > > >>> > > > > >> > > question I have
> > > > > > > > > > > > > >>> > > > > >> > > is that whether we should name
> the
> > > > > > interface
> > > > > > > > > > > > > >>> `GroupAssignment`
> > > > > > > > > > > > > >>> > > > > >> instead,
> > > > > > > > > > > > > >>> > > > > >> > so
> > > > > > > > > > > > > >>> > > > > >> > > that Connect later
> > > > > > > > > > > > > >>> > > > > >> > > could also extend on the same
> > > > interface,
> > > > > > > just
> > > > > > > > to
> > > > > > > > > > > echo
> > > > > > > > > > > > > >>> > Guozhang's
> > > > > > > > > > > > > >>> > > > point
> > > > > > > > > > > > > >>> > > > > >> > > here, Also the base interface
> > > > > > > > > > > > > >>> > > > > >> > > is better to be defined empty
> for
> > > easy
> > > > > > > > > extension,
> > > > > > > > > > or
> > > > > > > > > > > > > >>> define an
> > > > > > > > > > > > > >>> > > > > >> abstract
> > > > > > > > > > > > > >>> > > > > >> > > type called `Resource` to be
> > > shareable
> > > > > > > > > > > > > >>> > > > > >> > > later IMHO.
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > > By the way, I'm a bit confused
> > > about
> > > > > > > > > discussion
> > > > > > > > > > > > above
> > > > > > > > > > > > > >>> about
> > > > > > > > > > > > > >>> > > > > >> colocating
> > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > >>> > > > > >> > > > txn and group coordinators.
> That
> > > is
> > > > > not
> > > > > > > > > actually
> > > > > > > > > > > > > >>> necessary,
> > > > > > > > > > > > > >>> > is
> > > > > > > > > > > > > >>> > > > it?
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > Yes, this is not a requirement
> > for
> > > > > this
> > > > > > > KIP,
> > > > > > > > > > > because
> > > > > > > > > > > > > it
> > > > > > > > > > > > > >>> is
> > > > > > > > > > > > > >>> > > > > >> inherently
> > > > > > > > > > > > > >>> > > > > >> > > impossible to
> > > > > > > > > > > > > > >>> > > > > >> > > achieve co-locating topic
> > partition
> > > > of
> > > > > > > > > > transaction
> > > > > > > > > > > > log
> > > > > > > > > > > > > >>> and
> > > > > > > > > > > > > >>> > > > consumed
> > > > > > > > > > > > > >>> > > > > >> > offset
> > > > > > > > > > > > > >>> > > > > >> > > topics.
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > > Thanks,
> > > > > > > > > > > > > >>> > > > > >> > > > Jason
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 10:07 AM
> > > > Boyang
> > > > > > > Chen <
> > > > > > > > > > > > > >>> > > > > >> reluctanthero...@gmail.com
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > Thank you Ismael for the
> > > > suggestion.
> > > > > > We
> > > > > > > > will
> > > > > > > > > > > > attempt
> > > > > > > > > > > > > >>> to
> > > > > > > > > > > > > >>> > > > address
> > > > > > > > > > > > > >>> > > > > >> it by
> > > > > > > > > > > > > >>> > > > > >> > > > > giving more details to
> > rejected
> > > > > > > > alternative
> > > > > > > > > > > > section.
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > Thank you for the comment
> > > > Guozhang!
> > > > > > > > Answers
> > > > > > > > > > are
> > > > > > > > > > > > > inline
> > > > > > > > > > > > > >>> > > below.
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33
> > PM
> > > > > > Guozhang
> > > > > > > > > Wang
> > > > > > > > > > <
> > > > > > > > > > > > > >>> > > > wangg...@gmail.com
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > > > wrote:
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > Hello Boyang,
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > Thanks for the KIP, I have
> > > some
> > > > > > > comments
> > > > > > > > > > > below:
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > 1. "Once transactions are
> > > > > complete,
> > > > > > > the
> > > > > > > > > call
> > > > > > > > > > > > will
> > > > > > > > > > > > > >>> > return."
> > > > > > > > > > > > > >>> > > > This
> > > > > > > > > > > > > >>> > > > > >> > seems
> > > > > > > > > > > > > >>> > > > > >> > > > > > different from the
> existing
> > > > > > behavior,
> > > > > > > in
> > > > > > > > > > which
> > > > > > > > > > > > we
> > > > > > > > > > > > > >>> would
> > > > > > > > > > > > > >>> > > > return a
> > > > > > > > > > > > > >>> > > > > >> > > > > retriable
> > > > > > > > > > > > > >>> > > > > >> > > > > > CONCURRENT_TRANSACTIONS
> and
> > > let
> > > > > the
> > > > > > > > client
> > > > > > > > > > to
> > > > > > > > > > > > > >>> retry, is
> > > > > > > > > > > > > >>> > > this
> > > > > > > > > > > > > >>> > > > > >> > > > intentional?
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > I don’t think it is
> > intentional,
> > > > > and I
> > > > > > > > will
> > > > > > > > > > > defer
> > > > > > > > > > > > > this
> > > > > > > > > > > > > >>> > > > question to
> > > > > > > > > > > > > >>> > > > > >> > > Jason
> > > > > > > > > > > > > >>> > > > > >> > > > > when he got time to answer
> > since
> > > > > from
> > > > > > > > what I
> > > > > > > > > > > > > >>> understood
> > > > > > > > > > > > > >>> > > retry
> > > > > > > > > > > > > >>> > > > and
> > > > > > > > > > > > > >>> > > > > >> on
> > > > > > > > > > > > > >>> > > > > >> > > hold
> > > > > > > > > > > > > >>> > > > > >> > > > > seem both valid approaches.
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > 2. "an overload to
> > > > > > > onPartitionsAssigned
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > > > >>> > consumer's
> > > > > > > > > > > > > >>> > > > > >> rebalance
> > > > > > > > > > > > > >>> > > > > >> > > > > > listener interface": as
> part
> > > of
> > > > > > > KIP-341
> > > > > > > > > > we've
> > > > > > > > > > > > > >>> already
> > > > > > > > > > > > > >>> > add
> > > > > > > > > > > > > >>> > > > this
> > > > > > > > > > > > > >>> > > > > >> > > > > information
> > > > > > > > > > > > > >>> > > > > >> > > > > > to the onAssignment
> > callback.
> > > > > Would
> > > > > > > this
> > > > > > > > > be
> > > > > > > > > > > > > >>> sufficient?
> > > > > > > > > > > > > >>> > Or
> > > > > > > > > > > > > >>> > > > more
> > > > > > > > > > > > > >>> > > > > >> > > > generally
> > > > > > > > > > > > > >>> > > > > >> > > > > > speaking, which
> information
> > > have
> > > > > to
> > > > > > be
> > > > > > > > > > passed
> > > > > > > > > > > > > >>> around in
> > > > > > > > > > > > > >>> > > > > >> rebalance
> > > > > > > > > > > > > >>> > > > > >> > > > > callback
> > > > > > > > > > > > > >>> > > > > >> > > > > > while others can be passed
> > > > around
> > > > > in
> > > > > > > > > > > > > >>> PartitionAssignor
> > > > > > > > > > > > > >>> > > > > >> callback? In
> > > > > > > > > > > > > >>> > > > > >> > > > > Streams
> > > > > > > > > > > > > >>> > > > > >> > > > > > for example both callbacks
> > are
> > > > > used
> > > > > > > but
> > > > > > > > > most
> > > > > > > > > > > > > >>> critical
> > > > > > > > > > > > > >>> > > > > >> information
> > > > > > > > > > > > > >>> > > > > >> > is
> > > > > > > > > > > > > >>> > > > > >> > > > > passed
> > > > > > > > > > > > > >>> > > > > >> > > > > > via onAssignment.
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > We still need to extend
> > > > > > > > > > > ConsumerRebalanceListener
> > > > > > > > > > > > > >>> because
> > > > > > > > > > > > > >>> > > > it’s the
> > > > > > > > > > > > > >>> > > > > >> > > > > interface we could have
> public
> > > > > access
> > > > > > > to.
> > > > > > > > > The
> > > > > > > > > > > > > >>> > #onAssignment
> > > > > > > > > > > > > >>> > > > call
> > > > > > > > > > > > > >>> > > > > >> is
> > > > > > > > > > > > > >>> > > > > >> > > > defined
> > > > > > > > > > > > > >>> > > > > >> > > > > on PartitionAssignor level
> > which
> > > > is
> > > > > > not
> > > > > > > > easy
> > > > > > > > > > to
> > > > > > > > > > > > work
> > > > > > > > > > > > > >>> with
> > > > > > > > > > > > > >>> > > > external
> > > > > > > > > > > > > >>> > > > > >> > > > > producers.
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > 3. "We propose to use a
> > > separate
> > > > > > > record
> > > > > > > > > type
> > > > > > > > > > > in
> > > > > > > > > > > > > >>> order to
> > > > > > > > > > > > > >>> > > > store
> > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > >>> > > > > >> > > > group
> > > > > > > > > > > > > >>> > > > > >> > > > > > assignment.": hmm, I
> thought
> > > > with
> > > > > > the
> > > > > > > > > third
> > > > > > > > > > > > typed
> > > > > > > > > > > > > >>> > > > > >> FindCoordinator,
> > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > >>> > > > > >> > > > > same
> > > > > > > > > > > > > >>> > > > > >> > > > > > broker that act as the
> > > consumer
> > > > > > > > > coordinator
> > > > > > > > > > > > would
> > > > > > > > > > > > > >>> > always
> > > > > > > > > > > > > >>> > > be
> > > > > > > > > > > > > >>> > > > > >> > selected
> > > > > > > > > > > > > >>> > > > > >> > > > as
> > > > > > > > > > > > > >>> > > > > >> > > > > > the txn coordinator, in
> > which
> > > > case
> > > > > > it
> > > > > > > > can
> > > > > > > > > > > access
> > > > > > > > > > > > > its
> > > > > > > > > > > > > >>> > local
> > > > > > > > > > > > > >>> > > > cache
> > > > > > > > > > > > > >>> > > > > >> > > > > metadata /
> > > > > > > > > > > > > >>> > > > > >> > > > > > offset topic to get this
> > > > > information
> > > > > > > > > > already?
> > > > > > > > > > > We
> > > > > > > > > > > > > >>> just
> > > > > > > > > > > > > >>> > need
> > > > > > > > > > > > > >>> > > > to
> > > > > > > > > > > > > >>> > > > > >> think
> > > > > > > > > > > > > >>> > > > > >> > > > about
> > > > > > > > > > > > > >>> > > > > >> > > > > > how to make these two
> > modules
> > > > > > directly
> > > > > > > > > > > exchange
> > > > > > > > > > > > > >>> > > information
> > > > > > > > > > > > > >>> > > > > >> without
> > > > > > > > > > > > > >>> > > > > >> > > > > messing
> > > > > > > > > > > > > >>> > > > > >> > > > > > up the code hierarchy.
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > These two coordinators will
> be
> > > on
> > > > > the
> > > > > > > same
> > > > > > > > > > > broker
> > > > > > > > > > > > > only
> > > > > > > > > > > > > >>> > when
> > > > > > > > > > > > > >>> > > > > >> number of
> > > > > > > > > > > > > >>> > > > > >> > > > > partitions for transaction
> > state
> > > > > topic
> > > > > > > and
> > > > > > > > > > > > consumer
> > > > > > > > > > > > > >>> offset
> > > > > > > > > > > > > >>> > > > topic
> > > > > > > > > > > > > >>> > > > > >> are
> > > > > > > > > > > > > >>> > > > > >> > > the
> > > > > > > > > > > > > >>> > > > > >> > > > > same. This normally holds
> > true,
> > > > but
> > > > > > I'm
> > > > > > > > > afraid
> > > > > > > > > > > > > >>> > > > > >> > > > > we couldn't make this
> > > assumption?
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > 4. The config of
> > > > > > > > > > > > "CONSUMER_GROUP_AWARE_TRANSACTION":
> > > > > > > > > > > > > >>> it
> > > > > > > > > > > > > >>> > > seems
> > > > > > > > > > > > > >>> > > > the
> > > > > > > > > > > > > >>> > > > > >> > goal
> > > > > > > > > > > > > >>> > > > > >> > > of
> > > > > > > > > > > > > >>> > > > > >> > > > > > this config is just to
> avoid
> > > > > > > > old-versioned
> > > > > > > > > > > > broker
> > > > > > > > > > > > > >>> to not
> > > > > > > > > > > > > >>> > > be
> > > > > > > > > > > > > >>> > > > > >> able to
> > > > > > > > > > > > > >>> > > > > >> > > > > > recognize newer versioned
> > > > client.
> > > > > I
> > > > > > > > think
> > > > > > > > > if
> > > > > > > > > > > we
> > > > > > > > > > > > > can
> > > > > > > > > > > > > >>> do
> > > > > > > > > > > > > >>> > > > something
> > > > > > > > > > > > > >>> > > > > >> > else
> > > > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > > > >>> > > > > >> > > > > > avoid this config though,
> > for
> > > > > > example
> > > > > > > we
> > > > > > > > > can
> > > > > > > > > > > use
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >>> > > > embedded
> > > > > > > > > > > > > >>> > > > > >> > > > AdminClient
> > > > > > > > > > > > > >>> > > > > >> > > > > > to send the APIVersion
> > request
> > > > > upon
> > > > > > > > > starting
> > > > > > > > > > > up,
> > > > > > > > > > > > > and
> > > > > > > > > > > > > >>> > based
> > > > > > > > > > > > > >>> > > > on
> > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > >>> > > > > >> > > > > returned
> > > > > > > > > > > > > >>> > > > > >> > > > > > value decides whether to
> go
> > to
> > > > the
> > > > > > old
> > > > > > > > > code
> > > > > > > > > > > path
> > > > > > > > > > > > > or
> > > > > > > > > > > > > >>> the
> > > > > > > > > > > > > >>> > > new
> > > > > > > > > > > > > >>> > > > > >> > behavior.
> > > > > > > > > > > > > >>> > > > > >> > > > > > Admittedly asking a random
> > > > broker
> > > > > > > about
> > > > > > > > > > > > APIVersion
> > > > > > > > > > > > > >>> does
> > > > > > > > > > > > > >>> > > not
> > > > > > > > > > > > > >>> > > > > >> > guarantee
> > > > > > > > > > > > > >>> > > > > >> > > > the
> > > > > > > > > > > > > >>> > > > > >> > > > > > whole cluster's versions,
> > but
> > > > what
> > > > > > we
> > > > > > > > can
> > > > > > > > > do
> > > > > > > > > > > is
> > > > > > > > > > > > to
> > > > > > > > > > > > > >>> first
> > > > > > > > > > > > > >>> > > 1)
> > > > > > > > > > > > > >>> > > > find
> > > > > > > > > > > > > >>> > > > > >> > the
> > > > > > > > > > > > > >>> > > > > >> > > > > > coordinator (and if the
> > random
> > > > > > broker
> > > > > > > > does
> > > > > > > > > > not
> > > > > > > > > > > > > even
> > > > > > > > > > > > > >>> > > > recognize
> > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > >>> > > > > >> > new
> > > > > > > > > > > > > >>> > > > > >> > > > > > discover type, fall back
> to
> > > old
> > > > > path
> > > > > > > > > > > directly),
> > > > > > > > > > > > > and
> > > > > > > > > > > > > >>> then
> > > > > > > > > > > > > >>> > > 2)
> > > > > > > > > > > > > >>> > > > ask
> > > > > > > > > > > > > >>> > > > > >> the
> > > > > > > > > > > > > >>> > > > > >> > > > > > discovered coordinator
> about
> > > its
> > > > > > > > supported
> > > > > > > > > > > > > >>> APIVersion.
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > The caveat here is that we
> > have
> > > to
> > > > > > make
> > > > > > > > sure
> > > > > > > > > > > both
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >>> > group
> > > > > > > > > > > > > >>> > > > > >> > coordinator
> > > > > > > > > > > > > >>> > > > > >> > > > and
> > > > > > > > > > > > > >>> > > > > >> > > > > transaction coordinator are
> on
> > > the
> > > > > > > latest
> > > > > > > > > > > version
> > > > > > > > > > > > > >>> during
> > > > > > > > > > > > > >>> > > init
> > > > > > > > > > > > > >>> > > > > >> stage.
> > > > > > > > > > > > > >>> > > > > >> > > This
> > > > > > > > > > > > > >>> > > > > >> > > > > is potentially doable as we
> > only
> > > > > need
> > > > > > a
> > > > > > > > > > consumer
> > > > > > > > > > > > > >>> group.id
> > > > > > > > > > > > > >>> > > > > >> > > > > to check that. In the
> > meantime,
> > > a
> > > > > > > > hard-coded
> > > > > > > > > > > > config
> > > > > > > > > > > > > is
> > > > > > > > > > > > > >>> > > still a
> > > > > > > > > > > > > >>> > > > > >> > > favorable
> > > > > > > > > > > > > >>> > > > > >> > > > > backup in case the server
> has
> > > > > > > downgraded,
> > > > > > > > so
> > > > > > > > > > you
> > > > > > > > > > > > > will
> > > > > > > > > > > > > >>> want
> > > > > > > > > > > > > >>> > > to
> > > > > > > > > > > > > >>> > > > use
> > > > > > > > > > > > > >>> > > > > >> a
> > > > > > > > > > > > > >>> > > > > >> > new
> > > > > > > > > > > > > >>> > > > > >> > > > > version client without
> > `consumer
> > > > > > group`
> > > > > > > > > > > > > transactional
> > > > > > > > > > > > > >>> > > support.
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > 5. This is a meta question:
> > have
> > > > you
> > > > > > > > > > considered
> > > > > > > > > > > > how
> > > > > > > > > > > > > >>> this
> > > > > > > > > > > > > >>> > can
> > > > > > > > > > > > > >>> > > > be
> > > > > > > > > > > > > >>> > > > > >> > applied
> > > > > > > > > > > > > >>> > > > > >> > > > to
> > > > > > > > > > > > > >>> > > > > >> > > > > > Kafka Connect as well? For
> > > > > example,
> > > > > > > for
> > > > > > > > > > source
> > > > > > > > > > > > > >>> > connectors,
> > > > > > > > > > > > > >>> > > > the
> > > > > > > > > > > > > >>> > > > > >> > > > assignment
> > > > > > > > > > > > > >>> > > > > >> > > > > > is not by "partitions",
> but
> > by
> > > > > some
> > > > > > > > other
> > > > > > > > > > sort
> > > > > > > > > > > > of
> > > > > > > > > > > > > >>> > > > "resources"
> > > > > > > > > > > > > >>> > > > > >> based
> > > > > > > > > > > > > >>> > > > > >> > > on
> > > > > > > > > > > > > >>> > > > > >> > > > > the
> > > > > > > > > > > > > >>> > > > > >> > > > > > source systems, how
> KIP-447
> > > > would
> > > > > > > affect
> > > > > > > > > > Kafka
> > > > > > > > > > > > > >>> > Connectors
> > > > > > > > > > > > > >>> > > > that
> > > > > > > > > > > > > >>> > > > > >> > > > > implemented
> > > > > > > > > > > > > >>> > > > > >> > > > > > EOS as well?
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > No, it's not currently
> > included
> > > in
> > > > > the
> > > > > > > > > scope.
> > > > > > > > > > > > Could
> > > > > > > > > > > > > >>> you
> > > > > > > > > > > > > >>> > > point
> > > > > > > > > > > > > >>> > > > me
> > > > > > > > > > > > > >>> > > > > >> to a
> > > > > > > > > > > > > >>> > > > > >> > > > > sample source connector who
> > uses
> > > > > EOS?
> > > > > > > > Could
> > > > > > > > > > > always
> > > > > > > > > > > > > >>> > > piggy-back
> > > > > > > > > > > > > >>> > > > into
> > > > > > > > > > > > > >>> > > > > >> > the
> > > > > > > > > > > > > >>> > > > > >> > > > > TxnProducerIdentity struct
> > with
> > > > more
> > > > > > > > > > information
> > > > > > > > > > > > > such
> > > > > > > > > > > > > >>> as
> > > > > > > > > > > > > >>> > > > tasks. If
> > > > > > > > > > > > > >>> > > > > >> > > > > this is something to support
> > in
> > > > near
> > > > > > > term,
> > > > > > > > > an
> > > > > > > > > > > > > abstract
> > > > > > > > > > > > > >>> > type
> > > > > > > > > > > > > >>> > > > called
> > > > > > > > > > > > > >>> > > > > >> > > > > "Resource" could be provided
> > and
> > > > let
> > > > > > > topic
> > > > > > > > > > > > partition
> > > > > > > > > > > > > >>> and
> > > > > > > > > > > > > >>> > > > connect
> > > > > > > > > > > > > >>> > > > > >> task
> > > > > > > > > > > > > >>> > > > > >> > > > > implement it.
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > Guozhang
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > On Sat, Jun 22, 2019 at
> 8:40
> > > PM
> > > > > > Ismael
> > > > > > > > > Juma
> > > > > > > > > > <
> > > > > > > > > > > > > >>> > > > ism...@juma.me.uk>
> > > > > > > > > > > > > >>> > > > > >> > > wrote:
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > Hi Boyang,
> > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > Thanks for the KIP. It's
> > > good
> > > > > that
> > > > > > > we
> > > > > > > > > > > listed a
> > > > > > > > > > > > > >>> number
> > > > > > > > > > > > > >>> > of
> > > > > > > > > > > > > >>> > > > > >> rejected
> > > > > > > > > > > > > >>> > > > > >> > > > > > > alternatives. It would
> be
> > > > > helpful
> > > > > > to
> > > > > > > > > have
> > > > > > > > > > an
> > > > > > > > > > > > > >>> > explanation
> > > > > > > > > > > > > >>> > > > of
> > > > > > > > > > > > > >>> > > > > >> why
> > > > > > > > > > > > > >>> > > > > >> > > they
> > > > > > > > > > > > > >>> > > > > >> > > > > were
> > > > > > > > > > > > > >>> > > > > >> > > > > > > rejected.
> > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > Ismael
> > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at
> > 8:31
> > > > PM
> > > > > > > Boyang
> > > > > > > > > > Chen
> > > > > > > > > > > <
> > > > > > > > > > > > > >>> > > > > >> bche...@outlook.com
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> > > > > wrote:
> > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > > Hey all,
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > > I would like to start
> a
> > > > > > discussion
> > > > > > > > for
> > > > > > > > > > > > > KIP-447:
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > > this is a work
> > originated
> > > by
> > > > > > Jason
> > > > > > > > > > > Gustafson
> > > > > > > > > > > > > >>> and we
> > > > > > > > > > > > > >>> > > > would
> > > > > > > > > > > > > >>> > > > > >> like
> > > > > > > > > > > > > >>> > > > > >> > to
> > > > > > > > > > > > > >>> > > > > >> > > > > > proceed
> > > > > > > > > > > > > >>> > > > > >> > > > > > > > into discussion stage.
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > > Let me know your
> > thoughts,
> > > > > > thanks!
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > > > Boyang
> > > > > > > > > > > > > >>> > > > > >> > > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > > > --
> > > > > > > > > > > > > >>> > > > > >> > > > > > -- Guozhang
> > > > > > > > > > > > > >>> > > > > >> > > > > >
> > > > > > > > > > > > > >>> > > > > >> > > > >
> > > > > > > > > > > > > >>> > > > > >> > > >
> > > > > > > > > > > > > >>> > > > > >> > >
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >> > --
> > > > > > > > > > > > > >>> > > > > >> > -- Guozhang
> > > > > > > > > > > > > >>> > > > > >> >
> > > > > > > > > > > > > >>> > > > > >>
> > > > > > > > > > > > > >>> > > > > >
> > > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> --
> > > > > > > > > > > > > >>> -- Guozhang
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
