Hi Tom,

Really interesting turn this has taken! Responses inline.

> I'm not quite sure I follow what you mean here. Can you explain? AFAICT it
doesn't apply in a cluster where all tasks are using fencible producers,
but maybe I'm missing something.

Imagine this (unlikely but possible) scenario:

1. A connector is created with N tasks and uses the
fencible-but-not-transactional producer
2. Task N (as in, the task with the highest ID) is allocated to a worker
that then becomes a zombie
3. The connector is reconfigured to use N-1 tasks, and this takes effect on
all non-zombie workers in the cluster
4. The connector is reconfigured to enable full-on exactly-once support
(i.e., use of the transactional producer)

At this point, we would need to know to fence out task N that's running on
the zombie worker. This is what is accomplished in the current design with
the task count records in the config topic; even if the number of tasks in
a connector is decreased, the leader would be aware of the old, higher task
count for that connector, and know to fence out that many tasks.

I was only noting this for completeness' sake; there's nothing about this
requirement that renders your proposal impossible or even significantly
more difficult. We'd just have to make sure to do the task count record
bookkeeping for connectors regardless of whether they're exactly-once or
not, so that if a connector has exactly-once switched on without a cluster
roll in the middle, we'd know exactly how many tasks to fence out before
bringing up that first round of transactional producers.
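
To sketch what I mean (purely illustrative; the transactional ID format
and helper names below are made up for this example and aren't part of
the KIP):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TaskCountFencingSketch {

        // Hypothetical transactional ID scheme; the real format would be
        // defined by the KIP.
        static String transactionalId(String groupId, String connector, int taskId) {
            return groupId + "-" + connector + "-" + taskId;
        }

        // Fence out every task producer the connector may ever have run. The
        // taskCount argument would come from the task count record in the
        // config topic, which can be higher than the connector's current
        // number of tasks if it was scaled down since the record was written.
        static void fencePriorTasks(String bootstrapServers, String groupId,
                                    String connector, int taskCount) {
            for (int taskId = 0; taskId < taskCount; taskId++) {
                Properties props = new Properties();
                props.put("bootstrap.servers", bootstrapServers);
                props.put("transactional.id", transactionalId(groupId, connector, taskId));
                props.put("key.serializer", ByteArraySerializer.class.getName());
                props.put("value.serializer", ByteArraySerializer.class.getName());
                try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                    // initTransactions() bumps the epoch for this transactional ID,
                    // so any zombie task still producing with it (e.g., task N on
                    // the dead worker) is fenced out on its next write.
                    producer.initTransactions();
                }
            }
        }
    }

The key point is just that the task count we track has to be a
high-water mark, not the connector's current number of tasks.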

> That will be the case for the new transactional cluster anyway.

That's true, but we do go from three static ACLs (write/describe on a fixed
transactional ID, and idempotent write on a fixed cluster) to a dynamic
collection of ACLs. In especially large organizations, where the people who
administer Connect clusters aren't necessarily the same as the people who
create and manage connectors, this might cause some friction. Still,
since there are benefits to all users (regardless of requirements for
exactly-once delivery guarantees) in the form of fencible producers that
would, in many if not all circumstances, reduce duplicate writes, it's not
out of the question to argue for this change.
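
To give a concrete picture of the kind of ACL management involved, here's
a rough sketch using the Java Admin client (the principal, transactional
ID naming scheme, and bootstrap address are all hypothetical):

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    public class ConnectorAclSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // With per-connector (or per-task) transactional IDs, the
                // resource name depends on the connector name, so a binding
                // like this has to be created for every connector rather than
                // granted once for the whole cluster.
                AclBinding perTaskWrite = new AclBinding(
                        new ResourcePattern(ResourceType.TRANSACTIONAL_ID,
                                "connect-cluster-my-connector-0", PatternType.LITERAL),
                        new AccessControlEntry("User:connect-worker", "*",
                                AclOperation.WRITE, AclPermissionType.ALLOW));
                admin.createAcls(List.of(perTaskWrite)).all().get();
            }
        }
    }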

I also toyed with the question of "If we're going to require these new ACLs
unconditionally, what's stopping us from just enabling fully-fledged
exactly-once source support by default?". It'd be pretty straightforward to
include zombie fencing for free with this change, for example. The only
remaining blocker seems to be that the connector needs direct write and
read access to the offsets topic that it uses.

Ultimately I think it may behoove us to err on the side of minimizing
breaking changes here for now and saving them for 4.0 (or some later major
release), but I would be interested in thoughts from you and others.

> Gouzhang also has a (possible) use case for a fencing-only producer (
https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
there, you should be able to get these semantics today by calling
initTransactions() and then just using the producer as normal (no
beginTransaction()/abortTransaction()/endTransaction()).

I tested this locally and was not met with success; transactional producers
do a check right now to ensure that any calls to "KafkaProducer::send"
occur within a transaction (see
https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
and
https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451).
Not a blocker, just noting that we'd have to do some legwork to make this
workable with the producer API.
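
For reference, here's roughly the shape of what I tried locally (a
minimal sketch; the broker address, transactional ID, and topic name are
placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FencingOnlyProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transactional.id", "fencing-only-test");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Bumps the epoch for this transactional ID, fencing out any
                // older producer instance that used the same ID
                producer.initTransactions();
                // No beginTransaction() here; with the checks linked above,
                // this send fails because the producer requires an active
                // transaction
                producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            }
        }
    }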

> In light of that (and assuming you buy these arguments), I wonder how much
extra effort it would be to do for EOS-enabled clusters as part of this
KIP?

The extra effort wouldn't be negligible (expansion of the producer API,
more complexity in task count record logic, more thorough upgrade notes),
but ultimately I wouldn't object to the proposal because of the extra work
involved. What it really comes down to IMO is how aggressive we're willing
to be with the breaking changes we make for users. If a good argument can
be made for introducing new ACL requirements for every single connector
running on 3.0 and beyond, then I'd be happy to fold this into the KIP in
exchange for the ability to configure exactly-once support on a
per-connector basis.

Really enjoying the fresh perspective you're bringing here, especially with
regard to the transactional producer internals and Kafka Streams use cases!

Cheers,

Chris

On Fri, May 14, 2021 at 10:07 AM Tom Bentley <tbent...@redhat.com> wrote:

> Hi Chris,
>
> Thanks for the reply.
>
> "required"/"requested" sounds good to me. Likewise the pre-flight check and
> "PUT /{connectorType}/config/validate".
>
> The other half is we'd still need to
> > track the number of tasks for that connector that would need to be fenced
> > out if/when exactly-once for it were switched on.
> >
>
> I'm not quite sure I follow what you mean here. Can you explain? AFAICT it
> doesn't apply in a cluster where all tasks are using fencible producers,
> but maybe I'm missing something.
>
> If we had the
> > intermediate producer you describe at our disposal, and it were in use by
> > every running source task for a given connector, we could probably enable
> > users to toggle exactly-once on a per-connector basis, but it would also
> > require new ACLs for all connectors.
> >
>
> That will be the case for the new transactional cluster anyway.
>
> I think there is value to supporting connectors that don't use full-blown
> transactions in an exactly-once cluster, because the overhead in a fencing
> producer should be similar to an idempotent producer (which IIRC is about
> 3% above a non-idempotent producer). That's because we only need to make a
> single InitProducerIdRequest, and thereafter the epoch check is tiny.
>
> If that's right then many people would then be able to use a single cluster
> for both exactly once and non-exactly once connectors (i.e. it would get
> rid of the performance cost of running a non-EOS connector in an
> exactly-once cluster). Only people who cared about the ~3% would need to
> run "old-style" clusters using unfenced producers.
>
> Gouzhang also has a (possible) use case for a fencing-only producer (
> https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
> there, you should be able to get these semantics today by calling
> initTransactions() and then just using the producer as normal (no
> beginTransaction()/abortTransaction()/endTransaction()).
>
> In light of that (and assuming you buy these arguments), I wonder how much
> extra effort it would be to do for EOS-enabled clusters as part of this
> KIP?
>
> Thanks again,
>
> Tom
>
> On Fri, May 14, 2021 at 2:14 AM Chris Egerton <chr...@confluent.io.invalid
> >
> wrote:
>
> > Hi Tom,
> >
> > I'm fine with an implicit mapping of connector-provided null to
> > user-exposed UNKNOWN, if the design continues down that overall path.
> >
> > Allowing users to assert that a connector should support exactly-once
> > sounds reasonable; it's similar to the pre-flight checks we already do
> for
> > connector configurations such as invoking "Connector::validate" and
> > ensuring that all of the referenced SMTs, Predicates, and Converter
> classes
> > are present on the worker. In fact, I wonder if that's how we could
> > implement it--as a preflight check. That way, Connector and Task
> instances
> > won't even have the chance to fail; if the user states a requirement for
> > exactly-once support but their connector configuration doesn't meet that
> > requirement, we can fail the connector creation/reconfiguration request
> > before even writing the new config to the config topic. We could also add
> > this support to the "PUT /{connectorType}/config/validate" endpoint so
> that
> > users could test exactly-once support for various configurations without
> > having to actually create or reconfigure a connector. We could still fail
> > tasks on startup if something slipped by (possibly due to connector
> > upgrade) but it'd make the UX a bit smoother in most cases to fail
> faster.
> >
> > Since a possible use of the property is to allow future users to control
> > exactly-once support on a per-connector basis, I wonder whether a binary
> > property is sufficient here. Even if a connector doesn't support
> > exactly-once, there could still be benefits to using a transactional
> > producer with rounds of zombie fencing; for example, preventing duplicate
> > task instances from producing data, which could be leveraged to provide
> > at-most-once delivery guarantees. In that case, we'd want a way to signal
> > to Connect that the framework should do everything it does to provide
> > exactly-once source support, but not make the assertion on the connector
> > config, and we'd end up providing three possibilities to users: required,
> > best-effort, and disabled. It sounds like right now what we're proposing
> is
> > that we expose only the first two and don't allow users to actually
> disable
> > exactly-once support on a per-connector basis, but want to leave room for
> > the third option in the future. With that in mind,
> "required/not_required"
> > might not be the best fit. Perhaps "required"/"requested" for now, with
> > "disabled" as the value that could be implemented later?
> >
> > RE: "Is the problem here simply that the zombie fencing provided by the
> > producer is only available when using transactions, and therefore having
> a
> > non-transactional producer in the cluster poses a risk of a zombie not
> > being fenced?"--that's half of it. The other half is we'd still need to
> > track the number of tasks for that connector that would need to be fenced
> > out if/when exactly-once for it were switched on. If we had the
> > intermediate producer you describe at our disposal, and it were in use by
> > every running source task for a given connector, we could probably enable
> > users to toggle exactly-once on a per-connector basis, but it would also
> > require new ACLs for all connectors. Even though we're allowed to make
> > breaking changes with the upcoming 3.0 release, I'm not sure the tradeoff
> > is worth it. I suppose we could break down exactly-once support into two
> > separate config properties--a worker-level property, that causes all
> source
> > tasks on the worker to use producers that can be fenced (either full-on
> > transactional producers or "intermediate" producers), and a per-connector
> > property, that toggles whether the connector itself uses a full-on
> > transactional producer or just an intermediate producer (and whether or
> not
> > zombie fencing is performed for new task configs). This seems like it
> might
> > be overkill for now, though.
> >
> > As far as the zombie fencing endpoint goes--the behavior will be the same
> > either way w/r/t the exactly.once.source.enabled property. The property
> > will dictate whether the endpoint is used by tasks, but it'll be
> available
> > for use no matter what. This is how a rolling upgrade becomes possible;
> > even if the leader hasn't been upgraded yet (to set
> > exactly.once.source.enabled to true), it will still be capable of
> handling
> > fencing requests from workers that have already been upgraded.
> >
> > Cheers,
> >
> > Chris
> >
> > On Wed, May 12, 2021 at 5:33 AM Tom Bentley <tbent...@redhat.com> wrote:
> >
> > > Hi Chris and Randall,
> > >
> > > I can see that for connectors where exactly once is
> > configuration-dependent
> > > it makes sense to use a default method. The problem with having an
> > explicit
> > > UNKNOWN case is we really want connector developers to _not_ use it.
> That
> > > could mean it's deprecated from the start. Alternatively we could omit
> it
> > > from the enum and use null to mean unknown (we'd have to check for a
> null
> > > result anyway), with the contract for the method being that it should
> > > return non-null. Of course, this doesn't remove the ambiguous case, but
> > > avoids the need to eventually remove UNKNOWN in the future.
> > >
> > > I think there's another way for a worker to use the value too: Imagine
> > > you're deploying a connector that you need to be exactly once. It's
> > awkward
> > > to have to query the REST API to determine that exactly once was
> working,
> > > especially if you need to do this after config changes too. What you
> > > actually want is to make an EOS assertion, via a connector config (e.g.
> > > require.exactly.once=true, or perhaps
> > exactly.once=required/not_required),
> > > which would fail the connector/task if exactly once could not be
> > provided.
> > >
> > > The not_required case wouldn't disable the transactional runtime
> > > environment, simply not guarantee that it was providing EOS. Although
> it
> > > would leave the door open to supporting mixed EOS/non-transactional
> > > deployments in the cluster in the future, if that became possible (i.e.
> > we
> > > could retrospectively make not_required mean no transactions).
> > >
> > > On the subject of why it's not possible to enabled exactly once on a
> > > per-connector basis: Is the problem here simply that the zombie fencing
> > > provided by the producer is only available when using transactions, and
> > > therefore having a non-transactional producer in the cluster poses a
> risk
> > > of a zombie not being fenced? This makes me wonder whether there's a
> case
> > > for a producer with zombie fencing that is not transactional
> > (intermediate
> > > between idempotent and transactional producer). IIUC this would need to
> > > make a InitProducerId request and use the PID in produce requests, but
> > > could dispense with the other transactional RPCs. If such a thing
> existed
> > > would the zombie fencing it provided be sufficient to provide safe
> > > semantics for running a non-EOS connector in an EOS-capable cluster?
> > >
> > > The endpoint for zombie fencing: It's not described how this works when
> > > exactly.once.source.enabled=false
> > >
> > > Cheers,
> > >
> > > Tom
> > >
> >
>
