Hi Gwen,

Thanks for the feedback!

0. That's a great point; I've updated the motivation section with that rationale.

1. This enables safe "hard downgrades" of clusters where, instead of just disabling exactly-once support on each worker, each worker is rolled back to an earlier version of the Connect framework that doesn't support per-connector offsets topics at all. Those workers would all go back to using a global offsets topic, and any offsets stored in per-connector topics would be lost to them. This would cause a large number of duplicates to flood the downstream system. While technically permissible, given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience could be pretty bad. A similar situation arises if users switch back from per-connector offsets topics to the global offsets topic.

I've tried to make this clearer in the KIP by linking to the "Hard downgrade" section from the proposed design, and by expanding on the rationale provided for redundant global offset writes in the "Hard downgrade" section. Let me know if you think this could be improved or if a different approach is warranted.

2. I think the biggest difference between Connect and Streams comes from the fact that Connect allows users to create connectors that target different Kafka clusters on the same worker. This hasn't been a problem in the past because workers use two separate producers to write offset data and source connector records, but in order to write offsets and records in the same transaction, it becomes necessary to use a single producer, which also requires that the internal offsets topic be hosted on the same Kafka cluster that the connector is targeting.

This may sound like a niche use case, but it was actually one of the driving factors behind KIP-458 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy), and it's a feature that we rely on heavily today. If there's too much going on in this KIP and we'd prefer to drop support for running that type of setup with exactly-once source connectors for now, I can propose this in a separate KIP. I figured it'd be best to get this out of the way with the initial introduction of exactly-once source support in order to make adoption by existing Connect users as seamless as possible, and since we'd likely have to address this issue before being able to utilize the feature ourselves.

I switched around the ordering of the "motivation" section for per-connector offsets topics to put the biggest factor first, and called it out as the major difference between Connect and Streams in this case.

3. Fair enough; after giving it a little more thought, I agree that allowing users to shoot themselves in the foot is a bad idea here. There's also some precedent for handling this with the "enable.idempotence" and "transactional.id" producer properties: if you specify a transactional ID but don't specify a value for idempotence, the producer just does the right thing for you by enabling idempotence and emitting a log message letting you know that it's done so. I've adjusted the proposed behavior to try to use a similar approach; let me know what you think.

There is a potential gap here: if, sometime in the future, a third accepted value for the "isolation.level" property is added to the consumer API, users will be unable to use that new value for their worker. But the likelihood of footgunning seems much greater than this scenario, and we can always address expansions to the consumer API with changes to the Connect framework as well if/when that becomes necessary.

I've also added a similar note to the source task's transactional ID property; user overrides of it will also be disabled.

4. Yeah, that's mostly correct. I tried to touch on this in the "Motivation" section with this bit:

> The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka.

I've expanded on this in the "Offset (and record) writes" section, and I've tweaked the "Motivation" section a little bit to add a link to the relevant config property and to make the language a little more accurate.

5. This isn't quite as bad as stop-the-world; more like stop-the-connector. If a worker is running a dozen connectors and one of those (that happens to be a source) is reconfigured, only the tasks for that connector will be preemptively halted, and all other tasks and connectors will continue running. This is pretty close to the current behavior with incremental rebalancing; the only difference is that, instead of waiting for the rebalance to complete before halting the tasks for that connector, the worker will halt them in preparation for the rebalance. This increases the time that the tasks will be down for, but is necessary if we want to give tasks time to gracefully shut down before being fenced out by the leader. It shouldn't impact availability too much, as it's only triggered on connector reconfiguration and rebalances generally don't take that long with the new incremental protocol.

6. Ah, thanks for the catch! That's a good point. I've adjusted the proposal to treat the worker's transactional ID like the consumer's isolation level and the source task's transactional ID properties: it can't be modified by users and, if an attempt is made to do so, the override will be discarded after logging a message to that effect. More footguns removed, hooray!

Thanks again for taking a look; if you have any other questions or suggestions I'm all ears.
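
To make the override handling from points 3 and 6 a bit more concrete, here's a rough sketch of the "discard the user's value and log a message" behavior. This is illustrative only: the class and method names are hypothetical and aren't taken from the KIP or the Connect codebase, and the transactional ID value is a placeholder. The only real names are the "isolation.level" and "transactional.id" client properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of the Connect framework.
final class PinnedClientConfigs {
    private static final Logger LOG = Logger.getLogger(PinnedClientConfigs.class.getName());

    // Returns a copy of userConfig in which every pinned property takes the
    // framework's value. A conflicting user value is discarded with a log message.
    static Map<String, String> apply(Map<String, String> userConfig, Map<String, String> pinned) {
        Map<String, String> result = new LinkedHashMap<>(userConfig);
        for (Map.Entry<String, String> entry : pinned.entrySet()) {
            String previous = result.put(entry.getKey(), entry.getValue());
            if (previous != null && !previous.equals(entry.getValue())) {
                LOG.warning("Ignoring user-supplied value '" + previous + "' for '"
                        + entry.getKey() + "'; using '" + entry.getValue() + "' instead");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Offset reader: the isolation level is pinned, other overrides pass through.
        Map<String, String> consumer = apply(
                Map.of("isolation.level", "read_uncommitted", "max.poll.records", "500"),
                Map.of("isolation.level", "read_committed"));
        System.out.println(consumer.get("isolation.level"));

        // Worker producer: "placeholder-group-id" stands in for the cluster's group ID.
        Map<String, String> producer = apply(
                Map.of("transactional.id", "my-custom-id"),
                Map.of("transactional.id", "placeholder-group-id"));
        System.out.println(producer.get("transactional.id"));
    }
}
```

The point of copying the user's map and overwriting only the pinned keys is that unrelated overrides are left alone; only the properties that exactly-once depends on are forced.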

Cheers,

Chris

p.s. - Guozhang--I saw your email; response to you coming next :)

On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Chris,
>
> Thanks for the great write-up! I was mainly reviewing the admin
> fenceProducers API of the KIP. I think it makes sense overall. I'm just
> wondering if we can go one step further, that instead of forcing to fence
> out all producers of the previous generation, could we try to achieve
> "partial rebalance" still by first generate the new assignment, and then
> based on the new assignment only fence out producers involved in tasks that
> are indeed migrated? Just a wild thought to bring up for debate.
>
> Guozhang
>
>
>
> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io> wrote:
>
> > Hey Chris,
> >
> > Thank you for the proposal. Few questions / comments:
> >
> > 0. It may be worth pointing out in the motivation section that
> > source-connector exactly once is more important than sink connector
> > exactly once, since many target systems will have unique key
> > constraint mechanisms that will prevent duplicates. Kafka does not
> > have any such constraints, so without this KIP-618, exactly once won't
> > be possible.
> > 1. I am not clear why we need the worker to async copy offsets from
> > the connector-specific offset topic to a global offsets topic
> > 2. While the reasoning you have for offset topic per connector appears
> > sound, it doesn't add up with the use of transactions in KafkaStreams.
> > My understanding is that KafkaStreams uses shared offsets topic with
> > all the other consumers, and (apparently) corrupting data and delays
> > by other tenants is a non-issue. Perhaps you can comment on how
> > Connect is different? In general much of the complexity in the KIP is
> > related to the separate offset topic, and I'm wondering if this can be
> > avoided. The migration use-case is interesting, but not related to
> > exactly-once and can be handled separately.
> > 3. Allowing users to override the isolation level for the offset
> > reader, even when exactly-once is enabled, thereby disabling
> > exactly-once in a non-obvious way. I get that connect usually allows
> > users to shoot themselves in the foot, but are there any actual
> > benefits for allowing it in this case? Maybe it is better if we don't?
> > I don't find the argument that we always did this to be particularly
> > compelling.
> > 4. It isn't stated explicitly, but it sounds like connect or source
> > connectors already have some batching mechanism, and that transaction
> > boundaries will match the batches (i.e. each batch will be a
> > transaction?). If so, worth being explicit.
> > 5. "When a rebalance is triggered, before (re-)joining the cluster
> > group, all workers will preemptively stop all tasks of all source
> > connectors for which task configurations are present in the config
> > topic after the latest task count record" - how will this play with
> > the incremental rebalances? isn't this exactly the stop-the-world
> > rebalance we want to avoid?
> > 6. "the worker will instantiate a transactional producer whose
> > transactional ID is, by default, the group ID of the cluster (but may
> > be overwritten by users using the transactional.id worker property)" -
> > If users change transactional.id property, zombie leaders won't get
> > fenced (since they will have an older and different transactional id)
> >
> > Thanks,
> >
> > Gwen
> >
> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io> wrote:
> > >
> > > Hi all,
> > >
> > > I know it's a busy time with the upcoming 2.6 release and I don't expect
> > > this to get a lot of traction until that's done, but I've published a KIP
> > > for allowing atomic commit of offsets and records for source connectors and
> > > would appreciate your feedback:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > >
> > > This feature should make it possible to implement source connectors with
> > > exactly-once delivery guarantees, and even allow a wide range of existing
> > > source connectors to provide exactly-once delivery guarantees with no
> > > changes required.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>
>
> --
> -- Guozhang
>