Hi Boyang,

Thanks for picking this up! I'm still reading through the updates, but here are a couple of initial comments on the APIs:
1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying to encapsulate state from the current group assignment. Maybe something like `ConsumerAssignment` would be clearer? If we make the usage consistent across the consumer and producer, then we can avoid exposing internal state like the generationId. For example:

    // Public API
    interface ConsumerAssignment {
        Set<TopicPartition> partitions();
    }

    // Not a public API
    class InternalConsumerAssignment implements ConsumerAssignment {
        Set<TopicPartition> partitions;
        int generationId;

        @Override
        public Set<TopicPartition> partitions() {
            return partitions;
        }
    }

Then we can change the rebalance listener to something like this:

    void onPartitionsAssigned(ConsumerAssignment assignment);

And on the producer:

    void initTransactions(String groupId, ConsumerAssignment assignment);

2. Another bit of awkwardness is the fact that we have to pass the groupId through both initTransactions() and sendOffsetsToTransaction(). We could consider a config instead. Maybe something like `transactional.group.id`? Then we could simplify the producer APIs, potentially even deprecating the current sendOffsetsToTransaction. In fact, for this new usage, the `transactional.id` config is not needed. It would be nice if we didn't have to provide it.

By the way, I'm a bit confused by the earlier discussion about colocating the txn and group coordinators. That is not actually necessary, is it?

Thanks,
Jason

On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com> wrote:

> Thank you, Ismael, for the suggestion. We will attempt to address it by giving more details in the rejected alternatives section.
>
> Thank you for the comments, Guozhang! Answers are inline below.
>
> On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Boyang,
> >
> > Thanks for the KIP, I have some comments below:
> >
> > 1. "Once transactions are complete, the call will return."
> > This seems different from the existing behavior, in which we would return a retriable CONCURRENT_TRANSACTIONS error and let the client retry. Is this intentional?
>
> I don't think it is intentional, and I will defer this question to Jason when he has time to answer, since from what I understand, both retrying and holding the request seem like valid approaches.
>
> > 2. "an overload to onPartitionsAssigned in the consumer's rebalance listener interface": as part of KIP-341 we've already added this information to the onAssignment callback. Would this be sufficient? Or, more generally speaking, which information has to be passed in the rebalance callback, and which can be passed in the PartitionAssignor callback? In Streams, for example, both callbacks are used, but the most critical information is passed via onAssignment.
>
> We still need to extend ConsumerRebalanceListener because it is the interface that is publicly accessible. The #onAssignment call is defined at the PartitionAssignor level, which is not easy for external producers to work with.
>
> > 3. "We propose to use a separate record type in order to store the group assignment.": hmm, I thought that with the third FindCoordinator type, the same broker that acts as the consumer coordinator would always be selected as the txn coordinator, in which case it can already access its local cache metadata / offset topic to get this information? We just need to think about how to make these two modules exchange information directly without messing up the code hierarchy.
>
> These two coordinators will be on the same broker only when the transaction state topic and the consumer offset topic have the same number of partitions. This normally holds true, but I'm afraid we can't make this assumption?
>
> > 4. The config "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of this config is just to handle old-version brokers that cannot recognize newer-version clients. I think we can avoid this config by doing something else, though. For example, we can use the embedded AdminClient to send the APIVersion request upon starting up, and based on the returned value decide whether to go to the old code path or the new behavior. Admittedly, asking a random broker about APIVersion does not guarantee the whole cluster's versions, but what we can do is 1) first find the coordinator (and if the random broker does not even recognize the new discovery type, fall back to the old path directly), and then 2) ask the discovered coordinator about its supported APIVersion.
>
> The caveat here is that we have to make sure both the group coordinator and the transaction coordinator are on the latest version during the init stage. This is potentially doable, as we only need a consumer group.id to check that. In the meantime, a hard-coded config is still a favorable backup in case the server has been downgraded and you want to use a new-version client without `consumer group` transactional support.
>
> > 5. This is a meta question: have you considered how this can be applied to Kafka Connect as well? For example, for source connectors, the assignment is not by "partitions", but by some other sort of "resources" based on the source systems. How would KIP-447 affect Kafka connectors that implemented EOS as well?
>
> No, it's not currently included in the scope. Could you point me to a sample source connector that uses EOS? We could always piggy-back on the TxnProducerIdentity struct with more information, such as tasks. If this is something to support in the near term, an abstract type called "Resource" could be provided and let topic partition and connect task implement it.
>
> > Guozhang
> >
> > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the KIP. It's good that we listed a number of rejected alternatives. It would be helpful to have an explanation of why they were rejected.
> > >
> > > Ismael
> > >
> > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > >
> > > > Hey all,
> > > >
> > > > I would like to start a discussion for KIP-447:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > >
> > > > This is work that originated with Jason Gustafson, and we would like to proceed to the discussion stage.
> > > >
> > > > Let me know your thoughts, thanks!
> > > >
> > > > Boyang
> >
> > --
> > -- Guozhang
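A minimal, self-contained Java sketch of the split proposed in Jason's point 1 follows. It is not Kafka code. `ConsumerAssignment` and `InternalConsumerAssignment` follow the proposal in the thread, while `Partition` (a hypothetical stand-in for `TopicPartition`, so the sketch compiles without kafka-clients) and `GroupAwareProducer` are illustrative names only. It shows the intended effect: user code sees only `partitions()`, while library-internal code can still recover the generation. It assumes Java 16+ for records and pattern matching.

```java
import java.util.Set;

// Sketch of the public/internal assignment split from point 1 (Java 16+).
// Partition and GroupAwareProducer are hypothetical, not Kafka APIs.
class ConsumerAssignmentSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record Partition(String topic, int partition) {}

    // Public API: callers only see the assigned partitions.
    interface ConsumerAssignment {
        Set<Partition> partitions();
    }

    // Not a public API: the library also carries the group generation.
    static final class InternalConsumerAssignment implements ConsumerAssignment {
        private final Set<Partition> partitions;
        private final int generationId;

        InternalConsumerAssignment(Set<Partition> partitions, int generationId) {
            this.partitions = Set.copyOf(partitions);
            this.generationId = generationId;
        }

        @Override
        public Set<Partition> partitions() {
            return partitions;
        }

        int generationId() {
            return generationId;
        }
    }

    // Hypothetical producer mirroring the proposed
    // initTransactions(String groupId, ConsumerAssignment assignment).
    static final class GroupAwareProducer {
        private String groupId;
        private int generationId = -1; // -1 means no generation is known

        void initTransactions(String groupId, ConsumerAssignment assignment) {
            this.groupId = groupId;
            // Library-internal code recovers hidden state by downcasting;
            // user code never handles the generation itself.
            if (assignment instanceof InternalConsumerAssignment internal) {
                this.generationId = internal.generationId();
            }
        }

        String groupId() {
            return groupId;
        }

        int generationId() {
            return generationId;
        }
    }

    public static void main(String[] args) {
        ConsumerAssignment assignment = new InternalConsumerAssignment(
                Set.of(new Partition("input", 0), new Partition("input", 1)), 5);
        GroupAwareProducer producer = new GroupAwareProducer();
        producer.initTransactions("my-group", assignment);
        // prints "2 partitions, generation 5"
        System.out.println(assignment.partitions().size() + " partitions, generation "
                + producer.generationId());
    }
}
```

One consequence of this design, under the same assumptions: an assignment implemented by user code carries no generation, so the producer has to treat the generation as unknown (here, -1) or reject the assignment.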
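Boyang's answer to point 3 comes down to simple arithmetic. Assuming both coordinators are located by hashing the same key (the group id) onto their internal topics, `__consumer_offsets` and `__transaction_state`, the two lookups only agree on a partition index when the partition counts match. Both `offsets.topic.num.partitions` and `transaction.state.log.num.partitions` default to 50, which is why this usually holds. The sketch uses `Math.floorMod` as a simplified stand-in for Kafka's own hashing, and the partition counts are illustrative.

```java
// Sketch of the partition-index arithmetic behind the answer to point 3.
// Simplification: Math.floorMod stands in for Kafka's own non-negative hashing.
class CoordinatorColocationSketch {

    // Index of the internal-topic partition that a key maps to.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Whether looking up the same group id in both internal topics yields the same index.
    static boolean samePartitionIndex(String groupId, int offsetsPartitions, int txnStatePartitions) {
        return partitionFor(groupId, offsetsPartitions) == partitionFor(groupId, txnStatePartitions);
    }

    public static void main(String[] args) {
        // Equal partition counts: the indices always agree.
        System.out.println(samePartitionIndex("my-group", 50, 50)); // true
        // Different counts: "a".hashCode() is 97, and 97 % 50 = 47 while 97 % 64 = 33.
        System.out.println(samePartitionIndex("a", 50, 64)); // false
        int mismatches = 0;
        for (int i = 0; i < 1000; i++) {
            if (!samePartitionIndex("group-" + i, 50, 64)) {
                mismatches++;
            }
        }
        System.out.println(mismatches + " of 1000 group ids map to different indices with 50 vs 64 partitions");
    }
}
```

Even with matching indices, the same partition number in two different topics can have its leader on different brokers, so equal partition counts alone should not be read as a colocation guarantee.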
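Guozhang's suggestion in point 4, combined with Boyang's caveat and his case for keeping the config as a backup, amounts to a small decision procedure. The sketch below models only that decision. `ProbeResult`, `choosePath`, and the required version are hypothetical, and the actual FindCoordinator and ApiVersions round trips are not modeled.

```java
// Hypothetical sketch of the client-side path selection discussed in point 4,
// including the caveat and the config backup. Not a Kafka API (Java 16+).
class TxnPathSelectionSketch {

    enum Path { LEGACY, GROUP_AWARE }

    // Hypothetical probe results; a real client would obtain these from
    // FindCoordinator and ApiVersions requests. -1 means "unknown".
    record ProbeResult(boolean newLookupTypeRecognized,
                       int groupCoordinatorVersion,
                       int txnCoordinatorVersion) {}

    static Path choosePath(boolean groupAwareConfigEnabled, ProbeResult probe, int requiredVersion) {
        // Config backup: users can opt out, e.g. after a broker downgrade.
        if (!groupAwareConfigEnabled) {
            return Path.LEGACY;
        }
        // Step 1: the broker does not recognize the new lookup type, so use the old path.
        if (!probe.newLookupTypeRecognized()) {
            return Path.LEGACY;
        }
        // Step 2, with the caveat: both coordinators must be new enough.
        boolean groupOk = probe.groupCoordinatorVersion() >= requiredVersion;
        boolean txnOk = probe.txnCoordinatorVersion() >= requiredVersion;
        return (groupOk && txnOk) ? Path.GROUP_AWARE : Path.LEGACY;
    }

    public static void main(String[] args) {
        int required = 3; // hypothetical minimum version
        System.out.println(choosePath(true, new ProbeResult(false, -1, -1), required)); // LEGACY
        System.out.println(choosePath(true, new ProbeResult(true, 3, 2), required));    // LEGACY
        System.out.println(choosePath(true, new ProbeResult(true, 3, 3), required));    // GROUP_AWARE
        System.out.println(choosePath(false, new ProbeResult(true, 3, 3), required));   // LEGACY
    }
}
```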