Thanks Chris, yeah I think I agree with you that this does not necessarily
have to be in the scope of this KIP.

My understanding was that the source partitions -> tasks are not static but
dynamic, but they are only changed when either the number of partitions
changed or "tasks.max" config changed (please correct me if I'm wrong), so
what I'm thinking that we can try to detect if either of these things
happens, and if they do not happen we can assume the mapping from
partitions -> tasks does not change --- of course this requires some
extension on the API, aligned with what you said. I would like to make sure
that my understanding here is correct :)

Guozhang


On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <chr...@confluent.io> wrote:

> Hi Guozhang,
>
> Thanks for taking a look, and for your suggestion!
>
> I think there is room for more intelligent fencing strategies, but I think
> that it'd have to be more nuanced than one based on task->worker
> assignments. Connectors can arbitrarily reassign source partitions (such as
> database tables, or Kafka topic partitions) across tasks, so even if the
> assignment of tasks across workers remains unchanged, the assignment of
> source partitions across those workers might. Connect doesn't do any
> inspection of task configurations at the moment, and without expansions to
> the Connector/Task API, it'd likely be impossible to get information from
> tasks about their source partition assignments. With that in mind, I think
> we may want to leave the door open for more intelligent task fencing but
> not include that level of optimization at this stage. Does that sound fair
> to you?
>
> There is one case that I've identified where we can cheaply optimize right
> now: single-task connectors, such as the Debezium CDC source connectors. If
> a connector is configured at some point with a single task, then some other
> part of its configuration is altered but the single-task aspect remains,
> the leader doesn't have to worry about fencing out the older task as the
> new task's producer will do that automatically. In this case, the leader
> can skip the producer fencing round and just write the new task count
> record straight to the config topic. I've added this case to the KIP; if it
> overcomplicates things I'm happy to remove it, but seeing as it serves a
> real use case and comes fairly cheap, I figured it'd be best to include
> now.
>
> Thanks again for your feedback; if you have other thoughts I'd love to hear
> them!
>
> Cheers,
>
> Chris
>
> On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <chr...@confluent.io> wrote:
>
> > Hi Gwen,
> >
> > Thanks for the feedback!
> >
> > 0.
> > That's a great point; I've updated the motivation section with that
> > rationale.
> >
> > 1.
> > This enables safe "hard downgrades" of clusters where, instead of just
> > disabling exactly-once support on each worker, each worker is rolled back
> > to an earlier version of the Connect framework that doesn't support
> > per-connector offsets topics altogether. Those workers would go back to
> all
> > using a global offsets topic, and any offsets stored in per-connector
> > topics would be lost to those workers. This would cause a large number of
> > duplicates to flood the downstream system. While technically permissible
> > given that the user in this case will have knowingly switched to a
> version
> > of the Connect framework that doesn't support exactly-once source
> > connectors (and is therefore susceptible to duplicate delivery of
> records),
> > the user experience in this case could be pretty bad. A similar situation
> > is if users switch back from per-connector offsets topics to the global
> > offsets topic.
> > I've tried to make this more clear in the KIP by linking to the "Hard
> > downgrade" section from the proposed design, and by expanding on the
> > rationale provided for redundant global offset writes in the "Hard
> > downgrade" section. Let me know if you think this could be improved or
> > think a different approach is warranted.
> >
> > 2.
> > I think the biggest difference between Connect and Streams comes from the
> > fact that Connect allows users to create connectors that target different
> > Kafka clusters on the same worker. This hasn't been a problem in the past
> > because workers use two separate producers to write offset data and
> source
> > connector records, but in order to write offsets and records in the same
> > transaction, it becomes necessary to use a single producer, which also
> > requires that the internal offsets topic be hosted on the same Kafka
> > cluster that the connector is targeting.
> > This may sound like a niche use case but it was actually one of the
> > driving factors behind KIP-458 (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
> ),
> > and it's a feature that we rely on heavily today.
> > If there's too much going on in this KIP and we'd prefer to drop support
> > for running that type of setup with exactly-once source connectors for
> now,
> > I can propose this in a separate KIP. I figured it'd be best to get this
> > out of the way with the initial introduction of exactly-once source
> support
> > in order to make adoption by existing Connect users as seamless as
> > possible, and since we'd likely have to address this issue before being
> > able to utilize the feature ourselves.
> > I switched around the ordering of the "motivation" section for
> > per-connector offsets topics to put the biggest factor first, and called
> it
> > out as the major difference between Connect and Streams in this case.
> >
> > 3.
> > Fair enough, after giving it a little more thought I agree that allowing
> > users to shoot themselves in the foot is a bad idea here. There's also
> some
> > precedent for handling this with the "enable.idempotence" and "
> > transactional.id" producer properties; if you specify a transactional ID
> > but don't specify a value for idempotence, the producer just does the
> right
> > thing for you by enabling idempotence and emitting a log message letting
> > you know that it's done so. I've adjusted the proposed behavior to try to
> > use a similar approach; let me know what you think.
> > There is the potential gap here where, sometime in the future, a third
> > accepted value for the "isolation.level" property is added to the
> consumer
> > API and users will be unable to use that new value for their worker. But
> > the likelihood of footgunning seems much greater than this scenario, and
> we
> > can always address expansions to the consumer API with changes to the
> > Connect framework as well if/when that becomes necessary.
> > I've also added a similar note to the source task's transactional ID
> > property; user overrides of it will also be disabled.
> >
> > 4.
> > Yeah, that's mostly correct. I tried to touch on this in the "Motivation"
> > section with this bit:
> >     > The Connect framework periodically writes source task offsets to an
> > internal Kafka topic at a configurable interval, once the source records
> > that they correspond to have been successfully sent to Kafka.
> > I've expanded on this in the "Offset (and record) writes" section, and
> > I've tweaked the "Motivation" section a little bit to add a link to the
> > relevant config property and to make the language a little more accurate.
> >
> > 5.
> > This isn't quite as bad as stop-the-world; more like stop-the-connector.
> > If a worker is running a dozen connectors and one of those (that happens
> to
> > be a source) is reconfigured, only the tasks for that connector will be
> > preemptively halted, and all other tasks and connectors will continue
> > running. This is pretty close to the current behavior with incremental
> > rebalancing; the only difference is that, instead of waiting for the
> > rebalance to complete before halting the tasks for that connector, the
> > worker will halt them in preparation for the rebalance. This increases
> the
> > time that the tasks will be down for, but is necessary if we want to give
> > tasks time to gracefully shut down before being fenced out by the leader,
> > and shouldn't impact availability too much as it's only triggered on
> > connector reconfiguration and rebalances generally don't take that long
> > with the new incremental protocol.
> >
> > 6.
> > Ah, thanks for the catch! That's a good point. I've adjusted the proposal
> > to treat the worker's transactional ID like the consumer's isolation
> level
> > and the source task's transactional ID properties; can't be modified by
> > users and, if an attempt is made to do so, will be discarded after
> logging
> > a message to that effect. More footguns removed, hooray!
> >
> > Thanks again for taking a look; if you have any other questions or
> > suggestions I'm all ears.
> >
> > Cheers,
> >
> > Chris
> >
> > p.s. - Guozhang--I saw your email; response to you coming next :)
> >
> > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hello Chris,
> >>
> >> Thanks for the great write-up! I was mainly reviewing the admin
> >> fenceProducers API of the KIP. I think it makes sense overall. I'm just
> >> wondering if we can go one step further, that instead of forcing to
> fence
> >> out all producers of the previous generation, could we try to achieve
> >> "partial rebalance" still by first generate the new assignment, and then
> >> based on the new assignment only fence out producers involved in tasks
> >> that
> >> are indeed migrated? Just a wild thought to bring up for debate.
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io>
> wrote:
> >>
> >> > Hey Chris,
> >> >
> >> > Thank you for the proposal. Few questions / comments:
> >> >
> >> > 0. It may be worth pointing out in the motivation section that
> >> > source-connector exactly once is more important than sink connector
> >> > exactly once, since many target systems will have unique key
> >> > constraint mechanisms that will prevent duplicates. Kafka does not
> >> > have any such constraints, so without this KIP-618, exactly once won't
> >> > be possible.
> >> > 1. I am not clear why we need the worker to async copy offsets from
> >> > the connector-specific offset topic to a global offsets topic
> >> > 2. While the reasoning you have for offset topic per connector appears
> >> > sound, it doesn't add up with the use of transactions in KafkaStreams.
> >> > My understanding is that KafkaStreams uses shared offsets topic with
> >> > all the other consumers, and (apparently) corrupting data and delays
> >> > by other tenants is a non-issue. Perhaps you can comment on how
> >> > Connect is different? In general much of the complexity in the KIP is
> >> > related to the separate offset topic, and I'm wondering if this can be
> >> > avoided. The migration use-case is interesting, but not related to
> >> > exactly-once and can be handled separately.
> >> > 3. Allowing users to override the isolation level for the offset
> >> > reader, even when exactly-once is enabled, thereby disabling
> >> > exactly-once in a non-obvious way. I get that connect usually allows
> >> > users to shoot themselves in the foot, but are there any actual
> >> > benefits for allowing it in this case? Maybe it is better if we don't?
> >> > I don't find the argument that we always did this to be particularly
> >> > compelling.
> >> > 4. It isn't stated explicitly, but it sounds like connect or source
> >> > connectors already have some batching mechanism, and that transaction
> >> > boundaries will match the batches (i.e. each batch will be a
> >> > transaction?). If so, worth being explicit.
> >> > 5. "When a rebalance is triggered, before (re-)joining the cluster
> >> > group, all workers will preemptively stop all tasks of all source
> >> > connectors for which task configurations are present in the config
> >> > topic after the latest task count record" - how will this play with
> >> > the incremental rebalances? isn't this exactly the stop-the-world
> >> > rebalance we want to avoid?
> >> > 6. "the worker will instantiate a transactional producer whose
> >> > transactional ID is, by default, the group ID of the cluster (but may
> >> > be overwritten by users using the transactional.id worker property)"
> -
> >> > If users change transactional.id property, zombie leaders won't get
> >> > fenced (since they will have an older and different transactional id)
> >> >
> >> > Thanks,
> >> >
> >> > Gwen
> >> >
> >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io>
> >> > wrote:
> >> > >
> >> > > Hi all,
> >> > >
> >> > > I know it's a busy time with the upcoming 2.6 release and I don't
> >> expect
> >> > > this to get a lot of traction until that's done, but I've published
> a
> >> KIP
> >> > > for allowing atomic commit of offsets and records for source
> >> connectors
> >> > and
> >> > > would appreciate your feedback:
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> >> > >
> >> > > This feature should make it possible to implement source connectors
> >> with
> >> > > exactly-once delivery guarantees, and even allow a wide range of
> >> existing
> >> > > source connectors to provide exactly-once delivery guarantees with
> no
> >> > > changes required.
> >> > >
> >> > > Cheers,
> >> > >
> >> > > Chris
> >> >
> >> >
> >> >
> >> > --
> >> > Gwen Shapira
> >> > Engineering Manager | Confluent
> >> > 650.450.2760 | @gwenshap
> >> > Follow us: Twitter | blog
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang

Reply via email to