Hi Yash,

Thanks for your detailed thoughts.

1. In KAFKA-4107 [1], the primary request is exactly what's proposed in the
KIP right now: a way to reset offsets for connectors. Sure, there's an
extra step of stopping the connector, but renaming a connector isn't as
convenient an alternative as it may seem. In many cases you'd also want to
delete the older one, so the complete sequence of steps would be something
like: delete the old connector, rename it (possibly requiring modifications
to its config file, depending on which API is used), then create the
renamed variant. It's also just not a great user
experience--even if the practical impacts are limited (which, IMO, they are
not), people have been asking for years about why they have to employ this
kind of a workaround for a fairly common use case, and we don't really have
a good answer beyond "we haven't implemented something better yet". On top
of that, you may have external tooling that needs to be tweaked to handle a
new connector name, you may have strict authorization policies around who
can access what connectors, and you may have other ACLs attached to the
name of the connector (which can be especially common in the case of sink
connectors, whose consumer group IDs are tied to their names by default).
Leaving around state in the offsets topic that can never be cleaned up also
presents a bit of a footgun for users. It may not be a silver bullet, but
providing some mechanism to reset that state is a step in the right
direction and allows responsible users to more carefully administer their
cluster without resorting to non-public APIs. That said, I do agree that a
fine-grained reset/overwrite API would be useful, and I'd be happy to
review a KIP to add that feature if anyone wants to tackle it!
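
For illustration, the stop / reset / resume flow could be scripted roughly
like the sketch below. It only builds the three requests and does not send
them. The stop and offsets paths are the ones discussed in this thread, the
resume path is the existing Connect REST endpoint, and the base URL and
helper name are hypothetical.

```python
from urllib.parse import quote
from urllib.request import Request


def reset_offsets_requests(base_url: str, connector: str) -> list[Request]:
    """Build (but do not send) the requests for a coarse-grained offsets reset."""
    name = quote(connector, safe="")  # connector names may need URL-escaping
    steps = [
        ("PUT", f"/connectors/{name}/stop"),        # proposed in the KIP
        ("DELETE", f"/connectors/{name}/offsets"),  # proposed in the KIP
        ("PUT", f"/connectors/{name}/resume"),      # existing endpoint
    ]
    root = base_url.rstrip("/")
    return [Request(root + path, method=method) for method, path in steps]


if __name__ == "__main__":
    # Hypothetical worker address; print the sequence instead of sending it.
    for req in reset_offsets_requests("http://localhost:8083", "my-connector"):
        print(req.get_method(), req.full_url)
```

Each request could then be passed to urllib.request.urlopen (or any other
HTTP client) in order, checking the response before moving to the next step.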

2. Keeping the two formats symmetrical is motivated mostly by aesthetics
and quality-of-life for programmatic interaction with the API; it's not
really a goal to hide the use of consumer groups from users. I do agree
that the format is a little strange-looking for sink connectors, but it
seemed like it would be easier to work with for UIs, casual jq queries, and
CLIs than a more Kafka-specific alternative such as {"<topic>-<partition>":
"<offset>"}, and although it is a little strange, I don't think it's any
less readable or intuitive. That said, I've made some tweaks to the format
that should make programmatic access even easier; specifically, I've
removed the "source" and "sink" wrapper fields and replaced them with a
top-level object that has a "type" field and an "offsets" field, just like
you suggested in point 3 (thanks!). We might also consider changing the field
names for sink offsets from "topic", "partition", and "offset" to "Kafka
topic", "Kafka partition", and "Kafka offset" respectively, to reduce the
stuttering effect of having a "partition" field inside a "partition" field
and the same with an "offset" field; thoughts? One final point--by equating
source and sink offsets, we probably make it easier for users to understand
exactly what a source offset is. Anyone who's familiar with consumer
offsets can see from the response format that we identify a logical
partition as a combination of two entities (a topic and a partition
number), and seeing what the two formats have in common should make it
easier to grok what a source offset is.
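
To make the symmetry concrete, here is a rough sketch of the two response
shapes as described above. It assumes "offsets" is a list of entries that
each carry a "partition" object and an "offset" object; the exact schema is
whatever the KIP ends up specifying, and the helper names and sample values
are made up for illustration.

```python
import json


def sink_offsets_response(committed: dict[tuple[str, int], int]) -> dict:
    """Render consumer-group offsets in the symmetrical format (assumed shape)."""
    return {
        "type": "sink",
        "offsets": [
            {
                "partition": {"topic": topic, "partition": partition},
                "offset": {"offset": offset},
            }
            for (topic, partition), offset in sorted(committed.items())
        ],
    }


def source_offsets_response(stored: list[tuple[dict, dict]]) -> dict:
    """Render connector-defined source partitions/offsets in the same shape."""
    return {
        "type": "source",
        "offsets": [{"partition": p, "offset": o} for p, o in stored],
    }


if __name__ == "__main__":
    print(json.dumps(sink_offsets_response({("orders", 0): 42}), indent=2))
    # A hypothetical file-based source connector's partition and offset maps.
    print(json.dumps(
        source_offsets_response([({"filename": "a.txt"}, {"position": 10})]),
        indent=2,
    ))
```

In both cases a client can iterate over "offsets" and read "partition" and
"offset" without first branching on the connector type.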

3. Great idea! Done.

4. Yes, I'm thinking right now that a 409 will be the response status if a
rebalance is pending. I'd rather not add this to the KIP as we may want to
change it at some point and it doesn't seem vital to establish it as part
of the public contract for the new endpoint right now. Also, small
point--yes, a 409 is useful to avoid forwarding requests to an incorrect
leader, but it's also useful to ensure that there aren't any unresolved
writes to the config topic that might cause issues with the request (such
as deleting the connector).
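
For what it's worth, a client that wants to tolerate a pending rebalance
could retry on 409 along the lines of the sketch below. This is only an
illustration: the 409 status is my current thinking rather than part of the
public contract, and the helper name, attempt count, and delay are
arbitrary. The send callable is a stand-in for whatever issues the request
and returns the HTTP status code.

```python
import time
from typing import Callable


def call_with_conflict_retry(
    send: Callable[[], int],
    attempts: int = 5,
    delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns something other than 409, or attempts run out.

    Returns the last status code seen, so the caller still has to check it.
    """
    status = send()
    for _ in range(attempts - 1):
        if status != 409:  # assumed "rebalance pending" status; may change
            break
        sleep(delay_s)
        status = send()
    return status
```

The sleep parameter is injectable so the retry loop can be exercised
without actually waiting.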

5. That's a good point--it may be misleading to call a connector STOPPED
when it has zombie tasks lying around on the cluster. I don't think it'd be
appropriate to perform that fencing synchronously while handling requests
to the PUT /connectors/{connector}/stop endpoint, though, since we'd want to
give all currently-running tasks a chance to gracefully shut down. I'm also
not sure that this is a significant problem. If the connector is resumed, then all
zombie tasks will be automatically fenced out by their successors on
startup; if it's deleted, then we'll have wasted effort by performing an
unnecessary round of fencing. It may be nice to guarantee that source task
resources will be deallocated after the connector transitions to STOPPED,
but realistically, it doesn't do much to just fence out their producers,
since tasks can be blocked on a number of other operations such as
key/value/header conversion, transformation, and task polling. It may be a
little strange if data is produced to Kafka after the connector has
transitioned to STOPPED, but we can't provide the same guarantees for sink
connectors, since their tasks may be stuck on a long-running SinkTask::put
that emits data even after the Connect framework has abandoned them after
exhausting their graceful shutdown timeout. Ultimately, I'd prefer to err
on the side of consistency and ease of implementation for now, but I may be
missing a case where a few extra records from a task that's slow to shut
down may cause serious issues--let me know.

6. I'm hesitant to propose deprecation of the PAUSED state right now as it
does serve a few purposes. Leaving tasks idling-but-ready makes resuming
them less disruptive across the cluster, since a rebalance isn't necessary.
It also reduces latency to resume the connector, especially for ones that
have to do a lot of state gathering on initialization to, e.g., read
offsets from an external system.

7. There should be no risk of mixed tasks after a downgrade, thanks to the
empty set of task configs that gets published to the config topic. Both
upgraded and downgraded workers will render an empty set of tasks for the
connector, and keep that empty set of tasks until the connector is resumed.
Does that address your concerns?

You're also correct that the linked Jira ticket was wrong; thanks for
pointing that out! Yes, KAFKA-4107 is the intended ticket, and I've updated
the link in the KIP accordingly.



[1] - https://issues.apache.org/jira/browse/KAFKA-4107

On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
> Thanks a lot for this KIP, I think something like this has been long
> overdue for Kafka Connect :)
> Some thoughts and questions that I had -
> 1. I'm wondering if you could elaborate a little more on the use case for
> the `DELETE /connectors/{connector}/offsets` API. I think we can all agree
> that a fine grained reset API that allows setting arbitrary offsets for
> partitions would be quite useful (which you talk about in the Future work
> section). But for the `DELETE /connectors/{connector}/offsets` API in its
> described form, it looks like it would only serve a seemingly niche use
> case where users want to avoid renaming connectors - because this new way
> of resetting offsets actually has more steps (i.e. stop the connector,
> reset offsets via the API, resume the connector) than simply deleting and
> re-creating the connector with a different name?
> 2. The KIP talks about taking care that the response formats (presumably
> only talking about the new GET API here) are symmetrical for both source
> and sink connectors - is the end goal to have users of Kafka Connect not
> even be aware that sink connectors use Kafka consumers under the hood (i.e.
> have that as purely an implementation detail abstracted away from users)?
> While I understand the value of uniformity here, the response format for
> sink connectors currently looks a little odd with the "partition" field
> having "topic" and "partition" as sub-fields, especially to users familiar
> with Kafka semantics. Thoughts?
> 3. Another little nitpick on the response format - why do we need "source"
> / "sink" as a field under "offsets"? Users can query the connector type via
> the existing `GET /connectors` API. If it's deemed important to let users
> know that the offsets they're seeing correspond to a source / sink
> connector, maybe we could have a top level field "type" in the response for
> the `GET /connectors/{connector}/offsets` API similar to the `GET
> /connectors` API?
> 4. For the `DELETE /connectors/{connector}/offsets` API, the KIP mentions
> that requests will be rejected if a rebalance is pending - presumably this
> is to avoid forwarding requests to a leader which may no longer be the
> leader after the pending rebalance? In this case, the API will return a
> `409 Conflict` response similar to some of the existing APIs, right?
> 5. Regarding fencing out previously running tasks for a connector, do you
> think it would make more sense semantically to have this implemented in the
> stop endpoint where an empty set of tasks is generated, rather than the
> delete offsets endpoint? This would also give the new `STOPPED` state a
> higher confidence of sorts, with any zombie tasks being fenced off from
> continuing to produce data.
> 6. Thanks for outlining the issues with the current state of the `PAUSED`
> state - I think a lot of users expect it to behave like the `STOPPED` state
> you outline in the KIP and are (unpleasantly) surprised when it doesn't.
> However, this does beg the question of what the usefulness of having two
> separate `PAUSED` and `STOPPED` states is? Do we want to continue
> supporting both these states in the future, or do you see the `STOPPED`
> state eventually causing the existing `PAUSED` state to be deprecated?
> 7. I think the idea outlined in the KIP for handling a new state during
> cluster downgrades / rolling upgrades is quite clever, but do you think
> there could be any issues with having a mix of "paused" and "stopped" tasks
> for the same connector across workers in a cluster? At the very least, I
> think it would be fairly confusing to most users. I'm wondering if this can
> be avoided by stating clearly in the KIP that the new
> `PUT /connectors/{connector}/stop` can only be used on a cluster that is
> fully upgraded to an AK version newer
> than the one which ends up containing changes from this KIP and that if a
> cluster needs to be downgraded to an older version, the user should ensure
> that none of the connectors on the cluster are in a stopped state? With the
> existing implementation, it looks like an unknown/invalid target state
> record is basically just discarded (with an error message logged), so it
> doesn't seem to be a disastrous failure scenario that can bring down a
> worker.
> Thanks,
> Yash
> On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
> > Hi Ashwin,
> >
> > Thanks for your thoughts. Regarding your questions:
> >
> > 1. The response would show the offsets that are visible to the source
> > connector, so it would combine the contents of the two topics, giving
> > priority to offsets present in the connector-specific topic. I'm imagining
> > a follow-up question that some people may have in response to that is
> > whether we'd want to provide insight into the contents of a single topic at
> > a time. It may be useful to be able to see this information in order to
> > debug connector issues or verify that it's safe to stop using a
> > connector-specific offsets topic (either explicitly, or implicitly via
> > cluster downgrade). What do you think about adding a URL query parameter
> > that allows users to dictate which view of the connector's offsets they are
> > given in the REST response, with options for the worker's global topic, the
> > connector-specific topic, and the combined view of them that the connector
> > and its tasks see (which would be the default)? This may be too much for V1
> > but it feels like it's at least worth exploring a bit.
> >
> > 2. There is no option for this at the moment. Reset semantics are extremely
> > coarse-grained; for source connectors, we delete all source offsets, and
> > for sink connectors, we delete the entire consumer group. I'm hoping this
> > will be enough for V1 and that, if there's sufficient demand for it, we can
> > introduce a richer API for resetting or even modifying connector offsets in
> > a follow-up KIP.
> >
> > 3. Good eye :) I think it's fine to keep the existing behavior for the
> > PAUSED state with the Connector instance, since the primary purpose of the
> > Connector is to generate task configs and monitor the external system for
> > changes. If there's no chance for tasks to be running anyways, I don't see
> > much value in allowing paused connectors to generate new task configs,
> > especially since each time that happens a rebalance is triggered and
> > there's a non-zero cost to that. What do you think?
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Oct 14, 2022 at 12:59 AM Ashwin <apan...@confluent.io.invalid>
> > wrote:
> >
> > > Thanks for KIP Chris - I think this is a useful feature.
> > >
> > > Can you please elaborate on the following in the KIP -
> > >
> > > 1. How would the response of GET /connectors/{connector}/offsets look like
> > > if the worker has both global and connector specific offsets topic ?
> > >
> > > 2. How can we pass the reset options like shift-by , to-date-time etc.
> > > using a REST API like DELETE /connectors/{connector}/offsets ?
> > >
> > > 3. Today PAUSE operation on a connector invokes its stop method - will
> > > there be a change here to reduce confusion with the new proposed
> > > state ?
> > >
> > > Thanks,
> > > Ashwin
> > >
> > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I noticed a fairly large gap in the first version of this KIP that I
> > > > published last Friday, which has to do with accommodating connectors
> > > > that target different Kafka clusters than the one that the Kafka Connect
> > > > cluster uses for its internal topics and source connectors with dedicated
> > > > offsets topics. I've since updated the KIP to address this gap, which has
> > > > substantially altered the design. Wanted to give a heads-up to anyone
> > > > that's already started reviewing.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > > I'd like to begin discussion on a KIP to add offsets support to the Kafka
> > > > > > Connect REST API:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > >
> > >
> >

