Thank you Guozhang for the suggestion! I would normally prefer naming a
flag after its functionality. It seems to me that `isolation_level` adds
an extra hop for readers trying to understand what the flag actually does.

As for the generation.id exposure, I'm fine leveraging the new API from
KIP-429, but is that design finalized yet, and will the API be added to
the generic Consumer<K, V> interface?

Boyang

On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Boyang, thanks for the updated proposal!
>
> 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> offset fetch request with a boolean to indicate "give me a retriable error
> code if there are pending offsets, rather than sending me the committed
> offset immediately". Personally I still feel it is okay to piggy-back on
> the ISOLATION_LEVEL boolean, but I'm also fine with a separate
> `await_transaction` boolean if you feel strongly about it.
>
> 10. About the exposure of generation id, there may be some refactoring work
> coming from KIP-429 that can benefit KIP-447 as well, since we are wrapping
> the consumer subscription / assignment data in new classes. Note that the
> current proposal does not include `generationId`, since with the cooperative
> sticky assignor we think it is not necessary for correctness; but if we agree
> it is okay to expose it, we can potentially include it in
> `ConsumerAssignmentData` as well.
>
>
> Guozhang
>
>
> On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Thank you Jason for the ideas.
> >
> > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the updates. A few comments below:
> > >
> > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to
> > > 10s. I think this makes sense for Kafka Streams, which is tied to the
> > > consumer group semantics and uses a default 10s session timeout. However,
> > > it seems a bit dangerous to make this change for the producer generally.
> > > Could we just change it for streams?
> > >
> > That sounds good to me.
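To make the scoping concrete, here is a minimal sketch, assuming the shorter timeout is applied only when building the producer embedded in a Streams-like application, while standalone producers keep their regular default. `StreamsTxnTimeout` and `embeddedProducerProps` are hypothetical names; only the `transaction.timeout.ms` key is the real producer config. How Kafka Streams actually merges producer overrides is not shown.

```java
import java.util.Properties;

// Hypothetical helper: applies a shorter transaction timeout only to producers
// created inside a Streams-like application, leaving the producer default alone.
final class StreamsTxnTimeout {
    // Real producer config key.
    static final String TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms";
    // Assumed Streams-only value, chosen to match the 10s session timeout.
    static final int STREAMS_TXN_TIMEOUT_MS = 10_000;

    // Builds properties for the embedded producer. Values the user set
    // explicitly are copied last, so they win over the Streams override.
    static Properties embeddedProducerProps(Properties userOverrides) {
        Properties props = new Properties();
        props.setProperty(TRANSACTION_TIMEOUT_MS, Integer.toString(STREAMS_TXN_TIMEOUT_MS));
        for (String key : userOverrides.stringPropertyNames()) {
            props.setProperty(key, userOverrides.getProperty(key));
        }
        return props;
    }
}
```

Copying user-supplied properties last means an application that sets its own transaction timeout is not silently overridden.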
> >
> > > 2. The new `initTransactions` API takes a `Consumer` instance. I think the
> > > idea is to basically put in a backdoor to give the producer access to the
> > > group generationId. It's not clear to me how this would work given package
> > > restrictions. I wonder if it would be better to just expose the state we
> > > need from the consumer. I know we have been reluctant to do this so far
> > > because we treat the generationId as an implementation detail. However, I
> > > think we might just bite the bullet and expose it rather than coming up
> > > with a messy hack. Concepts such as memberIds have already been exposed in
> > > the AdminClient, so maybe it is not too bad. Alternatively, we could use an
> > > opaque type. For example:
> > >
> > > // public
> > > interface GroupMetadata {}
> > >
> > > // private
> > > class ConsumerGroupMetadata implements GroupMetadata {
> > >   final int generationId;
> > >   final String memberId;
> > > }
> > >
> > > // Consumer API
> > > public GroupMetadata groupMetadata();
> > >
> > > I am probably leaning toward just exposing the state we need.
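The opaque-type alternative can be sketched as follows, assuming the consumer returns a public marker interface and the producer, which knows the internal implementation class, downcasts to read the group state. `OpaqueGroupMetadataDemo`, `fromConsumer`, and `extractGenerationId` are hypothetical names for illustration, not Kafka APIs; the nested types stand in for the public and private types in the proposal above.

```java
// Sketch of the opaque-type pattern: the public type exposes nothing,
// the internal implementation carries the group state.
final class OpaqueGroupMetadataDemo {

    // Public marker type, as returned by a hypothetical Consumer#groupMetadata().
    interface GroupMetadata {}

    // Internal implementation; in a real codebase this would be package-private.
    static final class ConsumerGroupMetadata implements GroupMetadata {
        final int generationId;
        final String memberId;

        ConsumerGroupMetadata(int generationId, String memberId) {
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    // Stand-in for the consumer side: wraps the current group state.
    static GroupMetadata fromConsumer(int generationId, String memberId) {
        return new ConsumerGroupMetadata(generationId, memberId);
    }

    // Stand-in for the producer side: it must know the internal class to read
    // the state, so any other GroupMetadata implementation is rejected.
    static int extractGenerationId(GroupMetadata metadata) {
        if (!(metadata instanceof ConsumerGroupMetadata)) {
            throw new IllegalArgumentException("Unsupported GroupMetadata implementation");
        }
        return ((ConsumerGroupMetadata) metadata).generationId;
    }
}
```

The downcast in `extractGenerationId` shows the cost of this option: the producer still has to see the internal class, which is the kind of package coupling Jason raises; exposing the fields directly avoids it.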
> > >
> > Yes. Also note that Kafka Streams uses the generic Consumer API, which
> > doesn't have rich state like a full `KafkaConsumer`, so the hack will not
> > work as expected.
> >
> > Instead, just exposing the consumer generation.id seems much easier.
> > We could consolidate the API and make it
> >
> > > 3. Given that we are already providing a way to propagate group state from
> > > the consumer to the producer, I wonder if we may as well include the
> > > memberId and groupInstanceId. This would make the validation we do for
> > > TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
> > > least this may help with debugging.
> > >
> >
> > Yes, we could put them into the GroupMetadata struct.
> >
> >
> > > 4. I like the addition of isolation_level to the offset fetch. At the same
> > > time, its behavior is a bit inconsistent with how it is used in the
> > > consumer generally. There is no reason for the group coordinator to ever
> > > expose aborted data, so this is mostly about awaiting pending offset
> > > commits, not reading uncommitted data. Perhaps instead of calling this
> > > "isolation level," it should be more like "await_pending_transaction" or
> > > something like that?
> > >
> > > Also, just to be clear, the consumer would treat this as an optional field,
> > > right? So if the broker does not support the latest OffsetFetch API, it
> > > would silently revert to reading the old data. Basically it would be up to
> > > the streams version probing logic to ensure that the expectation on this
> > > API fits with the usage of `transactional.id`.
> > >
> > That sounds like a better name to me, though I think it could be shortened
> > to `await_transaction`. I think the field should be optional, too.
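The optional-field behavior Jason describes could look like this on the client side, assuming the flag is only sent when the negotiated OffsetFetch version supports it. `OffsetFetchPlanner`, `FetchPlan`, and the threshold version 7 are hypothetical placeholders, not the final protocol definition.

```java
// Sketch: decide which OffsetFetch version to use and whether the new
// await flag can be sent. All names and version numbers are hypothetical.
final class OffsetFetchPlanner {
    // Hypothetical first OffsetFetch version that carries the await flag.
    static final short MIN_VERSION_WITH_AWAIT_FLAG = 7;

    static final class FetchPlan {
        final short version;
        final boolean awaitTransaction;

        FetchPlan(short version, boolean awaitTransaction) {
            this.version = version;
            this.awaitTransaction = awaitTransaction;
        }
    }

    // Uses the highest version both sides support. If the broker is too old,
    // the flag is silently dropped and the broker answers with whatever
    // committed offsets it has, without waiting for pending transactions.
    static FetchPlan plan(short clientMaxVersion, short brokerMaxVersion, boolean wantAwait) {
        short version = (short) Math.min(clientMaxVersion, brokerMaxVersion);
        boolean sendFlag = wantAwait && version >= MIN_VERSION_WITH_AWAIT_FLAG;
        return new FetchPlan(version, sendFlag);
    }
}
```

Because the downgrade is silent, correctness relies on something else, such as the Streams version probing mentioned above, guaranteeing that the brokers are new enough before the new producer usage is enabled.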
> >
> >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen <reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > I will correct my statement from my last email. I don't think it is
> > > > necessary to add read_committed (3.a) to the OffsetFetch request: in an
> > > > EOS application, the underlying consumers within the group should always
> > > > back off when there are pending offsets.
> > > >
> > > > Let me know if you think this is correct.
> > > >
> > > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Guozhang for the questions, inline answers are below.
> > > > >
> > > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hey all,
> > > > >>
> > > > >> I have done a thorough revision of KIP-447
> > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> > > > >> and written a design doc
> > > > >> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> > > > >> depicting the internal changes. We stripped many implementation details
> > > > >> from the KIP and simplified the public changes by a lot. For reviewers, it
> > > > >> is highly recommended to fully understand the EOS design in KIP-98 and
> > > > >> read its corresponding design doc
> > > > >> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> > > > >> if you haven't done so already.
> > > > >>
> > > > >> Let me know if you find anything confusing in the KIP or the design.
> > > > >> I would be happy to discuss in depth.
> > > > >>
> > > > >> Best,
> > > > >> Boyang
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang <wangg...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> 2. The reason we did not expose generation.id from KafkaConsumer public
> > > > >>> APIs directly is to abstract this notion away from users, since it is an
> > > > >>> implementation detail of the rebalance protocol itself; e.g. if a user
> > > > >>> calls consumer.assign(), they do not need to invoke ConsumerCoordinator
> > > > >>> and need not be aware of generation.id at all.
> > > > >>>
> > > > >>> On the other hand, with the current proposal the txn coordinator does
> > > > >>> not know about the latest generation from the source-of-truth group
> > > > >>> coordinator; instead, it will only bump up the generation based on the
> > > > >>> producer's InitProducerIdRequest.
> > > > >>>
> > > > >>> The key here is that GroupCoordinator, when handling
> > > > >>> `InitProducerIdRequest
> > > > >>>
> > > > > In the new design, we just pass the entire consumer instance into the
> > > > > producer through #initTransactions, so no public API will be created.
> > > > >
> > > > >>> 3. I agree that if we rely on the group coordinator to block on returning
> > > > >>> the offset-fetch-response when read-committed is enabled, then we do not
> > > > >>> need to store the partition assignment on the txn coordinator, and
> > > > >>> therefore it's better to still decouple them. In that case we still need
> > > > >>> to update the KIP wiki page to include:
> > > > >>>
> > > > >>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
> > > > >>> 3.b. Add a new error code in OffsetFetchResponse to let the client back
> > > > >>> off and retry if there are pending txns involving the partitions of
> > > > >>> interest.
> > > > >>> 3.c. Also, in the worst case we would let the client be blocked for the
> > > > >>> txn.timeout period, and for that reason we may need to consider
> > > > >>> reducing our default txn.timeout value as well.
> > > > >>>
> > > > > Addressed 3.b and 3.c, will do 3.a.
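Points 3.b and 3.c together describe a client-side loop: retry on the new retriable error with a backoff, and give up once the transaction timeout has passed. A minimal sketch, assuming a hypothetical `OffsetFetcher` callback whose empty result stands for the new pending-transaction error code; the clock and sleep are injected so the loop can be tested without real waiting. The backoff and timeout are parameters; nothing here fixes their values.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

// Sketch of a bounded retry loop for OffsetFetch while offsets are pending.
final class PendingOffsetRetry {

    // Hypothetical fetch callback: empty means "pending transaction, retry later".
    interface OffsetFetcher {
        Optional<Long> fetch();
    }

    // Injected sleep so tests can advance a fake clock instead of blocking.
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    static long fetchCommittedOffset(OffsetFetcher fetcher, long backoffMs, long timeoutMs,
                                     LongSupplier clockMs, Sleeper sleeper)
            throws InterruptedException {
        long deadline = clockMs.getAsLong() + timeoutMs;
        while (true) {
            Optional<Long> result = fetcher.fetch();
            if (result.isPresent()) {
                return result.get();
            }
            // Worst case (3.c): keep backing off until the transaction timeout elapses.
            if (clockMs.getAsLong() + backoffMs > deadline) {
                throw new IllegalStateException(
                        "Pending transaction did not complete within " + timeoutMs + " ms");
            }
            sleeper.sleep(backoffMs);
        }
    }
}
```

Retrying on the client, rather than parking the request on the broker, is the trade-off Guozhang argues for elsewhere in this thread: no handler thread is tied up and no purgatory is needed.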
> > > > >
> > > > >>> 4. According to Colin, it seems we do not need to create another KIP and
> > > > >>> can just complete it as part of KIP-117 / KAFKA-5214; we need to do some
> > > > >>> cleanup to have BrokerApiVersion exposed from AdminClient (@Colin please
> > > > >>> let us know if you have any concerns about exposing it).
> > > > >>>
> > > > > I think we no longer need to rely on the API version for
> > > > > initialization, since we will be using the upgrade.from config anyway.
> > > > >
> > > > >>
> > > > >>> Guozhang
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson <ja...@confluent.io>
> > > > >>> wrote:
> > > > >>>
> > > > >>> > For reference, we have BrokerApiVersionCommand already as a public
> > > > >>> > interface. We have a bit of tech debt at the moment because it uses a
> > > > >>> > custom AdminClient. It would be nice to clean that up. In general, I
> > > > >>> > think it is reasonable to expose from AdminClient. It can be used by
> > > > >>> > management tools to inspect running Kafka versions, for example.
> > > > >>> >
> > > > >>> > -Jason
> > > > >>> >
> > > > >>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > >>> > wrote:
> > > > >>> >
> > > > >>> > > Thank you for the context Colin. The groupId was indeed a copy-paste
> > > > >>> > > error. Our use case here for 447 is (quoted from Guozhang):
> > > > >>> > > '''
> > > > >>> > > I think if we can do something else to
> > > > >>> > > avoid this config though, for example we can use the embedded AdminClient
> > > > >>> > > to send the APIVersion request upon starting up, and based on the returned
> > > > >>> > > value decides whether to go to the old code path or the new behavior.
> > > > >>> > > '''
> > > > >>> > > The benefit is that we avoid adding a new configuration just to make a
> > > > >>> > > decision based on the broker version. If you have concerns about
> > > > >>> > > exposing ApiVersion to clients, we could try to think of alternative
> > > > >>> > > solutions too.
> > > > >>> > >
> > > > >>> > > Boyang
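Guozhang's quoted idea boils down to a one-time decision at startup. A minimal sketch, assuming the client can somehow learn each broker's maximum supported version for the relevant API; per Colin's reply in this thread, the AdminClient `apiVersions` method was never implemented, so that input is hypothetical, as are `CodePathSelector` and the threshold value 3.

```java
import java.util.List;

// Sketch: pick the old or new code path once at startup from broker API versions.
final class CodePathSelector {
    enum CodePath { OLD_CODE_PATH, NEW_CODE_PATH }

    // Hypothetical minimum version of the relevant API that supports the new behavior.
    static final int MIN_VERSION_FOR_NEW_PATH = 3;

    // Every broker must support the new behavior; a single older broker
    // (or no information at all) keeps the application on the old path.
    static CodePath select(List<Integer> brokerMaxVersions) {
        if (brokerMaxVersions.isEmpty()) {
            return CodePath.OLD_CODE_PATH;
        }
        for (int maxVersion : brokerMaxVersions) {
            if (maxVersion < MIN_VERSION_FOR_NEW_PATH) {
                return CodePath.OLD_CODE_PATH;
            }
        }
        return CodePath.NEW_CODE_PATH;
    }
}
```

As Guozhang notes elsewhere in the thread, a check like this runs only once, so a broker downgrade after startup would go unnoticed.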
> > > > >>> > >
> > > > >>> > >
> > > > >>> > >
> > > > >>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe <cmcc...@apache.org>
> > > > >>> > > wrote:
> > > > >>> > >
> > > > >>> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > > > >>> > > > through AdminClient. That class is not even accessible without having
> > > > >>> > > > the broker jars on your CLASSPATH.
> > > > >>> > > >
> > > > >>> > > > Another question is, what is the groupId parameter doing in the call?
> > > > >>> > > > The API versions are the same no matter what consumer group we use,
> > > > >>> > > > right? Perhaps this was a copy and paste error?
> > > > >>> > > >
> > > > >>> > > > This is not the first time we have discussed having a method in
> > > > >>> > > > AdminClient to retrieve API version information. In fact, the original
> > > > >>> > > > KIP which created KafkaAdminClient specified an API for fetching version
> > > > >>> > > > information. It was called apiVersions and it is still there on the
> > > > >>> > > > wiki. See
> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > > >>> > > >
> > > > >>> > > > However, this API wasn't ready in time for 0.11.0 so we shipped without
> > > > >>> > > > it. There was a JIRA to implement it for later versions,
> > > > >>> > > > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> > > > >>> > > > https://github.com/apache/kafka/pull/3012 . However, we started to
> > > > >>> > > > rethink whether this AdminClient function was even necessary. Most of
> > > > >>> > > > the use-cases we could think of seemed like horrible hacks. So it has
> > > > >>> > > > never really been implemented (yet?).
> > > > >>> > > >
> > > > >>> > > > best,
> > > > >>> > > > Colin
> > > > >>> > > >
> > > > >>> > > >
> > > > >>> > > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > > >>> > > > > Actually, on second thought, I think it does make sense to support
> > > > >>> > > > > auto-upgrade through the admin client, to help us get the API version
> > > > >>> > > > > from the broker.
> > > > >>> > > > > A draft KIP is here:
> > > > >>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > > >>> > > > >
> > > > >>> > > > > Boyang
> > > > >>> > > > >
> > > > >>> > > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > >>> > > > > wrote:
> > > > >>> > > > >
> > > > >>> > > > > > Thank you Guozhang, some of my thoughts are inline below.
> > > > >>> > > > > >
> > > > >>> > > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <ja...@confluent.io>
> > > > >>> > > > > > wrote:
> > > > >>> > > > > >
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > I think co-locating does have some merits here, i.e. letting the
> > > > >>> > > > > >> > ConsumerCoordinator, which has the source of truth for the assignment,
> > > > >>> > > > > >> > act as the TxnCoordinator as well; but I agree there are also some
> > > > >>> > > > > >> > cons to coupling them together. I'm still leaning a bit towards
> > > > >>> > > > > >> > colocation, but if there are good rationales not to do so I can be
> > > > >>> > > > > >> > convinced as well.
> > > > >>> > > > > >>
> > > > >>> > > > > >>
> > > > >>> > > > > >> The good rationale is that we have no mechanism to colocate
> > > > >>> > > > > >> partitions ;). Are you suggesting we store the group and transaction
> > > > >>> > > > > >> state in the same log? Can you be more concrete about the benefit?
> > > > >>> > > > > >>
> > > > >>> > > > > >> -Jason
> > > > >>> > > > > >>
> > > > >>> > > > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com>
> > > > >>> > > > > >> wrote:
> > > > >>> > > > > >>
> > > > >>> > > > > >> > Hi Boyang,
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > 1. One advantage of retrying over holding the request is that it will
> > > > >>> > > > > >> > not tie up a handler thread (of course the latter could avoid that
> > > > >>> > > > > >> > too, but that involves using a purgatory, which is more complicated),
> > > > >>> > > > > >> > and it is also less likely to violate the request timeout. So I think
> > > > >>> > > > > >> > there are some rationales to prefer retries.
> > > > >>> > > > > >> >
> > > > >>> > > > > >>
> > > > >>> > > > > > That sounds fair to me; we also avoid using another purgatory
> > > > >>> > > > > > instance. Usually one back-off only delays 50ms during startup, which
> > > > >>> > > > > > is a trivial cost. This behavior shouldn't be changed.
> > > > >>> > > > > >
> > > > >>> > > > > >> > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
> > > > >>> > > > > >> > and PartitionAssignors are user-customizable modules, and the only
> > > > >>> > > > > >> > difference is that the former is specified via code and the latter is
> > > > >>> > > > > >> > specified via config.
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> > > > >>> > > > > >> > though: with KIP-429, onPartitionsAssigned may not be called if the
> > > > >>> > > > > >> > assignment does not change, whereas onAssignment would always be
> > > > >>> > > > > >> > called at the end of the sync-group response. My proposed semantics is
> > > > >>> > > > > >> > that `RebalanceListener#onPartitionsXXX` are used for notifications to
> > > > >>> > > > > >> > the user, and hence if there are no changes these will not be called,
> > > > >>> > > > > >> > whereas `PartitionAssignor` is used for assignor logic, whose callback
> > > > >>> > > > > >> > would always be called no matter whether the partitions have changed.
> > > > >>> > > > > >>
> > > > >>> > > > > > I think a third option is to gracefully expose the generation id as part
> > > > >>> > > > > > of the consumer API, so that we don't need to bother overloading various
> > > > >>> > > > > > callbacks. Of course, this builds upon the assumption that topic
> > > > >>> > > > > > partitions will not be included in the new initTransactions API.
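The callback semantics Guozhang proposes above can be illustrated with a small simulation, assuming a hypothetical `completeRebalance` hook that runs at the end of every sync-group response: the assignor-style callback always fires, while the user-facing listener fires only when the owned partitions change. Partitions are plain strings to keep it self-contained; this is not how `KafkaConsumer` is implemented.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simulation of the proposed callback semantics (not the real consumer).
final class CallbackSemantics {
    private Set<String> owned = new HashSet<>();
    final List<String> events = new ArrayList<>();

    // Hypothetical hook run at the end of every sync-group response.
    void completeRebalance(Set<String> newAssignment, int generationId) {
        // Assignor-style callback: always invoked, so it can carry per-generation state.
        events.add("onAssignment gen=" + generationId);
        // Listener-style notification: only when the assignment actually changed.
        if (!newAssignment.equals(owned)) {
            events.add("onPartitionsAssigned " + newAssignment.size());
        }
        owned = new HashSet<>(newAssignment);
    }
}
```

In a second rebalance with an unchanged assignment the generation advances but the listener stays silent, which illustrates why a generation id delivered only through `onPartitionsAssigned` could go stale under KIP-429.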
> > > > >>> > > > > >
> > > > >>> > > > > >> > 3. I feel it is a bit awkward to let the TxnCoordinator keep partition
> > > > >>> > > > > >> > assignments, since it is sort of taking over the job of the
> > > > >>> > > > > >> > ConsumerCoordinator, and may likely cause a split-brain problem as two
> > > > >>> > > > > >> > coordinators keep a copy of this assignment which may be different.
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > I think co-locating does have some merits here, i.e. letting the
> > > > >>> > > > > >> > ConsumerCoordinator, which has the source of truth for the assignment,
> > > > >>> > > > > >> > act as the TxnCoordinator as well; but I agree there are also some
> > > > >>> > > > > >> > cons to coupling them together. I'm still leaning a bit towards
> > > > >>> > > > > >> > colocation, but if there are good rationales not to do so I can be
> > > > >>> > > > > >> > convinced as well.
> > > > >>> > > > > >> >
> > > > >>> > > > > >>
> > > > >>> > > > > > The purpose of co-location is to let the txn coordinator see the group
> > > > >>> > > > > > assignment. This priority is weakened once we already have a defense
> > > > >>> > > > > > in the consumer offset fetch, so I guess it's not super important
> > > > >>> > > > > > anymore.
> > > > >>> > > > > >
> > > > >>> > > > > >
> > > > >>> > > > > >> > 4. I guess I prefer the philosophy of "only add configs if there's no
> > > > >>> > > > > >> > other way", since more and more configs would make it less and less
> > > > >>> > > > > >> > intuitive to use out of the box.
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > I think it's a valid point that checks upon starting up do not cope
> > > > >>> > > > > >> > with brokers downgrading, but even with a config it is still hard for
> > > > >>> > > > > >> > users to determine when they can be sure the broker will never
> > > > >>> > > > > >> > downgrade anymore and hence can safely switch the config. So my
> > > > >>> > > > > >> > feeling is that this config would still not help much. If we want to
> > > > >>> > > > > >> > be on the safer side, then I'd suggest we modify the Coordinator ->
> > > > >>> > > > > >> > NetworkClient hierarchy to allow the NetworkClient to pass the
> > > > >>> > > > > >> > APIVersion metadata to the Coordinator, so that the Coordinator can
> > > > >>> > > > > >> > rely on that logic to change its behavior dynamically.
> > > > >>> > > > > >>
> > > > >>> > > > > > Stream thread initialization could not be handled by changing the
> > > > >>> > > > > > client coordinator's behavior on the fly; we can only lose options
> > > > >>> > > > > > after we have initialized (the main thread exits and no thread has the
> > > > >>> > > > > > global picture anymore). If we do want to support automatic version
> > > > >>> > > > > > detection, an admin client request would be easier in this sense.
> > > > >>> > > > > >
> > > > >>> > > > > >
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > 5. I do not have a concrete idea of what the impact on Connect would
> > > > >>> > > > > >> > be; maybe Randall or Konstantine can help here?
> > > > >>> > > > > >> >
> > > > >>> > > > > >>
> > > > >>> > > > > > Sounds good, let's see their thoughts.
> > > > >>> > > > > >
> > > > >>> > > > > >
> > > > >>> > > > > >> > Guozhang
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > >>> > > > > >> > wrote:
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > > Hey Jason,
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > > Thank you for the proposal here. Some of my thoughts are below.
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io>
> > > > >>> > > > > >> > > wrote:
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > > > Hi Boyang,
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > Thanks for picking this up! Still reading through the updates, but
> > > > >>> > > > > >> > > > here are a couple initial comments on the APIs:
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> > > > >>> > > > > >> > > > trying to encapsulate state from the current group assignment. Maybe
> > > > >>> > > > > >> > > > something like `ConsumerAssignment` would be clearer? If we make the
> > > > >>> > > > > >> > > > usage consistent across the consumer and producer, then we can avoid
> > > > >>> > > > > >> > > > exposing internal state like the generationId.
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > For example:
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > // Public API
> > > > >>> > > > > >> > > > interface ConsumerAssignment {
> > > > >>> > > > > >> > > >   Set<TopicPartition> partitions();
> > > > >>> > > > > >> > > > }
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > // Not a public API
> > > > >>> > > > > >> > > > class InternalConsumerAssignment implements ConsumerAssignment {
> > > > >>> > > > > >> > > >   Set<TopicPartition> partitions;
> > > > >>> > > > > >> > > >   int generationId;
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > >   public Set<TopicPartition> partitions() { return partitions; }
> > > > >>> > > > > >> > > > }
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > Then we can change the rebalance listener to something like this:
> > > > >>> > > > > >> > > > onPartitionsAssigned(ConsumerAssignment assignment)
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > And on the producer:
> > > > >>> > > > > >> > > > void initTransactions(String groupId, ConsumerAssignment assignment);
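A self-contained sketch of how Jason's proposed pieces could fit together, assuming a stand-in producer that records the group id and generation it was initialized with. `ConsumerAssignment`, `InternalConsumerAssignment`, the listener overload, and `initTransactions(String, ConsumerAssignment)` are proposals in this thread, not shipped Kafka APIs; `TopicPartition` is replaced by a minimal stand-in and `SketchProducer` is hypothetical.

```java
import java.util.Collections;
import java.util.Set;

final class AssignmentPropagation {
    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Public API: only the partitions are visible.
    interface ConsumerAssignment {
        Set<TopicPartition> partitions();
    }

    // Not public: also carries the generation id.
    static final class InternalConsumerAssignment implements ConsumerAssignment {
        private final Set<TopicPartition> partitions;
        final int generationId;

        InternalConsumerAssignment(Set<TopicPartition> partitions, int generationId) {
            this.partitions = Collections.unmodifiableSet(partitions);
            this.generationId = generationId;
        }

        @Override
        public Set<TopicPartition> partitions() {
            return partitions;
        }
    }

    // Stand-in producer: reads the hidden generation id from the internal type.
    static final class SketchProducer {
        String groupId;
        int generationId = -1;

        void initTransactions(String groupId, ConsumerAssignment assignment) {
            this.groupId = groupId;
            if (assignment instanceof InternalConsumerAssignment) {
                this.generationId = ((InternalConsumerAssignment) assignment).generationId;
            }
        }
    }

    // The overloaded listener callback simply forwards the assignment.
    static void onPartitionsAssigned(SketchProducer producer, String groupId,
                                     ConsumerAssignment assignment) {
        producer.initTransactions(groupId, assignment);
    }
}
```

Only the producer-side code that knows `InternalConsumerAssignment` can read `generationId`; user code sees nothing but `partitions()`.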
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > 2. Another bit of awkwardness is the fact that we have to pass the
> > > > >>> > > > > >> > > > groupId through both initTransactions() and sendOffsetsToTransaction().
> > > > >>> > > > > >> > > > We could consider a config instead. Maybe something like
> > > > >>> > > > > >> > > > `transactional.group.id`? Then we could simplify the producer APIs,
> > > > >>> > > > > >> > > > potentially even deprecating the current sendOffsetsToTransaction. In
> > > > >>> > > > > >> > > > fact, for this new usage, the `transactional.id` config is not needed.
> > > > >>> > > > > >> > > > It would be nice if we don't have to provide it.
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > I like the idea of combining 1 and 2. We could definitely pass in a
> > > > >>> > > > > >> > > group.id config so that we could avoid exposing that information in a
> > > > >>> > > > > >> > > public API. The question I have is whether we should name the
> > > > >>> > > > > >> > > interface `GroupAssignment` instead, so that Connect could later also
> > > > >>> > > > > >> > > extend the same interface, just to echo Guozhang's point here. Also,
> > > > >>> > > > > >> > > the base interface is better defined empty for easy extension, or we
> > > > >>> > > > > >> > > could define an abstract type called `Resource` to be shared later,
> > > > >>> > > > > >> > > IMHO.
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > > > By the way, I'm a bit confused about the discussion above about
> > > > >>> > > > > >> > > > colocating the txn and group coordinators. That is not actually
> > > > >>> > > > > >> > > > necessary, is it?
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > Right, this is not a requirement for this KIP, because it is inherently
> > > > >>> > > > > >> > > impossible to co-locate the topic partitions of the transaction log
> > > > >>> > > > > >> > > and the consumer offsets topic.
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> > > > Thanks,
> > > > >>> > > > > >> > > > Jason
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > >>> > > > > >> > > > wrote:
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > > > > Thank you Ismael for the suggestion. We will attempt to address it by
> > > > >>> > > > > >> > > > > giving more details in the rejected alternatives section.
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > Thank you for the comment Guozhang! Answers are inline below.
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
> > > > >>> > > > > >> > > > > wrote:
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > > Hello Boyang,
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > > Thanks for the KIP, I have some comments below:
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > > 1. "Once transactions are complete, the call will return." This seems
> > > > >>> > > > > >> > > > > > different from the existing behavior, in which we would return a
> > > > >>> > > > > >> > > > > > retriable CONCURRENT_TRANSACTIONS and let the client retry. Is this
> > > > >>> > > > > >> > > > > > intentional?
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > I don't think it is intentional, and I will defer this question to
> > > > >>> > > > > >> > > > > Jason when he has time to answer, since from what I understood, retry
> > > > >>> > > > > >> > > > > and on-hold both seem to be valid approaches.
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > > > >>> > > > > >> > > > > > listener interface": as part of KIP-341 we've already added this
> > > > >>> > > > > >> > > > > > information to the onAssignment callback. Would this be sufficient? Or
> > > > >>> > > > > >> > > > > > more generally speaking, which information has to be passed around in
> > > > >>> > > > > >> > > > > > the rebalance callback, while other information can be passed around
> > > > >>> > > > > >> > > > > > in the PartitionAssignor callback? In Streams, for example, both
> > > > >>> > > > > >> > > > > > callbacks are used, but the most critical information is passed via
> > > > >>> > > > > >> > > > > > onAssignment.
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > We still need to extend ConsumerRebalanceListener because it’s the
> > > > >>> > > > > >> > > > > interface we have public access to. The #onAssignment call is defined
> > > > >>> > > > > >> > > > > at the PartitionAssignor level, which is not easy to work with for
> > > > >>> > > > > >> > > > > external producers.
> > > > >>> > > > > >> > > > >
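One way to add the overload from point 2 without breaking existing listener implementations is a Java default method that delegates to the old callback. This is a minimal sketch under that assumption: `GenerationAwareListener`, `RecordingListener`, the `generationId` parameter, and the use of plain `String` partition names are hypothetical and are not the real Kafka `ConsumerRebalanceListener` API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a rebalance listener; partitions are plain strings here.
interface GenerationAwareListener {
    // Existing-style callback that current implementations already override.
    void onPartitionsAssigned(Collection<String> partitions);

    // New overload carrying the group generation. The default delegates to the
    // old callback, so implementations written before the overload keep working.
    default void onPartitionsAssigned(Collection<String> partitions, int generationId) {
        onPartitionsAssigned(partitions);
    }
}

// A listener that opts in to the generation-aware overload and records calls.
final class RecordingListener implements GenerationAwareListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        events.add("assigned " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions, int generationId) {
        events.add("assigned " + partitions + " in generation " + generationId);
    }
}
```

A caller that always invokes the two-argument form reaches old listeners through the default method, so only listeners that care about the generation need to change.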
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > > 3. "We propose to use a separate record type in order to store the group
> > > > >>> > > > > >> > > > > > assignment.": hmm, I thought with the third typed FindCoordinator, the
> > > > >>> > > > > >> > > > > > same broker that acts as the consumer coordinator would always be
> > > > >>> > > > > >> > > > > > selected as the txn coordinator, in which case it can access its local
> > > > >>> > > > > >> > > > > > cache metadata / offset topic to get this information already? We just
> > > > >>> > > > > >> > > > > > need to think about how to make these two modules directly exchange
> > > > >>> > > > > >> > > > > > information without messing up the code hierarchy.
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > These two coordinators will be on the same broker only when the number
> > > > >>> > > > > >> > > > > of partitions for the transaction state topic and the consumer offset
> > > > >>> > > > > >> > > > > topic is the same. This normally holds true, but I'm not sure we can
> > > > >>> > > > > >> > > > > rely on this assumption.
> > > > >>> > > > > >> > > > >
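The partition-count condition in point 3 follows from how coordinator partitions are chosen. The sketch below assumes the broker's usual scheme, in which a group.id maps to `abs(hashCode) % offsets.topic.num.partitions` on `__consumer_offsets`, and the key used for the transaction coordinator maps to `abs(hashCode) % transaction.state.log.num.partitions` on `__transaction_state`, with `Integer.MIN_VALUE` mapped to 0. It only models the partition index; whether the two partitions are led by the same broker is a separate question it does not answer. `CoordinatorPartitions` is an illustrative name.

```java
// Models only the key-to-partition arithmetic, assuming the usual scheme:
// abs(key.hashCode()) % partitionCount, with Integer.MIN_VALUE mapped to 0.
final class CoordinatorPartitions {
    static int nonNegative(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(String key, int partitionCount) {
        return nonNegative(key.hashCode()) % partitionCount;
    }

    // True when the same key lands on the same partition index in
    // __consumer_offsets and __transaction_state.
    static boolean sameIndex(String key, int offsetsPartitions, int txnStatePartitions) {
        return partitionFor(key, offsetsPartitions) == partitionFor(key, txnStatePartitions);
    }
}
```

For example, `"a".hashCode()` is 97, so with 50 partitions in both topics the index is 47 in each, while 50 vs. 49 partitions gives 47 vs. 48.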
> > > > >>> > > > > >> > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
> > > > >>> > > > > >> > > > > > this config is just to avoid old-versioned brokers being unable to
> > > > >>> > > > > >> > > > > > recognize newer-versioned clients. I think we can do something else to
> > > > >>> > > > > >> > > > > > avoid this config, though: for example, we can use the embedded
> > > > >>> > > > > >> > > > > > AdminClient to send the APIVersion request upon starting up, and based
> > > > >>> > > > > >> > > > > > on the returned value decide whether to go to the old code path or the
> > > > >>> > > > > >> > > > > > new behavior. Admittedly, asking a random broker about APIVersion does
> > > > >>> > > > > >> > > > > > not guarantee the whole cluster's versions, but what we can do is to
> > > > >>> > > > > >> > > > > > first 1) find the coordinator (and if the random broker does not even
> > > > >>> > > > > >> > > > > > recognize the new discover type, fall back to the old path directly),
> > > > >>> > > > > >> > > > > > and then 2) ask the discovered coordinator about its supported
> > > > >>> > > > > >> > > > > > APIVersion.
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > The caveat here is that we have to make sure both the group coordinator
> > > > >>> > > > > >> > > > > and the transaction coordinator are on the latest version during the
> > > > >>> > > > > >> > > > > init stage. This is potentially doable as we only need a consumer
> > > > >>> > > > > >> > > > > group.id to check that. In the meantime, a hard-coded config is still a
> > > > >>> > > > > >> > > > > favorable backup in case the server has been downgraded and you want to
> > > > >>> > > > > >> > > > > use a new-version client without `consumer group` transactional support.
> > > > >>> > > > > >> > > > >
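The startup check discussed in point 4 can be sketched as a small decision function. This assumes two hypothetical lookup hooks (one finding the group coordinator, one finding the transaction coordinator for the same group.id, each returning empty if the contacted broker does not recognize the lookup) and a hypothetical per-broker `supportedVersion` hook; none of these are real AdminClient methods, and `TxnPathSelector` and `TxnCommitPath` are illustrative names. The config flag is kept as an explicit opt-out, as suggested in the reply.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

enum TxnCommitPath { LEGACY, GROUP_AWARE }

final class TxnPathSelector {
    // findGroupCoordinator / findTxnCoordinator: hypothetical hooks returning a
    // broker id, or empty if the new lookup type is not recognized.
    // supportedVersion: hypothetical hook giving a broker's highest supported
    // version of the relevant request.
    static TxnCommitPath choose(boolean groupAwareConfigEnabled,
                                String groupId,
                                Function<String, Optional<Integer>> findGroupCoordinator,
                                Function<String, Optional<Integer>> findTxnCoordinator,
                                IntUnaryOperator supportedVersion,
                                int requiredVersion) {
        // The hard-coded config stays as an explicit opt-out, e.g. after a downgrade.
        if (!groupAwareConfigEnabled) {
            return TxnCommitPath.LEGACY;
        }
        Optional<Integer> groupCoordinator = findGroupCoordinator.apply(groupId);
        Optional<Integer> txnCoordinator = findTxnCoordinator.apply(groupId);
        // Lookup not recognized: fall back to the old path directly.
        if (!groupCoordinator.isPresent() || !txnCoordinator.isPresent()) {
            return TxnCommitPath.LEGACY;
        }
        // Both coordinators must be on a new enough version.
        boolean bothUpToDate =
                supportedVersion.applyAsInt(groupCoordinator.get()) >= requiredVersion
                && supportedVersion.applyAsInt(txnCoordinator.get()) >= requiredVersion;
        return bothUpToDate ? TxnCommitPath.GROUP_AWARE : TxnCommitPath.LEGACY;
    }
}
```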
> > > > >>> > > > > >> > > > > > 5. This is a meta question: have you considered how this can be applied
> > > > >>> > > > > >> > > > > > to Kafka Connect as well? For example, for source connectors, the
> > > > >>> > > > > >> > > > > > assignment is not by "partitions" but by some other sort of "resources"
> > > > >>> > > > > >> > > > > > based on the source systems. How would KIP-447 affect Kafka Connectors
> > > > >>> > > > > >> > > > > > that have implemented EOS as well?
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > No, it's not currently included in the scope. Could you point me to a
> > > > >>> > > > > >> > > > > sample source connector that uses EOS? We could always piggy-back on the
> > > > >>> > > > > >> > > > > TxnProducerIdentity struct with more information, such as tasks. If this
> > > > >>> > > > > >> > > > > is something to support in the near term, an abstract type called
> > > > >>> > > > > >> > > > > "Resource" could be provided, and both topic partitions and Connect
> > > > >>> > > > > >> > > > > tasks could implement it.
> > > > >>> > > > > >> > > > >
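The "Resource" abstraction suggested in point 5 could look roughly like this. Everything here is hypothetical: `Resource`, `PartitionResource`, `ConnectTaskResource`, and `ProducerIdentitySketch` (a stand-in for the TxnProducerIdentity idea) are illustrative names, and the id string formats are arbitrary choices for the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical abstraction: anything a transactional producer can own.
interface Resource {
    String resourceId();
}

// A topic partition as a Resource (a stand-in, not Kafka's TopicPartition).
final class PartitionResource implements Resource {
    private final String topic;
    private final int partition;

    PartitionResource(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String resourceId() {
        return topic + "-" + partition;
    }
}

// A Connect task as a Resource (also a stand-in).
final class ConnectTaskResource implements Resource {
    private final String connector;
    private final int task;

    ConnectTaskResource(String connector, int task) {
        this.connector = connector;
        this.task = task;
    }

    @Override
    public String resourceId() {
        return connector + "/task-" + task;
    }
}

// Hypothetical producer identity listing owned resources, whatever their kind.
final class ProducerIdentitySketch {
    private final String groupId;
    private final List<Resource> ownedResources;

    ProducerIdentitySketch(String groupId, List<Resource> ownedResources) {
        this.groupId = groupId;
        this.ownedResources = Collections.unmodifiableList(new ArrayList<>(ownedResources));
    }

    String groupId() {
        return groupId;
    }

    List<String> resourceIds() {
        List<String> ids = new ArrayList<>();
        for (Resource r : ownedResources) {
            ids.add(r.resourceId());
        }
        return ids;
    }
}
```

The design point is that fencing logic can compare owned resources by id without knowing whether they came from a consumer assignment or a Connect task assignment.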
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > > Guozhang
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk>
> > > > >>> > > > > >> > > > > > wrote:
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > > > Hi Boyang,
> > > > >>> > > > > >> > > > > > >
> > > > >>> > > > > >> > > > > > > Thanks for the KIP. It's good that we listed a number of rejected
> > > > >>> > > > > >> > > > > > > alternatives. It would be helpful to have an explanation of why they
> > > > >>> > > > > >> > > > > > > were rejected.
> > > > >>> > > > > >> > > > > > >
> > > > >>> > > > > >> > > > > > > Ismael
> > > > >>> > > > > >> > > > > > >
> > > > >>> > > > > >> > > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com>
> > > > >>> > > > > >> > > > > > > wrote:
> > > > >>> > > > > >> > > > > > >
> > > > >>> > > > > >> > > > > > > > Hey all,
> > > > >>> > > > > >> > > > > > > >
> > > > >>> > > > > >> > > > > > > > I would like to start a discussion for KIP-447:
> > > > >>> > > > > >> > > > > > > >
> > > > >>> > > > > >> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > >>> > > > > >> > > > > > > >
> > > > >>> > > > > >> > > > > > > > This work originated with Jason Gustafson, and we would like to
> > > > >>> > > > > >> > > > > > > > proceed to the discussion stage.
> > > > >>> > > > > >> > > > > > > >
> > > > >>> > > > > >> > > > > > > > Let me know your thoughts, thanks!
> > > > >>> > > > > >> > > > > > > >
> > > > >>> > > > > >> > > > > > > > Boyang
> > > > >>> > > > > >> > > > > > > >
> > > > >>> > > > > >> > > > > > >
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > > > --
> > > > >>> > > > > >> > > > > > -- Guozhang
> > > > >>> > > > > >> > > > > >
> > > > >>> > > > > >> > > > >
> > > > >>> > > > > >> > > >
> > > > >>> > > > > >> > >
> > > > >>> > > > > >> >
> > > > >>> > > > > >> >
> > > > >>> > > > > >> > --
> > > > >>> > > > > >> > -- Guozhang
> > > > >>> > > > > >> >
> > > > >>> > > > > >>
> > > > >>> > > > > >
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>> -- Guozhang
> > > > >>>
> > > > >>
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
