Hey Jason,

Thank you for the proposal here. Some of my thoughts are below.

On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Boyang,
>
> Thanks for picking this up! Still reading through the updates, but here are
> a couple initial comments on the APIs:
>
> 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying
> to encapsulate state from the current group assignment. Maybe something
> like `ConsumerAssignment` would be clearer? If we make the usage consistent
> across the consumer and producer, then we can avoid exposing internal state
> like the generationId.
>
> For example:
>
> // Public API
> interface ConsumerAssignment {
>   Set<TopicPartition> partitions();
> }
>
> // Not a public API
> class InternalConsumerAssignment implements ConsumerAssignment {
>   Set<TopicPartition> partitions;
>   int generationId;
> }
>
> Then we can change the rebalance listener to something like this:
> onPartitionsAssigned(ConsumerAssignment assignment)
>
> And on the producer:
> void initTransactions(String groupId, ConsumerAssignment assignment);
>
> 2. Another bit of awkwardness is the fact that we have to pass the groupId
> through both initTransactions() and sendOffsetsToTransaction(). We could
> consider a config instead. Maybe something like `transactional.group.id`?
> Then we could simplify the producer APIs, potentially even deprecating the
> current sendOffsetsToTransaction. In fact, for this new usage, the
> `transactional.id` config is not needed. It would be nice if we didn't have
> to provide it.
>
I like the idea of combining 1 and 2. We could definitely pass in a group.id
config so that we avoid exposing that information in a public API. The
question I have is whether we should name the interface `GroupAssignment`
instead, so that Connect could later extend the same interface, just to echo
Guozhang's point here. Also, IMHO the base interface is better defined empty
for easy extension, or we could define an abstract type called `Resource` to
be shareable later.
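
To illustrate, here is a rough sketch of what I have in mind (names like
`GroupAssignment` and `ConsumerGroupAssignment` are only placeholders, not
part of the KIP yet):

import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Empty marker interface, so that Connect (or other resource types) can
// extend it later without consumer-specific assumptions.
public interface GroupAssignment {
}

// Consumer-specific assignment handed to the rebalance listener and the producer.
public interface ConsumerGroupAssignment extends GroupAssignment {
    Set<TopicPartition> partitions();
}

With a `transactional.group.id` (or similarly named) producer config carrying
the group id, the producer call could then shrink to something like
producer.initTransactions(assignment).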


> By the way, I'm a bit confused about the discussion above on colocating the
> txn and group coordinators. That is not actually necessary, is it?
>
Yes, this is not a requirement for this KIP, because in general it is not
possible to guarantee co-location of the transaction log partition and the
consumed offsets topic partition.
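
For reference, here is roughly how the two coordinators are located today (a
sketch of the lookup as I understand it, not the actual broker code; variable
names are placeholders):

// group coordinator: the leader of this __consumer_offsets partition
int offsetsPartition = Math.abs(groupId.hashCode()) % offsetsTopicNumPartitions;

// transaction coordinator: the leader of this __transaction_state partition
int txnPartition = Math.abs(transactionalId.hashCode()) % txnStateLogNumPartitions;

The keys and the partition counts are configured independently, so the two
leaders do not land on the same broker in general.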


> Thanks,
> Jason
>
> On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Thank you Ismael for the suggestion. We will attempt to address it by
> > giving more details in the rejected alternatives section.
> >
> >
> > Thank you for the comment Guozhang! Answers are inline below.
> >
> >
> >
> > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Boyang,
> > >
> > > Thanks for the KIP, I have some comments below:
> > >
> > > 1. "Once transactions are complete, the call will return." This seems
> > > different from the existing behavior, in which we would return a retriable
> > > CONCURRENT_TRANSACTIONS and let the client retry. Is this intentional?
> > >
> >
> > I don't think it is intentional, and I will defer this question to Jason
> > when he gets time to answer, since from what I understand both retrying and
> > holding seem like valid approaches.
> >
> >
> > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > > listener interface": as part of KIP-341 we've already added this
> > > information to the onAssignment callback. Would this be sufficient? Or
> > > more generally speaking, which information has to be passed around in the
> > > rebalance callback while the rest can be passed around in the
> > > PartitionAssignor callback? In Streams, for example, both callbacks are
> > > used, but the most critical information is passed via onAssignment.
> > >
> >
> > We still need to extend ConsumerRebalanceListener because it's the
> > interface we have public access to. The #onAssignment call is defined
> > at the PartitionAssignor level, which is not easy for external producers
> > to work with.
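> >
> > To make the intent concrete (a sketch only; the exact types and the
> > producer-side signature are whatever the KIP ends up with, so treat these
> > names as placeholders):
> >
> > consumer.subscribe(topics, new ConsumerRebalanceListener() {
> >     @Override
> >     public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
> >
> >     @Override
> >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
> >
> >     // hypothetical overload carrying the assignment metadata (e.g. generation)
> >     // that the externally created transactional producer needs
> >     public void onPartitionsAssigned(ConsumerAssignment assignment) {
> >         producer.initTransactions(groupId, assignment);
> >     }
> > });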
> >
> >
> > > 3. "We propose to use a separate record type in order to store the group
> > > assignment.": hmm, I thought with the third typed FindCoordinator, the
> > > same broker that acts as the consumer coordinator would always be selected
> > > as the txn coordinator, in which case it can already access its local
> > > cached metadata / offset topic to get this information? We just need to
> > > think about how to make these two modules directly exchange information
> > > without messing up the code hierarchy.
> > >
> >
> > These two coordinators will be on the same broker only when the number of
> > partitions for the transaction state topic and the consumer offsets topic
> > are the same. This normally holds true, but I'm afraid we can't rely on
> > that assumption.
> >
> > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
> > > this config is just to deal with an old-versioned broker not being able to
> > > recognize a newer-versioned client. I think we can do something else to
> > > avoid this config though; for example, we can use the embedded AdminClient
> > > to send the APIVersion request upon starting up, and based on the returned
> > > value decide whether to go to the old code path or the new behavior.
> > > Admittedly, asking a random broker about APIVersion does not guarantee the
> > > whole cluster's versions, but what we can do is to first 1) find the
> > > coordinator (and if the random broker does not even recognize the new
> > > discovery type, fall back to the old path directly), and then 2) ask the
> > > discovered coordinator about its supported APIVersion.
> > >
> >
> > The caveat here is that we have to make sure both the group coordinator and
> > the transaction coordinator are on the latest version during the init stage.
> > This is potentially doable, as we only need a consumer group.id to check
> > that. In the meantime, a hard-coded config is still a favorable backup in
> > case the server has been downgraded and you want to use a new-version client
> > without the `consumer group` transactional support.
> >
> > > 5. This is a meta question: have you considered how this can be applied to
> > > Kafka Connect as well? For example, for source connectors, the assignment
> > > is not by "partitions" but by some other sort of "resources" based on the
> > > source systems. How would KIP-447 affect Kafka Connectors that implement
> > > EOS as well?
> > >
> >
> > No, it's not currently included in the scope. Could you point me to a
> > sample source connector that uses EOS? We could always piggy-back more
> > information, such as tasks, into the TxnProducerIdentity struct. If
> > this is something to support in the near term, an abstract type called
> > "Resource" could be provided, and both topic partitions and connect tasks
> > could implement it.
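> >
> > A minimal sketch of that idea (all of these names are hypothetical, nothing
> > in the KIP yet):
> >
> > public interface Resource { }
> >
> > public final class TopicPartitionResource implements Resource {
> >     public final TopicPartition topicPartition;
> >
> >     public TopicPartitionResource(TopicPartition topicPartition) {
> >         this.topicPartition = topicPartition;
> >     }
> > }
> >
> > public final class ConnectTaskResource implements Resource {
> >     public final String connectorName;
> >     public final int taskId;
> >
> >     public ConnectTaskResource(String connectorName, int taskId) {
> >         this.connectorName = connectorName;
> >         this.taskId = taskId;
> >     }
> > }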
> >
> >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > Thanks for the KIP. It's good that we listed a number of rejected
> > > > alternatives. It would be helpful to have an explanation of why they
> > were
> > > > rejected.
> > > >
> > > > Ismael
> > > >
> > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com>
> > wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I would like to start a discussion for KIP-447:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > >
> > > > > this is a work originated by Jason Gustafson, and we would like to
> > > > > proceed into the discussion stage.
> > > > >
> > > > > Let me know your thoughts, thanks!
> > > > >
> > > > > Boyang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
