Hi Gwen,

Thanks for the feedback!

That's a great point; I've updated the motivation section with that.

This enables safe "hard downgrades" of clusters where, instead of just
disabling exactly-once support on each worker, each worker is rolled back
to an earlier version of the Connect framework that doesn't support
per-connector offsets topics altogether. Those workers would go back to all
using a global offsets topic, and any offsets stored in per-connector
topics would be lost to those workers. This would cause a large number of
duplicates to flood the downstream system. While technically permissible
given that the user in this case will have knowingly switched to a version
of the Connect framework that doesn't support exactly-once source
connectors (and is therefore susceptible to duplicate delivery of records),
the user experience in this case could be pretty bad. A similar situation
is if users switch back from per-connector offsets topics to the global
offsets topic.
I've tried to make this clearer in the KIP by linking to the "Hard
downgrade" section from the proposed design, and by expanding on the
rationale provided for redundant global offset writes in the "Hard
downgrade" section. Let me know if you think this could be improved or
think a different approach is warranted.

I think the biggest difference between Connect and Streams comes from the
fact that Connect allows users to create connectors that target different
Kafka clusters on the same worker. This hasn't been a problem in the past
because workers use two separate producers to write offset data and source
connector records, but in order to write offsets and records in the same
transaction, it becomes necessary to use a single producer, which also
requires that the internal offsets topic be hosted on the same Kafka
cluster that the connector is targeting.
This may sound like a niche use case but it was actually one of the driving
factors behind KIP-458 (
and it's a feature that we rely on heavily today.
If there's too much going on in this KIP and we'd prefer to drop support
for running that type of setup with exactly-once source connectors for now,
I can propose this in a separate KIP. I figured it'd be best to get this
out of the way with the initial introduction of exactly-once source support
in order to make adoption by existing Connect users as seamless as
possible, and since we'd likely have to address this issue before being
able to utilize the feature ourselves.
I switched around the ordering of the "motivation" section for
per-connector offsets topics to put the biggest factor first, and called it
out as the major difference between Connect and Streams in this case.

Fair enough; after giving it a little more thought, I agree that allowing
users to shoot themselves in the foot is a bad idea here. There's also some
precedent for handling this with the "enable.idempotence" and
"transactional.id" producer properties; if you specify a transactional ID
but don't specify a value for idempotence, the producer just does the right
thing for you by enabling idempotence and emitting a log message letting
you know that it's done so. I've adjusted the proposed behavior to try to
use a similar approach; let me know what you think.
There is a potential gap here where, sometime in the future, a third
accepted value for the "isolation.level" property is added to the consumer
API and users will be unable to use that new value for their worker. But
the likelihood of footgunning seems much greater than this scenario, and we
can always address expansions to the consumer API with changes to the
Connect framework as well if/when that becomes necessary.
I've also added a similar note to the source task's transactional ID
property; user overrides of it will also be disabled.
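
To make the proposed handling concrete, here is a rough sketch of how a
conflicting override could be discarded with a log message. This is only an
illustration: the ExactlyOnceOverrides class and its enforce method are
hypothetical names, not part of the Connect framework, and the exact log
wording in the KIP may differ. Only the property names and values
("isolation.level", "read_committed", "read_uncommitted",
"max.poll.records") are real consumer configs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of the Connect framework's API.
class ExactlyOnceOverrides {
    private static final Logger LOG = Logger.getLogger(ExactlyOnceOverrides.class.getName());

    // Returns a copy of userProps in which `key` is always set to `requiredValue`.
    // A conflicting user-supplied value is discarded and a warning is logged.
    static Map<String, String> enforce(Map<String, String> userProps, String key, String requiredValue) {
        Map<String, String> effective = new HashMap<>(userProps);
        String userValue = userProps.get(key);
        if (userValue != null && !userValue.equals(requiredValue)) {
            LOG.warning("Ignoring user-supplied value '" + userValue + "' for '" + key
                    + "'; using '" + requiredValue + "' because exactly-once support is enabled");
        }
        effective.put(key, requiredValue);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> consumerProps = new HashMap<>();
        consumerProps.put("isolation.level", "read_uncommitted"); // conflicting override
        consumerProps.put("max.poll.records", "100");             // unrelated, kept as-is

        Map<String, String> effective = enforce(consumerProps, "isolation.level", "read_committed");
        System.out.println(effective);
    }
}
```

The same helper would apply to the transactional ID properties by passing
"transactional.id" and the framework-chosen value instead.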

Yeah, that's mostly correct. I tried to touch on this in the "Motivation"
section with this bit:
    > The Connect framework periodically writes source task offsets to an
    > internal Kafka topic at a configurable interval, once the source records
    > that they correspond to have been successfully sent to Kafka.
I've expanded on this in the "Offset (and record) writes" section, and I've
tweaked the "Motivation" section a little bit to add a link to the relevant
config property and to make the language a little more accurate.

This isn't quite as bad as stop-the-world; more like stop-the-connector. If
a worker is running a dozen connectors and one of those (that happens to be
a source) is reconfigured, only the tasks for that connector will be
preemptively halted, and all other tasks and connectors will continue
running. This is pretty close to the current behavior with incremental
rebalancing; the only difference is that, instead of waiting for the
rebalance to complete before halting the tasks for that connector, the
worker will halt them in preparation for the rebalance. This increases the
time that the tasks will be down for, but is necessary if we want to give
tasks time to gracefully shut down before being fenced out by the leader,
and shouldn't impact availability too much as it's only triggered on
connector reconfiguration and rebalances generally don't take that long
with the new incremental protocol.
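
As a rough illustration of the "stop-the-connector" scope described above,
the selection of tasks to halt could look something like the sketch below.
The PreemptiveHalt and Task classes and the connector names are made up for
this example and are not Connect framework classes; the sketch assumes the
worker already knows which connector was reconfigured.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration; these are not Connect framework classes.
class PreemptiveHalt {
    static final class Task {
        final String connector;
        final int index;
        final boolean source;

        Task(String connector, int index, boolean source) {
            this.connector = connector;
            this.index = index;
            this.source = source;
        }

        @Override
        public String toString() {
            return connector + "-" + index;
        }
    }

    // Selects only the tasks of the reconfigured source connector; every other
    // task on the worker is left running.
    static List<Task> tasksToHalt(List<Task> running, String reconfiguredConnector) {
        List<Task> toHalt = new ArrayList<>();
        for (Task task : running) {
            if (task.source && task.connector.equals(reconfiguredConnector)) {
                toHalt.add(task);
            }
        }
        return toHalt;
    }

    public static void main(String[] args) {
        List<Task> running = new ArrayList<>();
        running.add(new Task("jdbc-source", 0, true));
        running.add(new Task("jdbc-source", 1, true));
        running.add(new Task("file-source", 0, true));
        running.add(new Task("s3-sink", 0, false));

        // prints [jdbc-source-0, jdbc-source-1]
        System.out.println(tasksToHalt(running, "jdbc-source"));
    }
}
```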

Ah, thanks for the catch! That's a good point. I've adjusted the proposal
to treat the worker's transactional ID like the consumer's isolation level
and the source task's transactional ID properties: it can't be modified by
users and, if an attempt is made to do so, the override will be discarded
after logging a message to that effect. More footguns removed, hooray!

Thanks again for taking a look; if you have any other questions or
suggestions I'm all ears.



p.s. - Guozhang--I saw your email; response to you coming next :)

On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Chris,
> Thanks for the great write-up! I was mainly reviewing the admin
> fenceProducers API of the KIP. I think it makes sense overall. I'm just
> wondering if we can go one step further, that instead of forcing to fence
> out all producers of the previous generation, could we try to achieve
> "partial rebalance" still by first generate the new assignment, and then
> based on the new assignment only fence out producers involved in tasks that
> are indeed migrated? Just a wild thought to bring up for debate.
> Guozhang
> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io> wrote:
> > Hey Chris,
> >
> > Thank you for the proposal. Few questions / comments:
> >
> > 0. It may be worth pointing out in the motivation section that
> > source-connector exactly once is more important than sink connector
> > exactly once, since many target systems will have unique key
> > constraint mechanisms that will prevent duplicates. Kafka does not
> > have any such constraints, so without this KIP-618, exactly once won't
> > be possible.
> > 1. I am not clear why we need the worker to async copy offsets from
> > the connector-specific offset topic to a global offsets topic
> > 2. While the reasoning you have for offset topic per connector appears
> > sound, it doesn't add up with the use of transactions in KafkaStreams.
> > My understanding is that KafkaStreams uses shared offsets topic with
> > all the other consumers, and (apparently) corrupting data and delays
> > by other tenants is a non-issue. Perhaps you can comment on how
> > Connect is different? In general much of the complexity in the KIP is
> > related to the separate offset topic, and I'm wondering if this can be
> > avoided. The migration use-case is interesting, but not related to
> > exactly-once and can be handled separately.
> > 3. Allowing users to override the isolation level for the offset
> > reader, even when exactly-once is enabled, thereby disabling
> > exactly-once in a non-obvious way. I get that connect usually allows
> > users to shoot themselves in the foot, but are there any actual
> > benefits for allowing it in this case? Maybe it is better if we don't?
> > I don't find the argument that we always did this to be particularly
> > compelling.
> > 4. It isn't stated explicitly, but it sounds like connect or source
> > connectors already have some batching mechanism, and that transaction
> > boundaries will match the batches (i.e. each batch will be a
> > transaction?). If so, worth being explicit.
> > 5. "When a rebalance is triggered, before (re-)joining the cluster
> > group, all workers will preemptively stop all tasks of all source
> > connectors for which task configurations are present in the config
> > topic after the latest task count record" - how will this play with
> > the incremental rebalances? isn't this exactly the stop-the-world
> > rebalance we want to avoid?
> > 6. "the worker will instantiate a transactional producer whose
> > transactional ID is, by default, the group ID of the cluster (but may
> > be overwritten by users using the transactional.id worker property)" -
> > If users change transactional.id property, zombie leaders won't get
> > fenced (since they will have an older and different transactional id)
> >
> > Thanks,
> >
> > Gwen
> >
> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io>
> > wrote:
> > >
> > > Hi all,
> > >
> > > I know it's a busy time with the upcoming 2.6 release and I don't expect
> > > this to get a lot of traction until that's done, but I've published a KIP
> > > for allowing atomic commit of offsets and records for source connectors and
> > > would appreciate your feedback:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > >
> > > This feature should make it possible to implement source connectors with
> > > exactly-once delivery guarantees, and even allow a wide range of existing
> > > source connectors to provide exactly-once delivery guarantees with no
> > > changes required.
> > >
> > > Cheers,
> > >
> > > Chris
> >
> >
> >
> > --
> > Gwen Shapira
> > Engineering Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
> --
> -- Guozhang
