Hi Boyang,

Thanks for picking this up! I'm still reading through the updates, but here
are a couple of initial comments on the APIs:

1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying
to encapsulate state from the current group assignment. Maybe something
like `ConsumerAssignment` would be clearer? If we make the usage consistent
across the consumer and producer, then we can avoid exposing internal state
like the generationId.

For example:

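import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Public API
interface ConsumerAssignment {
  Set<TopicPartition> partitions();
}

// Not a public API
class InternalConsumerAssignment implements ConsumerAssignment {
  Set<TopicPartition> partitions;
  int generationId;

  public Set<TopicPartition> partitions() {
    return partitions;
  }
}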

Then we can change the rebalance listener to something like this:
void onPartitionsAssigned(ConsumerAssignment assignment);

And on the producer:
void initTransactions(String groupId, ConsumerAssignment assignment);
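To make point 1 more concrete, here is a self-contained sketch of how the producer side might use the generationId without ever exposing it publicly: it downcasts to the internal type. It assumes that transactional writes from an older generation should be fenced. `AssignmentFencing`, `generationOf` and `isFenced` are hypothetical names, and `TopicPartition` is a minimal stand-in for the Kafka class so that the snippet compiles on its own.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Minimal stand-in for org.apache.kafka.common.TopicPartition (hypothetical, for this sketch only).
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }
}

// Public API: users only ever see the assigned partitions.
interface ConsumerAssignment {
    Set<TopicPartition> partitions();
}

// Not a public API: carries the generationId alongside the partitions.
final class InternalConsumerAssignment implements ConsumerAssignment {
    private final Set<TopicPartition> partitions;
    final int generationId;

    InternalConsumerAssignment(Set<TopicPartition> partitions, int generationId) {
        this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        this.generationId = generationId;
    }

    @Override
    public Set<TopicPartition> partitions() {
        return partitions;
    }
}

// Hypothetical producer-internal helper: recovers the generationId by downcasting,
// so the generationId never appears in a public signature.
final class AssignmentFencing {
    static int generationOf(ConsumerAssignment assignment) {
        if (!(assignment instanceof InternalConsumerAssignment)) {
            throw new IllegalArgumentException("assignment was not created by the consumer");
        }
        return ((InternalConsumerAssignment) assignment).generationId;
    }

    // Assumption: a write attempted with an older generation than the current one is fenced.
    static boolean isFenced(ConsumerAssignment current, ConsumerAssignment attempted) {
        return generationOf(attempted) < generationOf(current);
    }
}
```

The design choice here is that only the consumer can construct an `InternalConsumerAssignment`, so the producer can trust the generation it carries while user code sees nothing but partitions.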

2. Another bit of awkwardness is that we have to pass the groupId through
both initTransactions() and sendOffsetsToTransaction(). We could consider a
config instead, maybe something like `transactional.group.id`. Then we
could simplify the producer APIs, potentially even deprecating the current
sendOffsetsToTransaction. In fact, for this new usage, the
`transactional.id` config is not needed. It would be nice if we didn't have
to provide it.
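A rough sketch of what point 2 could mean for producer configuration. `transactional.group.id` is only the config proposed above, not an existing producer setting, and `GroupAwareTxnConfig` with its methods are hypothetical helpers; `transactional.id` is the existing config.

```java
import java.util.Properties;

// Sketch only: TRANSACTIONAL_GROUP_ID is the config proposed in this thread (hypothetical).
final class GroupAwareTxnConfig {
    static final String TRANSACTIONAL_ID = "transactional.id";             // existing producer config
    static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id"; // proposed, hypothetical

    // Transactions are usable if either identity is configured; with the
    // proposed config, transactional.id would no longer be required.
    static boolean transactionsEnabled(Properties props) {
        return props.getProperty(TRANSACTIONAL_ID) != null
                || props.getProperty(TRANSACTIONAL_GROUP_ID) != null;
    }

    // The group that offset commits would be attributed to, read from config
    // instead of being passed to initTransactions()/sendOffsetsToTransaction().
    static String offsetsGroupId(Properties props) {
        String groupId = props.getProperty(TRANSACTIONAL_GROUP_ID);
        if (groupId == null) {
            throw new IllegalStateException(TRANSACTIONAL_GROUP_ID + " is not configured");
        }
        return groupId;
    }
}
```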

By the way, I'm a bit confused by the discussion above about colocating the
txn and group coordinators. That is not actually necessary, is it?

Thanks,
Jason

On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thank you, Ismael, for the suggestion. We will address it by adding more
> detail to the rejected alternatives section.
>
>
> Thank you for the comments, Guozhang! Answers are inline below.
>
>
>
> On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Boyang,
> >
> > Thanks for the KIP, I have some comments below:
> >
> > 1. "Once transactions are complete, the call will return." This seems
> > different from the existing behavior, in which we would return a retriable
> > CONCURRENT_TRANSACTIONS error and let the client retry. Is this
> > intentional?
> >
>
> I don't think it is intentional. I will defer this question to Jason when
> he has time to answer, since from what I understand, both retrying and
> holding the request seem like valid approaches.
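For comparison with the "hold until complete" behavior, this is a minimal sketch of the existing client-side approach: keep re-sending while the coordinator answers CONCURRENT_TRANSACTIONS, with exponential backoff. `TxnError` is a stand-in enum and `RetryOnConcurrentTxn.sendWithRetry` is a hypothetical helper, not a Kafka client API.

```java
import java.util.function.Supplier;

// Stand-in error codes for this sketch; only CONCURRENT_TRANSACTIONS mirrors the real Kafka error.
enum TxnError { NONE, CONCURRENT_TRANSACTIONS }

final class RetryOnConcurrentTxn {
    // Re-sends while the coordinator reports CONCURRENT_TRANSACTIONS, doubling the
    // backoff up to maxBackoffMs. Returns the attempt number that succeeded.
    static int sendWithRetry(Supplier<TxnError> request, long initialBackoffMs,
                             long maxBackoffMs, int maxAttempts) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (request.get() == TxnError.NONE) {
                return attempt;
            }
            if (attempt == maxAttempts) {
                break; // no point sleeping after the final attempt
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
            backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
        }
        throw new IllegalStateException("still CONCURRENT_TRANSACTIONS after " + maxAttempts + " attempts");
    }
}
```

If the coordinator instead holds the request until pending transactions complete, this loop moves to the broker side: the client makes one call, at the cost of the broker keeping that request parked.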
>
>
> > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > listener interface": as part of KIP-341 we've already added this
> > information to the onAssignment callback. Would this be sufficient? Or,
> > more generally, which information has to be passed around in the
> > rebalance callback, and which can be passed around in the
> > PartitionAssignor callback? In Streams, for example, both callbacks are
> > used, but most critical information is passed via onAssignment.
> >
>
> We still need to extend ConsumerRebalanceListener because it's the
> interface users have public access to. The #onAssignment callback is
> defined at the PartitionAssignor level, which is not easy to work with
> from an external producer.
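One way the listener overload could stay backward compatible is a Java default method that delegates to the existing callback, so listeners written against the old signature keep working unchanged. This is a sketch under that assumption: `RebalanceListener` is a simplified stand-in for ConsumerRebalanceListener, and partitions are represented as "topic-partition" strings to keep it short.

```java
import java.util.Collection;
import java.util.Set;

// Minimal stand-in: partitions are "topic-partition" strings in this sketch.
interface ConsumerAssignment {
    Set<String> partitions();
}

// Hypothetical evolution of a rebalance listener. The new overload has a default
// implementation that forwards to the existing callback, so old listeners still compile.
interface RebalanceListener {
    void onPartitionsRevoked(Collection<String> partitions);

    void onPartitionsAssigned(Collection<String> partitions);

    default void onPartitionsAssigned(ConsumerAssignment assignment) {
        onPartitionsAssigned(assignment.partitions());
    }
}
```

A transactional application would override the `ConsumerAssignment` overload and hand the assignment to the producer; everyone else can ignore it.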
>
>
> > 3. "We propose to use a separate record type in order to store the group
> > assignment.": hmm, I thought with the third FindCoordinator type, the
> > same broker that acts as the consumer coordinator would always be
> > selected as the txn coordinator, in which case it could access its local
> > cache metadata / offset topic to get this information already? We just
> > need to think about how to make these two modules directly exchange
> > information without messing up the code hierarchy.
> >
>
> These two coordinators will be on the same broker only when the
> transaction state topic and the consumer offsets topic have the same
> number of partitions. This normally holds true, but I'm not sure we can
> rely on that assumption.
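A small worked sketch of why the partition counts matter, assuming each coordinator is located by hashing its key modulo the partition count of its internal topic (as Kafka does for group ids with `__consumer_offsets` and for transactional ids with `__transaction_state`). `CoordinatorPartition` is a hypothetical helper; `abs` mimics Kafka's `Utils.abs`, which maps `Integer.MIN_VALUE` to 0.

```java
// Hypothetical helper: which internal-topic partition a key maps to.
final class CoordinatorPartition {
    // Like Kafka's Utils.abs: avoids Math.abs(Integer.MIN_VALUE) staying negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(int keyHash, int partitionCount) {
        return abs(keyHash) % partitionCount;
    }

    static int partitionFor(String key, int partitionCount) {
        return partitionFor(key.hashCode(), partitionCount);
    }
}
```

With the default of 50 partitions for both topics, a given group id maps to the same partition index in each. If the counts differ, say 50 versus 30, a key whose hash is 30 lands on partition 30 of one topic and partition 0 of the other, so the coordinators can end up on different brokers. Matching indices are a necessary condition; the two partitions' leaders also have to sit on the same broker.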
>
> > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
> > this config is just to handle old-versioned brokers that cannot
> > recognize a newer-versioned client. I think we can do something else to
> > avoid this config, though. For example, we can use the embedded
> > AdminClient to send the APIVersion request upon starting up and, based on
> > the returned value, decide whether to go down the old code path or use
> > the new behavior. Admittedly, asking a random broker about APIVersion
> > does not guarantee the whole cluster's versions, but what we can do is
> > 1) find the coordinator (and if the random broker does not even recognize
> > the new discovery type, fall back to the old path directly), and then 2)
> > ask the discovered coordinator about its supported APIVersion.
> >
>
> The caveat here is that we have to make sure both the group coordinator and
> the transaction coordinator are on the latest version during the init
> stage. This is potentially doable, as we only need a consumer group.id to
> check that. In the meantime, a hard-coded config is still a useful backup
> in case the server has been downgraded and you want to use a newer client
> without consumer-group-aware transactional support.
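The start-up decision discussed in point 4 and its reply can be sketched as a pure function: an explicit config wins if set; otherwise fall back to the old path when the new discovery type is not recognized, and require both coordinators to report a new-enough version. All names and the version threshold are hypothetical placeholders, not real Kafka protocol versions.

```java
import java.util.OptionalInt;

// Sketch of the start-up mode selection; names and version numbers are hypothetical.
final class TxnModeSelector {
    enum Mode { LEGACY, GROUP_AWARE }

    // Hypothetical minimum API version that understands group-aware transactions.
    static final int MIN_GROUP_AWARE_VERSION = 3;

    static Mode select(Boolean configOverride,
                       boolean findCoordinatorSupportsNewType,
                       OptionalInt groupCoordinatorVersion,
                       OptionalInt txnCoordinatorVersion) {
        // A hard-coded config, if present, overrides any probing (e.g. after a broker downgrade).
        if (configOverride != null) {
            return configOverride ? Mode.GROUP_AWARE : Mode.LEGACY;
        }
        // Step 1: the broker did not recognize the new discovery type, so use the old path.
        if (!findCoordinatorSupportsNewType) {
            return Mode.LEGACY;
        }
        // Step 2: both coordinators must support the new behavior; unknown counts as unsupported.
        boolean groupOk = groupCoordinatorVersion.orElse(-1) >= MIN_GROUP_AWARE_VERSION;
        boolean txnOk = txnCoordinatorVersion.orElse(-1) >= MIN_GROUP_AWARE_VERSION;
        return groupOk && txnOk ? Mode.GROUP_AWARE : Mode.LEGACY;
    }
}
```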
>
> > 5. This is a meta question: have you considered how this can be applied to
> > Kafka Connect as well? For example, for source connectors, the assignment
> > is not by "partitions" but by some other sort of "resources" based on the
> > source systems. How would KIP-447 affect Kafka connectors that implement
> > EOS?
> >
>
> No, it's not currently included in the scope. Could you point me to a
> sample source connector that uses EOS? We could always piggy-back more
> information, such as tasks, onto the TxnProducerIdentity struct. If this
> is something to support in the near term, an abstract type called
> "Resource" could be provided and implemented by both topic partitions and
> Connect tasks.
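The "Resource" idea could look roughly like this: a common interface implemented by both topic partitions and Connect tasks, with the assignment (and its generation) expressed over resources. `Resource`, `PartitionResource`, `ConnectTaskResource` and `ResourceAssignment` are all hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction: anything a group member can be assigned.
interface Resource {
    String id();
}

final class PartitionResource implements Resource {
    private final String topic;
    private final int partition;

    PartitionResource(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String id() {
        return topic + "-" + partition;
    }
}

final class ConnectTaskResource implements Resource {
    private final String connector;
    private final int task;

    ConnectTaskResource(String connector, int task) {
        this.connector = connector;
        this.task = task;
    }

    @Override
    public String id() {
        return connector + "/task-" + task;
    }
}

// An assignment over resources; fencing would only need the generation,
// not the concrete resource type.
final class ResourceAssignment {
    final List<Resource> resources;
    final int generationId;

    ResourceAssignment(List<Resource> resources, int generationId) {
        this.resources = new ArrayList<>(resources);
        this.generationId = generationId;
    }

    List<String> ids() {
        List<String> out = new ArrayList<>();
        for (Resource r : resources) {
            out.add(r.id());
        }
        return out;
    }
}
```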
>
>
> >
> > Guozhang
> >
> >
> > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the KIP. It's good that we listed a number of rejected
> > > alternatives. It would be helpful to have an explanation of why they
> > > were rejected.
> > >
> > > Ismael
> > >
> > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > I would like to start a discussion for KIP-447:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > >
> > > > This work originated with Jason Gustafson, and we would like to move
> > > > it into the discussion stage.
> > > >
> > > > Let me know your thoughts, thanks!
> > > >
> > > > Boyang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
