Hi Yash,

I've updated the KIP with the correct "kafka_topic", "kafka_partition", and
"kafka_offset" keys in the JSON examples (settled on those instead of
prefixing with "Kafka " for better interactions with tooling like jq). I've
also added a note about sink offset requests failing if there are still
active members in the consumer group.
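
For illustration, here is a minimal sketch of a sink offsets body using the
new keys. The nesting (a top-level "offsets" array whose entries each hold a
"partition" object and an "offset" object) follows the format discussed
earlier in this thread; the class and method names are hypothetical, and the
JSON examples in the KIP itself remain the authoritative reference.

```java
// Hypothetical helper, for illustration only; not part of Kafka Connect.
class SinkOffsetExample {

    // Renders a request body with a single sink offset entry, using the
    // "kafka_topic", "kafka_partition", and "kafka_offset" keys.
    static String toJson(String topic, int partition, long offset) {
        return String.format(
            "{\"offsets\": [{\"partition\": {\"kafka_topic\": \"%s\", \"kafka_partition\": %d}, "
                + "\"offset\": {\"kafka_offset\": %d}}]}",
            topic, partition, offset);
    }

    public static void main(String[] args) {
        // Example values are made up.
        System.out.println(toJson("orders", 0, 42L));
    }
}
```

Because the keys contain no spaces, a jq filter can reach them with plain
dotted paths rather than quoted key lookups.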

I don't believe logging an error message is sufficient for handling
failures to reset-after-delete. IMO it's highly likely that users will
either shoot themselves in the foot by not reading the fine print and not
realizing that the offset reset may have failed, or will ask for better
visibility into the success or failure of the reset request than scanning
log files. I don't doubt that there are ways to address this, but I would
prefer to leave them to a separate KIP since the required design work is
non-trivial and I don't think it's worth making that work a blocker for
this KIP.

I was really hoping to avoid introducing a change to the developer-facing
APIs with this KIP, but after giving it some thought I think this may be
unavoidable. It's debatable whether validation of altered offsets is a good
enough use case on its own for this kind of API, but since there are also
connectors out there that manage offsets externally, we should probably add
a hook to allow those external offsets to be managed. That hook can then
serve double or even triple duty: it can also validate custom offsets and
notify users whether offset resets/alterations are supported at all (which
they may not be if, for example, offsets are coupled tightly with the data
written by a sink connector). I've updated the KIP with the
developer-facing API changes for this logic; let me know what you think.
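
To make the three roles of such a hook concrete, here is a rough sketch. It
is not the API in the KIP: the interface name, method signature, return-value
convention, and the "filename"/"position" keys are all assumptions made up
for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Demo first; the hypothetical hook and two sample implementations follow.
class OffsetHookDemo {
    public static void main(String[] args) {
        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
        offsets.put(Map.of("filename", "a.txt"), Map.of("position", 10L));
        offsets.put(Map.of("filename", "b.txt"), null); // null offset = reset
        System.out.println(new FileSourceOffsetHook().alterOffsets(offsets));
    }
}

// Hypothetical hook, defined here only for illustration.
interface OffsetAlterationHook {
    /**
     * Invoked before altered offsets are written. A null offset value means
     * "reset this partition". Returns true if the connector vouches for the
     * offsets (and has applied them to any external system), false if it has
     * no opinion. Throws UnsupportedOperationException if offsets cannot be
     * altered at all, or IllegalArgumentException if an offset is malformed.
     */
    default boolean alterOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
        return false; // default keeps existing connectors compatible
    }
}

// Role 1: validate connector-specific partition and offset structures.
class FileSourceOffsetHook implements OffsetAlterationHook {
    @Override
    public boolean alterOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            if (!entry.getKey().containsKey("filename")) {
                throw new IllegalArgumentException("partition must contain 'filename'");
            }
            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                continue; // reset request; nothing to validate
            }
            Object position = offset.get("position");
            if (!(position instanceof Number) || ((Number) position).longValue() < 0) {
                throw new IllegalArgumentException("'position' must be a non-negative number");
            }
            // Role 2: a connector that tracks offsets externally would also
            // update its external system at this point.
        }
        return true;
    }
}

// Role 3: signal that offsets cannot be altered at all, e.g. because they
// are coupled tightly with the data written by a sink connector.
class TightlyCoupledSinkHook implements OffsetAlterationHook {
    @Override
    public boolean alterOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
        throw new UnsupportedOperationException("offsets are stored alongside the sink data");
    }
}
```

The default implementation in the sketch returns false rather than throwing,
so connectors written before such a hook exists would keep compiling and
behaving as they do today.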

Cheers,

Chris

On Mon, Nov 14, 2022 at 10:16 AM Mickael Maison <mickael.mai...@gmail.com>
wrote:

> Hi Chris,
>
> Thanks for the update!
>
> It's relatively common to only want to reset offsets for a specific
> resource (for example with MirrorMaker for one or a group of topics).
> Could it be possible to add a way to do so? Either by providing a
> payload to DELETE or by setting the offset field to an empty object in
> the PATCH payload?
>
> Thanks,
> Mickael
>
> On Sat, Nov 12, 2022 at 3:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > Hi Chris,
> >
> > Thanks for pointing out that the consumer group deletion step itself will
> > fail in case of zombie sink tasks. Since we can't get any stronger
> > guarantees from consumers (unlike with transactional producers), I think it
> > makes perfect sense to fail the offset reset attempt in such scenarios with
> > a relevant error message to the user. I was more concerned about silently
> > failing but it looks like that won't be an issue. It's probably worth
> > calling out this difference between source / sink connectors explicitly in
> > the KIP, what do you think?
> >
> > > changing the field names for sink offsets
> > > from "topic", "partition", and "offset" to "Kafka
> > > topic", "Kafka partition", and "Kafka offset" respectively, to
> > > reduce the stuttering effect of having a "partition" field inside
> > >  a "partition" field and the same with an "offset" field
> >
> > The KIP is still using the nested partition / offset fields by the way -
> > has it not been updated because we're waiting for consensus on the field
> > names?
> >
> > > The reset-after-delete feature, on the other
> > > hand, is actually pretty tricky to design; I've updated the
> > > rationale in the KIP for delaying it and clarified that it's not
> > > just a matter of implementation but also design work.
> >
> > I like the idea of writing an offset reset request to the config topic
> > which will be processed by the herder's config update listener - I'm not
> > sure I fully follow the concerns with regard to handling failures? Why
> > can't we simply log an error saying that the offset reset for the deleted
> > connector "xyz" failed due to reason "abc"? As long as it's documented that
> > connector deletion and offset resets are asynchronous and a success
> > response only means that the request was initiated successfully (which is
> > the case even today with normal connector deletion), we should be fine
> > right?
> >
> > Thanks for adding the new PATCH endpoint to the KIP, I think it's a lot
> > more useful for this use case than a PUT endpoint would be! One thing
> > that I was thinking about with the new PATCH endpoint is that while we can
> > easily validate the request body format for sink connectors (since it's the
> > same across all connectors), we can't do the same for source connectors as
> > things stand today since each source connector implementation can define
> > its own source partition and offset structures. Without any validation,
> > writing a bad offset for a source connector via the PATCH endpoint could
> > cause it to fail with hard to discern errors. I'm wondering if we could add
> > a new method to the `SourceConnector` class (which should be overridden by
> > source connector implementations) that would validate whether or not the
> > provided source partitions and source offsets are valid for the connector
> > (it could have a default implementation returning true unconditionally for
> > backward compatibility).
> >
> > > I've also added an implementation plan to the KIP, which calls
> > > out the different parts that can be worked on independently so that
> > > others (hi Yash 🙂) can also tackle parts of this if they'd like.
> >
> > I'd be more than happy to pick up one or more of the implementation parts,
> > thanks for breaking it up into granular pieces!
> >
> > Thanks,
> > Yash
> >
> > On Fri, Nov 11, 2022 at 11:25 PM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Mickael,
> > >
> > > Thanks for your feedback. This has been on my TODO list as well :)
> > >
> > > 1. That's fair! Support for altering offsets is easy enough to design, so
> > > I've added it to the KIP. The reset-after-delete feature, on the other
> > > hand, is actually pretty tricky to design; I've updated the rationale in
> > > the KIP for delaying it and clarified that it's not just a matter of
> > > implementation but also design work. If you or anyone else can think of a
> > > clean, simple way to implement it, I'm happy to add it to this KIP, but
> > > otherwise I'd prefer not to tie it to the approval and release of the
> > > features already proposed in the KIP.
> > >
> > > 2. Yeah, it's a little awkward. In my head I've justified the ugliness of
> > > the implementation with the smooth user-facing experience; falling back
> > > seamlessly on the PAUSED state without even logging an error message is a
> > > lot better than I'd initially hoped for when I was designing this feature.
> > >
> > > I've also added an implementation plan to the KIP, which calls out the
> > > different parts that can be worked on independently so that others (hi Yash
> > > 🙂) can also tackle parts of this if they'd like.
> > >
> > > Finally, I've removed the "type" field from the response body format for
> > > offset read requests. This way, users can copy+paste the response from that
> > > endpoint into a request to alter a connector's offsets without having to
> > > remove the "type" field first. An alternative was to keep the "type" field
> > > and add it to the request body format for altering offsets, but this didn't
> > > seem to make enough sense for cases not involving the aforementioned
> > > copy+paste process.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Nov 9, 2022 at 9:57 AM Mickael Maison <
> mickael.mai...@gmail.com>
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Thanks for the KIP, you're picking something that has been in my todo
> > > > list for a while ;)
> > > >
> > > > It looks good overall, I just have a couple of questions:
> > > > 1) I consider both features listed in Future Work pretty important.
> In
> > > > both cases you mention the reason for not addressing them now is
> > > > because of the implementation. If the design is simple and if we have
> > > > volunteers to implement them, I wonder if we could include them in
> > > > this KIP. So you would not have to implement everything but we would
> > > > have a single KIP and vote.
> > > >
> > > > 2) Regarding the backward compatibility for the stopped state. The
> > > > "state.v2" field is a bit unfortunate but I can't think of a better
> > > > solution. The other alternative would be to not do anything but I
> > > > think the graceful degradation you propose is a bit better.
> > > >
> > > > Thanks,
> > > > Mickael
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Nov 8, 2022 at 5:58 PM Chris Egerton <chr...@aiven.io.invalid
> >
> > > > wrote:
> > > > >
> > > > > Hi Yash,
> > > > >
> > > > > Good question! This is actually a subtle source of asymmetry in the
> > > > current
> > > > > proposal. Requests to delete a consumer group with active members
> will
> > > > > fail, so if there are zombie sink tasks that are still
> communicating
> > > with
> > > > > Kafka, offset reset requests for that connector will also fail. It
> is
> > > > > possible to use an admin client to remove all active members from
> the
> > > > group
> > > > > and then delete the group. However, this solution isn't as
> complete as
> > > > the
> > > > > zombie fencing that we can perform for exactly-once source tasks,
> since
> > > > > removing consumers from a group doesn't prevent them from
> immediately
> > > > > rejoining the group, which would either cause the group deletion
> > > request
> > > > to
> > > > > fail (if they rejoin before the group is deleted), or recreate the
> > > group
> > > > > (if they rejoin after the group is deleted).
> > > > >
> > > > > For ease of implementation, I'd prefer to leave the asymmetry in
> the
> > > API
> > > > > for now and fail fast and clearly if there are still consumers
> active
> > > in
> > > > > the sink connector's group. We can try to detect this case and
> provide
> > > a
> > > > > helpful error message to the user explaining why the offset reset
> > > request
> > > > > has failed and some steps they can take to try to resolve things
> (wait
> > > > for
> > > > > slow task shutdown to complete, restart zombie workers and/or
> workers
> > > > with
> > > > > blocked tasks on them). In the future we can possibly even revisit
> > > > KIP-611
> > > > > [1] or something like it to provide better insight into zombie
> tasks
> > > on a
> > > > > worker so that it's easier to find which tasks have been abandoned
> but
> > > > are
> > > > > still running.
> > > > >
> > > > > Let me know what you think; this is an important point to call out
> and
> > > if
> > > > > we can reach some consensus on how to handle sink connector offset
> > > resets
> > > > > w/r/t zombie tasks, I'll update the KIP with the details.
> > > > >
> > > > > [1] -
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya <yash.ma...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Chris,
> > > > > >
> > > > > > Thanks for the response and the explanations, I think you've
> answered
> > > > > > pretty much all the questions I had meticulously!
> > > > > >
> > > > > >
> > > > > > > if something goes wrong while resetting offsets, there's no
> > > > > > > immediate impact--the connector will still be in the STOPPED
> > > > > > >  state. The REST response for requests to reset the offsets
> > > > > > > will clearly call out that the operation has failed, and if
> > > > necessary,
> > > > > > > we can probably also add a scary-looking warning message
> > > > > > > stating that we can't guarantee which offsets have been
> > > successfully
> > > > > > >  wiped and which haven't. Users can query the exact offsets of
> > > > > > > the connector at this point to determine what will happen
> if/when
> > > > they
> > > > > > > resume it. And they can repeat attempts to reset the offsets as
> > > many
> > > > > > >  times as they'd like until they get back a 2XX response,
> > > indicating
> > > > > > > that it's finally safe to resume the connector. Thoughts?
> > > > > >
> > > > > > Yeah, I agree, the case that I mentioned earlier where a user
> would
> > > > try to
> > > > > > resume a stopped connector after a failed offset reset attempt
> > > without
> > > > > > knowing that the offset reset attempt didn't fail cleanly is
> probably
> > > > just
> > > > > > an extreme edge case. I think as long as the response is verbose
> > > > enough and
> > > > > > self explanatory, we should be fine.
> > > > > >
> > > > > > Another question that I had was behavior w.r.t sink connector
> offset
> > > > resets
> > > > > > when there are zombie tasks/workers in the Connect cluster - the
> KIP
> > > > > > mentions that for sink connectors offset resets will be done by
> > > > deleting
> > > > > > the consumer group. However, if there are zombie tasks which are
> > > still
> > > > able
> > > > > > to communicate with the Kafka cluster that the sink connector is
> > > > consuming
> > > > > > from, I think the consumer group will automatically get
> re-created
> > > and
> > > > the
> > > > > > zombie task may be able to commit offsets for the partitions
> that it
> > > is
> > > > > > consuming from?
> > > > > >
> > > > > > Thanks,
> > > > > > Yash
> > > > > >
> > > > > >
> > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton
> > > <chr...@aiven.io.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Yash,
> > > > > > >
> > > > > > > Thanks again for your thoughts! Responses to ongoing
> discussions
> > > > inline
> > > > > > > (easier to track context than referencing comment numbers):
> > > > > > >
> > > > > > > > However, this then leads me to wonder if we can make that
> > > explicit
> > > > by
> > > > > > > including "connect" or "connector" in the higher level field
> names?
> > > > Or do
> > > > > > > you think this isn't required given that we're talking about a
> > > > Connect
> > > > > > > specific REST API in the first place?
> > > > > > >
> > > > > > > I think "partition" and "offset" are fine as field names but
> I'm
> > > not
> > > > > > hugely
> > > > > > > opposed to adding "connector " as a prefix to them; would be
> > > > interested
> > > > > > in
> > > > > > > others' thoughts.
> > > > > > >
> > > > > > > > I'm not sure I followed why the unresolved writes to the
> config
> > > > topic
> > > > > > > would be an issue - wouldn't the delete offsets request be
> added to
> > > > the
> > > > > > > herder's request queue and whenever it is processed, we'd
> anyway
> > > > need to
> > > > > > > check if all the prerequisites for the request are satisfied?
> > > > > > >
> > > > > > > Some requests are handled in multiple steps. For example,
> deleting
> > > a
> > > > > > > connector (1) adds a request to the herder queue to write a
> > > > tombstone to
> > > > > > > the config topic (or, if the worker isn't the leader, forward
> the
> > > > request
> > > > > > > to the leader). (2) Once that tombstone is picked up, (3) a
> > > rebalance
> > > > > > > ensues, and then after it's finally complete, (4) the
> connector and
> > > > its
> > > > > > > tasks are shut down. I probably could have used better
> terminology,
> > > > but
> > > > > > > what I meant by "unresolved writes to the config topic" was a
> case
> > > in
> > > > > > > between steps (2) and (3)--where the worker has already read
> that
> > > > > > tombstone
> > > > > > > from the config topic and knows that a rebalance is pending,
> but
> > > > hasn't
> > > > > > > begun participating in that rebalance yet. In the
> DistributedHerder
> > > > > > class,
> > > > > > > this is done via the `checkRebalanceNeeded` method.
> > > > > > >
> > > > > > > > We can probably revisit this potential deprecation [of the
> PAUSED
> > > > > > state]
> > > > > > > in the future based on user feedback and how the adoption of
> the
> > > new
> > > > > > > proposed stop endpoint looks like, what do you think?
> > > > > > >
> > > > > > > Yeah, revisiting in the future seems reasonable. 👍
> > > > > > >
> > > > > > > And responses to new comments here:
> > > > > > >
> > > > > > > 8. Yep, we'll start tracking offsets by connector. I don't
> believe
> > > > this
> > > > > > > should be too difficult, and suspect that the only reason we
> track
> > > > raw
> > > > > > byte
> > > > > > > arrays instead of pre-deserializing offset topic information
> into
> > > > > > something
> > > > > > > more useful is because Connect originally had pluggable
> internal
> > > > > > > converters. Now that we're hardcoded to use the JSON converter
> it
> > > > should
> > > > > > be
> > > > > > > fine to track offsets on a per-connector basis as they're read
> from
> > > > the
> > > > > > > offsets topic.
> > > > > > >
> > > > > > > 9. I'm hesitant to introduce this type of feature right now
> because
> > > > of
> > > > > > all
> > > > > > > of the gotchas that would come with it. In security-conscious
> > > > > > environments,
> > > > > > > it's possible that a sink connector's principal may have
> access to
> > > > the
> > > > > > > consumer group used by the connector, but the worker's
> principal
> > > may
> > > > not.
> > > > > > > There's also the case where source connectors have separate
> offsets
> > > > > > topics,
> > > > > > > or sink connectors have overridden consumer group IDs, or sink
> or
> > > > source
> > > > > > > connectors work against a different Kafka cluster than the one
> that
> > > > their
> > > > > > > worker uses. Overall, I'd rather provide a single API that
> works in
> > > > all
> > > > > > > cases rather than risk confusing and alienating users by
> trying to
> > > > make
> > > > > > > their lives easier in a subset of cases.
> > > > > > >
> > > > > > > 10. Hmm... I don't think the order of the writes matters too
> much
> > > > here,
> > > > > > but
> > > > > > > we probably could start by deleting from the global topic
> first,
> > > > that's
> > > > > > > true. The reason I'm not hugely concerned about this case is
> that
> > > if
> > > > > > > something goes wrong while resetting offsets, there's no
> immediate
> > > > > > > impact--the connector will still be in the STOPPED state. The
> REST
> > > > > > response
> > > > > > > for requests to reset the offsets will clearly call out that
> the
> > > > > > operation
> > > > > > > has failed, and if necessary, we can probably also add a
> > > > scary-looking
> > > > > > > warning message stating that we can't guarantee which offsets
> have
> > > > been
> > > > > > > successfully wiped and which haven't. Users can query the exact
> > > > offsets
> > > > > > of
> > > > > > > the connector at this point to determine what will happen
> if/when
> > > > they
> > > > > > > resume it. And they can repeat attempts to reset the offsets as
> > > many
> > > > > > times
> > > > > > > as they'd like until they get back a 2XX response, indicating
> that
> > > > it's
> > > > > > > finally safe to resume the connector. Thoughts?
> > > > > > >
> > > > > > > 11. I haven't thought too much about it. I think something
> like the
> > > > > > > Monitorable* connectors would probably serve our needs here;
> we can
> > > > > > > instantiate them on a running Connect cluster and then use
> various
> > > > > > handles
> > > > > > > to know how many times they've been polled, committed records,
> etc.
> > > > If
> > > > > > > necessary we can tweak those classes or even write our own. But
> > > > anyways,
> > > > > > > once that's all done, the test will be something like "create a
> > > > > > connector,
> > > > > > > wait for it to produce N records (each of which contains some
> kind
> > > of
> > > > > > > predictable offset), and ensure that the offsets for it in the
> REST
> > > > API
> > > > > > > match up with the ones we'd expect from N records". Does that
> > > answer
> > > > your
> > > > > > > question?
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Chris
> > > > > > >
> > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <
> yash.ma...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Chris,
> > > > > > > >
> > > > > > > > 1. Thanks a lot for elaborating on this, I'm now convinced
> about
> > > > the
> > > > > > > > usefulness of the new offset reset endpoint. Regarding the
> > > > follow-up
> > > > > > KIP
> > > > > > > > for a fine-grained offset write API, I'd be happy to take
> that on
> > > > once
> > > > > > > this
> > > > > > > > KIP is finalized and I will definitely look forward to your
> > > > feedback on
> > > > > > > > that one!
> > > > > > > >
> > > > > > > > 2. Gotcha, the motivation makes more sense to me now. So the
> > > higher
> > > > > > level
> > > > > > > > partition field represents a Connect specific "logical
> partition"
> > > > of
> > > > > > > sorts
> > > > > > > > - i.e. the source partition as defined by a connector for
> source
> > > > > > > connectors
> > > > > > > > and a Kafka topic + partition for sink connectors. I like the
> > > idea
> > > > of
> > > > > > > > adding a Kafka prefix to the lower level partition/offset
> (and
> > > > topic)
> > > > > > > > fields which basically makes it more clear (although
> implicitly)
> > > > that
> > > > > > the
> > > > > > > > higher level partition/offset field is Connect specific and
> not
> > > the
> > > > > > same
> > > > > > > as
> > > > > > > > what those terms represent in Kafka itself. However, this
> then
> > > > leads me
> > > > > > > to
> > > > > > > > wonder if we can make that explicit by including "connect" or
> > > > > > "connector"
> > > > > > > > in the higher level field names? Or do you think this isn't
> > > > required
> > > > > > > given
> > > > > > > > that we're talking about a Connect specific REST API in the
> first
> > > > > > place?
> > > > > > > >
> > > > > > > > 3. Thanks, I think the response structure definitely looks
> better
> > > > now!
> > > > > > > >
> > > > > > > > 4. Interesting, I'd be curious to learn why we might want to
> > > change
> > > > > > this
> > > > > > > in
> > > > > > > > the future but that's probably out of scope for this
> discussion.
> > > > I'm
> > > > > > not
> > > > > > > > sure I followed why the unresolved writes to the config topic
> > > > would be
> > > > > > an
> > > > > > > > issue - wouldn't the delete offsets request be added to the
> > > > herder's
> > > > > > > > request queue and whenever it is processed, we'd anyway need
> to
> > > > check
> > > > > > if
> > > > > > > > all the prerequisites for the request are satisfied?
> > > > > > > >
> > > > > > > > 5. Thanks for elaborating that just fencing out the producer
> > > still
> > > > > > leaves
> > > > > > > > many cases where source tasks remain hanging around and also
> that
> > > > we
> > > > > > > anyway
> > > > > > > > can't have similar data production guarantees for sink
> connectors
> > > > right
> > > > > > > > now. I agree that it might be better to go with ease of
> > > > implementation
> > > > > > > and
> > > > > > > > consistency for now.
> > > > > > > >
> > > > > > > > 6. Right, that does make sense but I still feel like the two
> > > states
> > > > > > will
> > > > > > > > end up being confusing to end users who might not be able to
> > > > discern
> > > > > > the
> > > > > > > > (fairly low-level) differences between them (also the
> nuances of
> > > > state
> > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED with
> the
> > > > > > > > rebalancing implications as well). We can probably revisit
> this
> > > > > > potential
> > > > > > > > deprecation in the future based on user feedback and how the
> > > > adoption
> > > > > > of
> > > > > > > > the new proposed stop endpoint looks like, what do you think?
> > > > > > > >
> > > > > > > > 7. Aha, that is completely my bad, I missed that the v1/v2
> state
> > > is
> > > > > > only
> > > > > > > > applicable to the connector's target state and that we don't
> need
> > > > to
> > > > > > > worry
> > > > > > > > about the tasks since we will have an empty set of tasks. I
> > > think I
> > > > > > was a
> > > > > > > > little confused by "pause the parts of the connector that
> they
> > > are
> > > > > > > > assigned" from the KIP. Thanks for clarifying that!
> > > > > > > >
> > > > > > > >
> > > > > > > > Some more thoughts and questions that I had -
> > > > > > > >
> > > > > > > > 8. Could you elaborate on what the implementation for offset
> > > reset
> > > > for
> > > > > > > > source connectors would look like? Currently, it doesn't look
> > > like
> > > > we
> > > > > > > track
> > > > > > > > all the partitions for a source connector anywhere. Will we
> need
> > > to
> > > > > > > > book-keep this somewhere in order to be able to emit a
> tombstone
> > > > record
> > > > > > > for
> > > > > > > > each source partition?
> > > > > > > >
> > > > > > > > 9. The KIP describes the offset reset endpoint as only being
> > > > usable on
> > > > > > > > existing connectors that are in a `STOPPED` state. Why
> wouldn't
> > > we
> > > > want
> > > > > > > to
> > > > > > > > allow resetting offsets for a deleted connector which seems
> to
> > > be a
> > > > > > valid
> > > > > > > > use case? Or do we plan to handle this use case only via the
> item
> > > > > > > outlined
> > > > > > > > in the future work section - "Automatically delete offsets
> with
> > > > > > > > connectors"?
> > > > > > > >
> > > > > > > > 10. The KIP mentions that source offsets will be reset
> > > > transactionally
> > > > > > > for
> > > > > > > > each topic (worker global offset topic and connector specific
> > > > offset
> > > > > > > topic
> > > > > > > > if it exists). While it obviously isn't possible to
> atomically do
> > > > the
> > > > > > > > writes to two topics which may be on different Kafka
> clusters,
> > > I'm
> > > > > > > > wondering about what would happen if the first transaction
> > > > succeeds but
> > > > > > > the
> > > > > > > > second one fails. I think the order of the two transactions
> > > matters
> > > > > > here
> > > > > > > -
> > > > > > > > if we successfully emit tombstones to the connector specific
> > > offset
> > > > > > topic
> > > > > > > > and fail to do so for the worker global offset topic, we'll
> > > > presumably
> > > > > > > fail
> > > > > > > > the offset delete request because the KIP mentions that "A
> > > request
> > > > to
> > > > > > > reset
> > > > > > > > offsets for a source connector will only be considered
> successful
> > > > if
> > > > > > the
> > > > > > > > worker is able to delete all known offsets for that
> connector, on
> > > > both
> > > > > > > the
> > > > > > > > worker's global offsets topic and (if one is used) the
> > > connector's
> > > > > > > > dedicated offsets topic.". However, this will lead to the
> > > connector
> > > > > > only
> > > > > > > > being able to read potentially older offsets from the worker
> > > global
> > > > > > > offset
> > > > > > > > topic on resumption (based on the combined offset view
> presented
> > > as
> > > > > > > > described in KIP-618 [1]). So, I think we should make sure
> that
> > > the
> > > > > > > worker
> > > > > > > > global offset topic tombstoning is attempted first, right?
> Note
> > > > that in
> > > > > > > the
> > > > > > > > current implementation of
> `ConnectorOffsetBackingStore::set`, the
> > > > > > > primary /
> > > > > > > > connector specific offset store is written to first.
> > > > > > > >
> > > > > > > > 11. This probably isn't necessary to elaborate on in the KIP
> > > > itself,
> > > > > > but
> > > > > > > I
> > > > > > > > was wondering what the second offset test - "verify that that
> > > those
> > > > > > > offsets
> > > > > > > > reflect an expected level of progress for each connector
> (i.e.,
> > > > they
> > > > > > are
> > > > > > > > greater than or equal to a certain value depending on how the
> > > > > > connectors
> > > > > > > > are configured and how long they have been running)" - would
> look
> > > > like?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yash
> > > > > > > >
> > > > > > > > [1] -
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > > > > > > >
> > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton
> > > > <chr...@aiven.io.invalid
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Yash,
> > > > > > > > >
> > > > > > > > > Thanks for your detailed thoughts.
> > > > > > > > >
> > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly what's
> > > > proposed
> > > > > > in
> > > > > > > > the
> > > > > > > > > KIP right now: a way to reset offsets for connectors. Sure,
> > > > there's
> > > > > > an
> > > > > > > > > extra step of stopping the connector, but renaming a
> connector
> > > > isn't
> > > > > > as
> > > > > > > > > convenient of an alternative as it may seem since in many
> cases
> > > > you'd
> > > > > > > > also
> > > > > > > > > want to delete the older one, so the complete sequence of
> steps
> > > > would
> > > > > > > be
> > > > > > > > > something like delete the old connector, rename it
> (possibly
> > > > > > requiring
> > > > > > > > > modifications to its config file, depending on which API is
> > > > used),
> > > > > > then
> > > > > > > > > create the renamed variant. It's also just not a great user
> > > > > > > > > experience--even if the practical impacts are limited
> (which,
> > > > IMO,
> > > > > > they
> > > > > > > > are
> > > > > > > > > not), people have been asking for years about why they
> have to
> > > > employ
> > > > > > > > this
> > > > > > > > > kind of a workaround for a fairly common use case, and we
> don't
> > > > > > really
> > > > > > > > have
> > > > > > > > > a good answer beyond "we haven't implemented something
> better
> > > > yet".
> > > > > > On
> > > > > > > > top
> > > > > > > > > of that, you may have external tooling that needs to be
> tweaked
> > > > to
> > > > > > > > handle a
> > > > > > > > > new connector name, you may have strict authorization
> policies
> > > > around
> > > > > > > who
> > > > > > > > > can access what connectors, you may have other ACLs
> attached to
> > > > the
> > > > > > > name
> > > > > > > > of
> > > > > > > > > the connector (which can be especially common in the case
> of
> > > sink
> > > > > > > > > connectors, whose consumer group IDs are tied to their
> names by
> > > > > > > default),
> > > > > > > > > and leaving around state in the offsets topic that can
> never be
> > > > > > cleaned
> > > > > > > > up
> > > > > > > > > presents a bit of a footgun for users. It may not be a
> silver
> > > > bullet,
> > > > > > > but
> > > > > > > > > providing some mechanism to reset that state is a step in
> the
> > > > right
> > > > > > > > > direction and allows responsible users to more carefully
> > > > administer
> > > > > > > their
> > > > > > > > > cluster without resorting to non-public APIs. That said, I
> do
> > > > agree
> > > > > > > that
> > > > > > > > a
> > > > > > > > > fine-grained reset/overwrite API would be useful, and I'd
> be
> > > > happy to
> > > > > > > > > review a KIP to add that feature if anyone wants to tackle
> it!
> > > > > > > > >
> > > > > > > > > > 2. Keeping the two formats symmetrical is motivated mostly
> > > > > > > > > > by aesthetics and quality-of-life for programmatic
> > > > > > > > > > interaction with the API; it's not really a goal to hide
> > > > > > > > > > the use of consumer groups from users. I do agree that the
> > > > > > > > > > format is a little strange-looking for sink connectors,
> > > > > > > > > > but it seemed like it would be easier to work with for
> > > > > > > > > > UIs, casual jq queries, and CLIs than a more
> > > > > > > > > > Kafka-specific alternative such as
> > > > > > > > > > {"<topic>-<partition>": "<offset>"}, and although it is a
> > > > > > > > > > little strange, I don't think it's any less readable or
> > > > > > > > > > intuitive. That said, I've made some tweaks to the format
> > > > > > > > > > that should make programmatic access even easier;
> > > > > > > > > > specifically, I've removed the "source" and "sink" wrapper
> > > > > > > > > > fields and instead moved them into a top-level object with
> > > > > > > > > > a "type" and "offsets" field, just like you suggested in
> > > > > > > > > > point 3 (thanks!). We might also consider changing the
> > > > > > > > > > field names for sink offsets from "topic", "partition",
> > > > > > > > > > and "offset" to "Kafka topic", "Kafka partition", and
> > > > > > > > > > "Kafka offset" respectively, to reduce the stuttering
> > > > > > > > > > effect of having a "partition" field inside a "partition"
> > > > > > > > > > field and the same with an "offset" field; thoughts? One
> > > > > > > > > > final point--by equating source and sink offsets, we
> > > > > > > > > > probably make it easier for users to understand exactly
> > > > > > > > > > what a source offset is; anyone who's familiar with
> > > > > > > > > > consumer offsets can see from the response format that we
> > > > > > > > > > identify a logical partition as a combination of two
> > > > > > > > > > entities (a topic and a partition number); it should make
> > > > > > > > > > it easier to grok what a source offset is by seeing what
> > > > > > > > > > the two formats have in common.
> > > > > > > > >
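
To illustrate the symmetry argument in point 2, here is a minimal Python
sketch of how a client might handle both response shapes with the same code.
The payloads are hypothetical: they assume a top-level object with "type" and
"offsets" fields, entries shaped as {"partition": ..., "offset": ...}, and the
"kafka_topic" / "kafka_partition" / "kafka_offset" keys mentioned at the top
of this thread. The source-side keys ("filename", "position") and both helper
function names are invented purely for illustration.

```python
import json

# Hypothetical response bodies; the shapes are assumptions for illustration,
# not a statement of the final KIP format.
SINK_RESPONSE = json.dumps({
    "type": "sink",
    "offsets": [
        {"partition": {"kafka_topic": "orders", "kafka_partition": 0},
         "offset": {"kafka_offset": 42}},
        {"partition": {"kafka_topic": "orders", "kafka_partition": 1},
         "offset": {"kafka_offset": 7}},
    ],
})
SOURCE_RESPONSE = json.dumps({
    "type": "source",
    "offsets": [
        {"partition": {"filename": "a.log"}, "offset": {"position": 1024}},
    ],
})


def flatten_offsets(body):
    """Return (partition, offset) pairs; identical logic for both types."""
    parsed = json.loads(body)
    return [(entry["partition"], entry["offset"]) for entry in parsed["offsets"]]


def sink_offsets_by_topic_partition(body):
    """Sink-only convenience view: {(topic, partition): offset}."""
    return {
        (p["kafka_topic"], p["kafka_partition"]): o["kafka_offset"]
        for p, o in flatten_offsets(body)
    }
```

Because a "partition" is an opaque map in both cases, only the sink-specific
helper needs to know about the Kafka field names.
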
> > > > > > > > > 3. Great idea! Done.
> > > > > > > > >
> > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the
> response
> > > > status
> > > > > > > if
> > > > > > > > a
> > > > > > > > > rebalance is pending. I'd rather not add this to the KIP
> as we
> > > > may
> > > > > > want
> > > > > > > > to
> > > > > > > > > change it at some point and it doesn't seem vital to
> establish
> > > > it as
> > > > > > > part
> > > > > > > > > of the public contract for the new endpoint right now.
> Also,
> > > > small
> > > > > > > > > point--yes, a 409 is useful to avoid forwarding requests
> to an
> > > > > > > incorrect
> > > > > > > > > leader, but it's also useful to ensure that there aren't
> any
> > > > > > unresolved
> > > > > > > > > writes to the config topic that might cause issues with the
> > > > request
> > > > > > > (such
> > > > > > > > > as deleting the connector).
> > > > > > > > >
> > > > > > > > > 5. That's a good point--it may be misleading to call a
> > > connector
> > > > > > > STOPPED
> > > > > > > > > when it has zombie tasks lying around on the cluster. I
> don't
> > > > think
> > > > > > > it'd
> > > > > > > > be
> > > > > > > > > appropriate to do this synchronously while handling
> requests to
> > > > the
> > > > > > PUT
> > > > > > > > > /connectors/{connector}/stop since we'd want to give all
> > > > > > > > currently-running
> > > > > > > > > tasks a chance to gracefully shut down, though. I'm also
> not
> > > sure
> > > > > > that
> > > > > > > > this
> > > > > > > > > is a significant problem, either. If the connector is
> resumed,
> > > > then
> > > > > > all
> > > > > > > > > zombie tasks will be automatically fenced out by their
> > > > successors on
> > > > > > > > > startup; if it's deleted, then we'll have wasted effort by
> > > > performing
> > > > > > > an
> > > > > > > > > unnecessary round of fencing. It may be nice to guarantee
> that
> > > > source
> > > > > > > > task
> > > > > > > > > resources will be deallocated after the connector
> transitions
> > > to
> > > > > > > STOPPED,
> > > > > > > > > but realistically, it doesn't do much to just fence out
> their
> > > > > > > producers,
> > > > > > > > > since tasks can be blocked on a number of other operations
> such
> > > > as
> > > > > > > > > key/value/header conversion, transformation, and task
> polling.
> > > > It may
> > > > > > > be
> > > > > > > > a
> > > > > > > > > little strange if data is produced to Kafka after the
> connector
> > > > has
> > > > > > > > > transitioned to STOPPED, but we can't provide the same
> > > > guarantees for
> > > > > > > > sink
> > > > > > > > > connectors, since their tasks may be stuck on a
> long-running
> > > > > > > > SinkTask::put
> > > > > > > > > that emits data even after the Connect framework has
> abandoned
> > > > them
> > > > > > > after
> > > > > > > > > exhausting their graceful shutdown timeout. Ultimately, I'd
> > > > prefer to
> > > > > > > err
> > > > > > > > > on the side of consistency and ease of implementation for
> now,
> > > > but I
> > > > > > > may
> > > > > > > > be
> > > > > > > > > missing a case where a few extra records from a task that's
> > > slow
> > > > to
> > > > > > > shut
> > > > > > > > > down may cause serious issues--let me know.
> > > > > > > > >
> > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED state
> > > right
> > > > now
> > > > > > as
> > > > > > > > it
> > > > > > > > > does serve a few purposes. Leaving tasks idling-but-ready
> makes
> > > > > > > resuming
> > > > > > > > > them less disruptive across the cluster, since a rebalance
> > > isn't
> > > > > > > > necessary.
> > > > > > > > > It also reduces latency to resume the connector,
> especially for
> > > > ones
> > > > > > > that
> > > > > > > > > have to do a lot of state gathering on initialization to,
> e.g.,
> > > > read
> > > > > > > > > offsets from an external system.
> > > > > > > > >
> > > > > > > > > 7. There should be no risk of mixed tasks after a
> downgrade,
> > > > thanks
> > > > > > to
> > > > > > > > the
> > > > > > > > > empty set of task configs that gets published to the config
> > > > topic.
> > > > > > Both
> > > > > > > > > upgraded and downgraded workers will render an empty set of
> > > > tasks for
> > > > > > > the
> > > > > > > > > connector, and keep that set of empty tasks until the
> connector
> > > > is
> > > > > > > > resumed.
> > > > > > > > > Does that address your concerns?
> > > > > > > > >
> > > > > > > > > You're also correct that the linked Jira ticket was wrong;
> > > > thanks for
> > > > > > > > > pointing that out! Yes, KAFKA-4107 is the intended ticket,
> and
> > > > I've
> > > > > > > > updated
> > > > > > > > > the link in the KIP accordingly.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > >
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107
> > > > > > > > >
> > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <
> > > > yash.ma...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Chris,
> > > > > > > > > >
> > > > > > > > > > Thanks a lot for this KIP, I think something like this
> has
> > > been
> > > > > > long
> > > > > > > > > > overdue for Kafka Connect :)
> > > > > > > > > >
> > > > > > > > > > Some thoughts and questions that I had -
> > > > > > > > > >
> > > > > > > > > > 1. I'm wondering if you could elaborate a little more on
> the
> > > > use
> > > > > > case
> > > > > > > > for
> > > > > > > > > > the `DELETE /connectors/{connector}/offsets` API. I
> think we
> > > > can
> > > > > > all
> > > > > > > > > agree
> > > > > > > > > > that a fine grained reset API that allows setting
> arbitrary
> > > > offsets
> > > > > > > for
> > > > > > > > > > partitions would be quite useful (which you talk about
> in the
> > > > > > Future
> > > > > > > > work
> > > > > > > > > > section). But for the `DELETE
> > > /connectors/{connector}/offsets`
> > > > API
> > > > > > in
> > > > > > > > its
> > > > > > > > > > described form, it looks like it would only serve a
> seemingly
> > > > niche
> > > > > > > use
> > > > > > > > > > case where users want to avoid renaming connectors -
> because
> > > > this
> > > > > > new
> > > > > > > > way
> > > > > > > > > > of resetting offsets actually has more steps (i.e. stop
> the
> > > > > > > connector,
> > > > > > > > > > reset offsets via the API, resume the connector) than
> simply
> > > > > > deleting
> > > > > > > > and
> > > > > > > > > > re-creating the connector with a different name?
> > > > > > > > > >
> > > > > > > > > > 2. The KIP talks about taking care that the response
> formats
> > > > > > > > (presumably
> > > > > > > > > > only talking about the new GET API here) are symmetrical
> for
> > > > both
> > > > > > > > source
> > > > > > > > > > and sink connectors - is the end goal to have users of
> Kafka
> > > > > > Connect
> > > > > > > > not
> > > > > > > > > > even be aware that sink connectors use Kafka consumers
> under
> > > > the
> > > > > > hood
> > > > > > > > > (i.e.
> > > > > > > > > > have that as purely an implementation detail abstracted
> away
> > > > from
> > > > > > > > users)?
> > > > > > > > > > While I understand the value of uniformity here, the
> response
> > > > > > format
> > > > > > > > for
> > > > > > > > > > sink connectors currently looks a little odd with the
> > > > "partition"
> > > > > > > field
> > > > > > > > > > having "topic" and "partition" as sub-fields, especially
> to
> > > > users
> > > > > > > > > familiar
> > > > > > > > > > with Kafka semantics. Thoughts?
> > > > > > > > > >
> > > > > > > > > > 3. Another little nitpick on the response format - why
> do we
> > > > need
> > > > > > > > > "source"
> > > > > > > > > > / "sink" as a field under "offsets"? Users can query the
> > > > connector
> > > > > > > type
> > > > > > > > > via
> > > > > > > > > > the existing `GET /connectors` API. If it's deemed
> important
> > > > to let
> > > > > > > > users
> > > > > > > > > > know that the offsets they're seeing correspond to a
> source /
> > > > sink
> > > > > > > > > > connector, maybe we could have a top level field "type"
> in
> > > the
> > > > > > > response
> > > > > > > > > for
> > > > > > > > > > the `GET /connectors/{connector}/offsets` API similar to
> the
> > > > `GET
> > > > > > > > > > /connectors` API?
> > > > > > > > > >
> > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets` API,
> the
> > > > KIP
> > > > > > > > mentions
> > > > > > > > > > that requests will be rejected if a rebalance is pending
> -
> > > > > > presumably
> > > > > > > > > this
> > > > > > > > > > is to avoid forwarding requests to a leader which may no
> > > > longer be
> > > > > > > the
> > > > > > > > > > leader after the pending rebalance? In this case, the API
> > > will
> > > > > > > return a
> > > > > > > > > > `409 Conflict` response similar to some of the existing
> APIs,
> > > > > > right?
> > > > > > > > > >
> > > > > > > > > > 5. Regarding fencing out previously running tasks for a
> > > > connector,
> > > > > > do
> > > > > > > > you
> > > > > > > > > > think it would make more sense semantically to have this
> > > > > > implemented
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > stop endpoint where an empty set of tasks is generated,
> > > rather
> > > > than
> > > > > > > the
> > > > > > > > > > delete offsets endpoint? This would also give the new
> > > `STOPPED`
> > > > > > > state a
> > > > > > > > > > higher confidence of sorts, with any zombie tasks being
> > > fenced
> > > > off
> > > > > > > from
> > > > > > > > > > continuing to produce data.
> > > > > > > > > >
> > > > > > > > > > 6. Thanks for outlining the issues with the current
> state of
> > > > the
> > > > > > > > `PAUSED`
> > > > > > > > > > state - I think a lot of users expect it to behave like
> the
> > > > > > `STOPPED`
> > > > > > > > > state
> > > > > > > > > > you outline in the KIP and are (unpleasantly) surprised
> when
> > > it
> > > > > > > > doesn't.
> > > > > > > > > > However, this does beg the question of what the
> usefulness of
> > > > > > having
> > > > > > > > two
> > > > > > > > > > separate `PAUSED` and `STOPPED` states is? Do we want to
> > > > continue
> > > > > > > > > > supporting both these states in the future, or do you
> see the
> > > > > > > `STOPPED`
> > > > > > > > > > state eventually causing the existing `PAUSED` state to
> be
> > > > > > > deprecated?
> > > > > > > > > >
> > > > > > > > > > 7. I think the idea outlined in the KIP for handling a
> new
> > > > state
> > > > > > > during
> > > > > > > > > > cluster downgrades / rolling upgrades is quite clever,
> but do
> > > > you
> > > > > > > think
> > > > > > > > > > there could be any issues with having a mix of "paused"
> and
> > > > > > "stopped"
> > > > > > > > > tasks
> > > > > > > > > > for the same connector across workers in a cluster? At
> the
> > > very
> > > > > > > least,
> > > > > > > > I
> > > > > > > > > > think it would be fairly confusing to most users. I'm
> > > > wondering if
> > > > > > > this
> > > > > > > > > can
> > > > > > > > > > be avoided by stating clearly in the KIP that the new
> `PUT
> > > > > > > > > > /connectors/{connector}/stop`
> > > > > > > > > > can only be used on a cluster that is fully upgraded to
> an AK
> > > > > > version
> > > > > > > > > newer
> > > > > > > > > > than the one which ends up containing changes from this
> KIP
> > > and
> > > > > > that
> > > > > > > > if a
> > > > > > > > > > cluster needs to be downgraded to an older version, the
> user
> > > > should
> > > > > > > > > ensure
> > > > > > > > > > that none of the connectors on the cluster are in a
> stopped
> > > > state?
> > > > > > > With
> > > > > > > > > the
> > > > > > > > > > existing implementation, it looks like an unknown/invalid
> > > > target
> > > > > > > state
> > > > > > > > > > record is basically just discarded (with an error message
> > > > logged),
> > > > > > so
> > > > > > > > it
> > > > > > > > > > doesn't seem to be a disastrous failure scenario that can
> > > bring
> > > > > > down
> > > > > > > a
> > > > > > > > > > worker.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yash
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton
> > > > > > > <chr...@aiven.io.invalid
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ashwin,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your thoughts. Regarding your questions:
> > > > > > > > > > >
> > > > > > > > > > > > 1. The response would show the offsets that are visible
> > > > > > > > > > > > to the source connector, so it would combine the
> > > > > > > > > > > > contents of the two topics, giving priority to offsets
> > > > > > > > > > > > present in the connector-specific topic. I imagine a
> > > > > > > > > > > > follow-up question that some people may have in
> > > > > > > > > > > > response is whether we'd want to provide insight into
> > > > > > > > > > > > the contents of a single topic at a time. It may be
> > > > > > > > > > > > useful to be able to see this information in order to
> > > > > > > > > > > > debug connector issues or verify that it's safe to stop
> > > > > > > > > > > > using a connector-specific offsets topic (either
> > > > > > > > > > > > explicitly, or implicitly via cluster downgrade). What
> > > > > > > > > > > > do you think about adding a URL query parameter that
> > > > > > > > > > > > allows users to dictate which view of the connector's
> > > > > > > > > > > > offsets they are given in the REST response, with
> > > > > > > > > > > > options for the worker's global topic, the
> > > > > > > > > > > > connector-specific topic, and the combined view of them
> > > > > > > > > > > > that the connector and its tasks see (which would be
> > > > > > > > > > > > the default)? This may be too much for V1 but it feels
> > > > > > > > > > > > like it's at least worth exploring a bit.
> > > > > > > > > > >
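
The "combined view" described in point 1 can be sketched in a few lines of
Python. This is only an illustration under stated assumptions: each topic's
contents are modelled as a plain dict keyed by a serialized source partition,
and the view names ("global", "connector", "combined") are hypothetical
stand-ins for whatever values a query parameter might accept.

```python
import json


def partition_key(partition):
    """Serialize a partition map into a stable, hashable dict key."""
    return json.dumps(partition, sort_keys=True)


def offsets_view(global_offsets, connector_offsets, view="combined"):
    """Return one view of a connector's offsets.

    view names are hypothetical; "combined" gives priority to entries
    from the connector-specific topic, as described in the thread.
    """
    if view == "global":
        return dict(global_offsets)
    if view == "connector":
        return dict(connector_offsets)
    if view == "combined":
        merged = dict(global_offsets)
        merged.update(connector_offsets)  # connector-specific entries win
        return merged
    raise ValueError(f"unknown view: {view!r}")
```

A partition present only in the global topic still shows up in the combined
view, which is why inspecting a single topic at a time can be useful when
deciding whether a dedicated offsets topic can be dropped.
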
> > > > > > > > > > > 2. There is no option for this at the moment. Reset
> > > > semantics are
> > > > > > > > > > extremely
> > > > > > > > > > > coarse-grained; for source connectors, we delete all
> source
> > > > > > > offsets,
> > > > > > > > > and
> > > > > > > > > > > for sink connectors, we delete the entire consumer
> group.
> > > I'm
> > > > > > > hoping
> > > > > > > > > this
> > > > > > > > > > > will be enough for V1 and that, if there's sufficient
> > > demand
> > > > for
> > > > > > > it,
> > > > > > > > we
> > > > > > > > > > can
> > > > > > > > > > > introduce a richer API for resetting or even modifying
> > > > connector
> > > > > > > > > offsets
> > > > > > > > > > in
> > > > > > > > > > > a follow-up KIP.
> > > > > > > > > > >
> > > > > > > > > > > 3. Good eye :) I think it's fine to keep the existing
> > > > behavior
> > > > > > for
> > > > > > > > the
> > > > > > > > > > > PAUSED state with the Connector instance, since the
> primary
> > > > > > purpose
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > Connector is to generate task configs and monitor the
> > > > external
> > > > > > > system
> > > > > > > > > for
> > > > > > > > > > > changes. If there's no chance for tasks to be running
> > > > anyways, I
> > > > > > > > don't
> > > > > > > > > > see
> > > > > > > > > > > much value in allowing paused connectors to generate
> new
> > > task
> > > > > > > > configs,
> > > > > > > > > > > especially since each time that happens a rebalance is
> > > > triggered
> > > > > > > and
> > > > > > > > > > > there's a non-zero cost to that. What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > >
> > > > > > > > > > > Chris
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin
> > > > > > > <apan...@confluent.io.invalid
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the KIP, Chris - I think this is a
> > > > > > > > > > > > > > useful feature.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Can you please elaborate on the following in the
> > > > > > > > > > > > > > KIP -
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. How would the response of GET
> > > > > > > > > > > > > > /connectors/{connector}/offsets look if the worker
> > > > > > > > > > > > > > has both global and connector-specific offsets
> > > > > > > > > > > > > > topics?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. How can we pass reset options like shift-by,
> > > > > > > > > > > > > > to-date-time, etc. using a REST API like DELETE
> > > > > > > > > > > > > > /connectors/{connector}/offsets?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 3. Today, the PAUSE operation on a connector
> > > > > > > > > > > > > > invokes its stop method - will there be a change
> > > > > > > > > > > > > > here to reduce confusion with the newly proposed
> > > > > > > > > > > > > > STOPPED state?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Ashwin
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton
> > > > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I noticed a fairly large gap in the first version
> of
> > > > this KIP
> > > > > > > > that
> > > > > > > > > I
> > > > > > > > > > > > > published last Friday, which has to do with
> > > accommodating
> > > > > > > > > connectors
> > > > > > > > > > > > > that target different Kafka clusters than the one
> that
> > > > the
> > > > > > > Kafka
> > > > > > > > > > > Connect
> > > > > > > > > > > > > cluster uses for its internal topics and source
> > > > connectors
> > > > > > with
> > > > > > > > > > > dedicated
> > > > > > > > > > > > > offsets topics. I've since updated the KIP to
> address
> > > > this
> > > > > > gap,
> > > > > > > > > which
> > > > > > > > > > > has
> > > > > > > > > > > > > substantially altered the design. Wanted to give a
> > > > heads-up
> > > > > > to
> > > > > > > > > anyone
> > > > > > > > > > > > > that's already started reviewing.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Chris
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <
> > > > > > chr...@aiven.io>
> > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I'd like to begin discussion on a KIP to add
> offsets
> > > > > > support
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > > Kafka
> > > > > > > > > > > > > > Connect REST API:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton
> > > <chr...@aiven.io.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Yash,
> > > > > > >
> > > > > > > Thanks again for your thoughts! Responses to ongoing
> discussions
> > > > inline
> > > > > > > (easier to track context than referencing comment numbers):
> > > > > > >
> > > > > > > > However, this then leads me to wonder if we can make that
> > > explicit
> > > > by
> > > > > > > including "connect" or "connector" in the higher level field
> names?
> > > > Or do
> > > > > > > you think this isn't required given that we're talking about a
> > > > Connect
> > > > > > > specific REST API in the first place?
> > > > > > >
> > > > > > > I think "partition" and "offset" are fine as field names but
> I'm
> > > not
> > > > > > hugely
> > > > > > > opposed to adding "connector " as a prefix to them; would be
> > > > interested
> > > > > > in
> > > > > > > others' thoughts.
> > > > > > >
> > > > > > > > I'm not sure I followed why the unresolved writes to the
> config
> > > > topic
> > > > > > > would be an issue - wouldn't the delete offsets request be
> added to
> > > > the
> > > > > > > herder's request queue and whenever it is processed, we'd
> anyway
> > > > need to
> > > > > > > check if all the prerequisites for the request are satisfied?
> > > > > > >
> > > > > > > > Some requests are handled in multiple steps. For example,
> > > > > > > > deleting a connector (1) adds a request to the herder queue
> > > > > > > > to write a tombstone to the config topic (or, if the worker
> > > > > > > > isn't the leader, forward the request to the leader). (2)
> > > > > > > > Once that tombstone is picked up, (3) a rebalance ensues, and
> > > > > > > > then after it's finally complete, (4) the connector and its
> > > > > > > > tasks are shut down. I probably could have used better
> > > > > > > > terminology, but what I meant by "unresolved writes to the
> > > > > > > > config topic" was a case in between steps (2) and (3)--where
> > > > > > > > the worker has already read that tombstone from the config
> > > > > > > > topic and knows that a rebalance is pending, but hasn't begun
> > > > > > > > participating in that rebalance yet. In the DistributedHerder
> > > > > > > > class, this is done via the `checkRebalanceNeeded` method.
> > > > > > >
> > > > > > > > We can probably revisit this potential deprecation [of the
> PAUSED
> > > > > > state]
> > > > > > > > in the future based on user feedback and what the adoption of
> the
> > > new
> > > > > > > proposed stop endpoint looks like, what do you think?
> > > > > > >
> > > > > > > Yeah, revisiting in the future seems reasonable. 👍
> > > > > > >
> > > > > > > And responses to new comments here:
> > > > > > >
> > > > > > > > 8. Yep, we'll start tracking offsets by connector. I don't
> > > > > > > > believe this should be too difficult, and suspect that the
> > > > > > > > only reason we track raw byte arrays instead of
> > > > > > > > pre-deserializing offset topic information into something
> > > > > > > > more useful is because Connect originally had pluggable
> > > > > > > > internal converters. Now that we're hardcoded to use the JSON
> > > > > > > > converter it should be fine to track offsets on a
> > > > > > > > per-connector basis as they're read from the offsets topic.
> > > > > > >
> > > > > > > 9. I'm hesitant to introduce this type of feature right now
> because
> > > > of
> > > > > > all
> > > > > > > of the gotchas that would come with it. In security-conscious
> > > > > > environments,
> > > > > > > it's possible that a sink connector's principal may have
> access to
> > > > the
> > > > > > > consumer group used by the connector, but the worker's
> principal
> > > may
> > > > not.
> > > > > > > There's also the case where source connectors have separate
> offsets
> > > > > > topics,
> > > > > > > or sink connectors have overridden consumer group IDs, or sink
> or
> > > > source
> > > > > > > connectors work against a different Kafka cluster than the one
> that
> > > > their
> > > > > > > worker uses. Overall, I'd rather provide a single API that
> works in
> > > > all
> > > > > > > cases rather than risk confusing and alienating users by
> trying to
> > > > make
> > > > > > > their lives easier in a subset of cases.
> > > > > > >
> > > > > > > > 10. Hmm... I don't think the order of the writes matters too
> > > > > > > > much here, but we probably could start by deleting from the
> > > > > > > > global topic first, that's true. The reason I'm not hugely
> > > > > > > > concerned about this case is that if something goes wrong
> > > > > > > > while resetting offsets, there's no immediate impact--the
> > > > > > > > connector will still be in the STOPPED state. The REST
> > > > > > > > response for requests to reset the offsets will clearly call
> > > > > > > > out that the operation has failed, and if necessary, we can
> > > > > > > > probably also add a scary-looking warning message stating
> > > > > > > > that we can't guarantee which offsets have been successfully
> > > > > > > > wiped and which haven't. Users can query the exact offsets of
> > > > > > > > the connector at this point to determine what will happen
> > > > > > > > if/when they resume it. And they can repeat attempts to reset
> > > > > > > > the offsets as many times as they'd like until they get back
> > > > > > > > a 2XX response, indicating that it's finally safe to resume
> > > > > > > > the connector. Thoughts?
> > > > > > >
> > > > > > > 11. I haven't thought too much about it. I think something
> like the
> > > > > > > Monitorable* connectors would probably serve our needs here;
> we can
> > > > > > > instantiate them on a running Connect cluster and then use
> various
> > > > > > handles
> > > > > > > to know how many times they've been polled, committed records,
> etc.
> > > > If
> > > > > > > necessary we can tweak those classes or even write our own. But
> > > > anyways,
> > > > > > > once that's all done, the test will be something like "create a
> > > > > > connector,
> > > > > > > wait for it to produce N records (each of which contains some
> kind
> > > of
> > > > > > > predictable offset), and ensure that the offsets for it in the
> REST
> > > > API
> > > > > > > match up with the ones we'd expect from N records". Does that
> > > answer
> > > > your
> > > > > > > question?
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Chris
> > > > > > >
> > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <
> yash.ma...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Chris,
> > > > > > > >
> > > > > > > > 1. Thanks a lot for elaborating on this, I'm now convinced
> about
> > > > the
> > > > > > > > usefulness of the new offset reset endpoint. Regarding the
> > > > follow-up
> > > > > > KIP
> > > > > > > > for a fine-grained offset write API, I'd be happy to take
> that on
> > > > once
> > > > > > > this
> > > > > > > > KIP is finalized and I will definitely look forward to your
> > > > feedback on
> > > > > > > > that one!
> > > > > > > >
> > > > > > > > 2. Gotcha, the motivation makes more sense to me now. So the
> > > higher
> > > > > > level
> > > > > > > > partition field represents a Connect specific "logical
> partition"
> > > > of
> > > > > > > sorts
> > > > > > > > - i.e. the source partition as defined by a connector for
> source
> > > > > > > connectors
> > > > > > > > and a Kafka topic + partition for sink connectors. I like the
> > > idea
> > > > of
> > > > > > > > adding a Kafka prefix to the lower level partition/offset
> (and
> > > > topic)
> > > > > > > > fields which basically makes it more clear (although
> implicitly)
> > > > that
> > > > > > the
> > > > > > > > higher level partition/offset field is Connect specific and
> not
> > > the
> > > > > > same
> > > > > > > as
> > > > > > > > what those terms represent in Kafka itself. However, this
> then
> > > > leads me
> > > > > > > to
> > > > > > > > wonder if we can make that explicit by including "connect" or
> > > > > > "connector"
> > > > > > > > in the higher level field names? Or do you think this isn't
> > > > required
> > > > > > > given
> > > > > > > > that we're talking about a Connect specific REST API in the
> first
> > > > > > place?
> > > > > > > >
> > > > > > > > 3. Thanks, I think the response structure definitely looks
> better
> > > > now!
> > > > > > > >
> > > > > > > > 4. Interesting, I'd be curious to learn why we might want to
> > > change
> > > > > > this
> > > > > > > in
> > > > > > > > the future but that's probably out of scope for this
> discussion.
> > > > I'm
> > > > > > not
> > > > > > > > sure I followed why the unresolved writes to the config topic
> > > > would be
> > > > > > an
> > > > > > > > issue - wouldn't the delete offsets request be added to the
> > > > herder's
> > > > > > > > request queue and whenever it is processed, we'd anyway need
> to
> > > > check
> > > > > > if
> > > > > > > > all the prerequisites for the request are satisfied?
> > > > > > > >
> > > > > > > > 5. Thanks for elaborating that just fencing out the producer
> > > still
> > > > > > leaves
> > > > > > > > many cases where source tasks remain hanging around and also
> that
> > > > we
> > > > > > > anyway
> > > > > > > > can't have similar data production guarantees for sink
> connectors
> > > > right
> > > > > > > > now. I agree that it might be better to go with ease of
> > > > implementation
> > > > > > > and
> > > > > > > > consistency for now.
> > > > > > > >
> > > > > > > > 6. Right, that does make sense but I still feel like the two
> > > states
> > > > > > will
> > > > > > > > end up being confusing to end users who might not be able to
> > > > discern
> > > > > > the
> > > > > > > > (fairly low-level) differences between them (also the
> nuances of
> > > > state
> > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED with
> the
> > > > > > > > rebalancing implications as well). We can probably revisit
> this
> > > > > > potential
> > > > > > > > > deprecation in the future based on user feedback and what the
> > > > adoption
> > > > > > of
> > > > > > > > the new proposed stop endpoint looks like, what do you think?
> > > > > > > >
> > > > > > > > > 7. Aha, that is completely my bad, I missed that the v1/v2 state
> > > > > > > > > is only applicable to the connector's target state and that we
> > > > > > > > > don't need to worry about the tasks since we will have an empty
> > > > > > > > > set of tasks. I think I was a little confused by "pause the
> > > > > > > > > parts of the connector that they are assigned" from the KIP.
> > > > > > > > > Thanks for clarifying that!
> > > > > > > >
> > > > > > > >
> > > > > > > > Some more thoughts and questions that I had -
> > > > > > > >
> > > > > > > > > 8. Could you elaborate on what the implementation for offset
> > > > > > > > > reset for source connectors would look like? Currently, it
> > > > > > > > > doesn't look like we track all the partitions for a source
> > > > > > > > > connector anywhere. Will we need to book-keep this somewhere in
> > > > > > > > > order to be able to emit a tombstone record for each source
> > > > > > > > > partition?
> > > > > > > >
> > > > > > > > > 9. The KIP describes the offset reset endpoint as only being
> > > > > > > > > usable on existing connectors that are in a `STOPPED` state. Why
> > > > > > > > > wouldn't we want to allow resetting offsets for a deleted
> > > > > > > > > connector which seems to be a valid use case? Or do we plan to
> > > > > > > > > handle this use case only via the item outlined in the future
> > > > > > > > > work section - "Automatically delete offsets with connectors"?
> > > > > > > >
> > > > > > > > > 10. The KIP mentions that source offsets will be reset
> > > > > > > > > transactionally for each topic (worker global offset topic and
> > > > > > > > > connector specific offset topic if it exists). While it
> > > > > > > > > obviously isn't possible to atomically do the writes to two
> > > > > > > > > topics which may be on different Kafka clusters, I'm wondering
> > > > > > > > > about what would happen if the first transaction succeeds but
> > > > > > > > > the second one fails. I think the order of the two transactions
> > > > > > > > > matters here - if we successfully emit tombstones to the
> > > > > > > > > connector specific offset topic and fail to do so for the worker
> > > > > > > > > global offset topic, we'll presumably fail the offset delete
> > > > > > > > > request because the KIP mentions that "A request to reset
> > > > > > > > > offsets for a source connector will only be considered
> > > > > > > > > successful if the worker is able to delete all known offsets for
> > > > > > > > > that connector, on both the worker's global offsets topic and
> > > > > > > > > (if one is used) the connector's dedicated offsets topic.".
> > > > > > > > > However, this will lead to the connector only being able to read
> > > > > > > > > potentially older offsets from the worker global offset topic on
> > > > > > > > > resumption (based on the combined offset view presented as
> > > > > > > > > described in KIP-618 [1]). So, I think we should make sure that
> > > > > > > > > the worker global offset topic tombstoning is attempted first,
> > > > > > > > > right? Note that in the current implementation of
> > > > > > > > > `ConnectorOffsetBackingStore::set`, the primary / connector
> > > > > > > > > specific offset store is written to first.
> > > > > > > >
> > > > > > > > > 11. This probably isn't necessary to elaborate on in the KIP
> > > > > > > > > itself, but I was wondering what the second offset test -
> > > > > > > > > "verify that that those offsets reflect an expected level of
> > > > > > > > > progress for each connector (i.e., they are greater than or
> > > > > > > > > equal to a certain value depending on how the connectors are
> > > > > > > > > configured and how long they have been running)" - would look
> > > > > > > > > like?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yash
> > > > > > > >
> > > > > > > > [1] -
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > > > > > > >
> > > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Yash,
> > > > > > > > >
> > > > > > > > > Thanks for your detailed thoughts.
> > > > > > > > >
> > > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly what's
> > > > > > > > > > proposed in the KIP right now: a way to reset offsets for
> > > > > > > > > > connectors. Sure, there's an extra step of stopping the
> > > > > > > > > > connector, but renaming a connector isn't as convenient of an
> > > > > > > > > > alternative as it may seem since in many cases you'd also want
> > > > > > > > > > to delete the older one, so the complete sequence of steps
> > > > > > > > > > would be something like delete the old connector, rename it
> > > > > > > > > > (possibly requiring modifications to its config file, depending
> > > > > > > > > > on which API is used), then create the renamed variant. It's
> > > > > > > > > > also just not a great user experience--even if the practical
> > > > > > > > > > impacts are limited (which, IMO, they are not), people have
> > > > > > > > > > been asking for years about why they have to employ this kind
> > > > > > > > > > of a workaround for a fairly common use case, and we don't
> > > > > > > > > > really have a good answer beyond "we haven't implemented
> > > > > > > > > > something better yet". On top of that, you may have external
> > > > > > > > > > tooling that needs to be tweaked to handle a new connector
> > > > > > > > > > name, you may have strict authorization policies around who can
> > > > > > > > > > access what connectors, you may have other ACLs attached to the
> > > > > > > > > > name of the connector (which can be especially common in the
> > > > > > > > > > case of sink connectors, whose consumer group IDs are tied to
> > > > > > > > > > their names by default), and leaving around state in the
> > > > > > > > > > offsets topic that can never be cleaned up presents a bit of a
> > > > > > > > > > footgun for users. It may not be a silver bullet, but providing
> > > > > > > > > > some mechanism to reset that state is a step in the right
> > > > > > > > > > direction and allows responsible users to more carefully
> > > > > > > > > > administer their cluster without resorting to non-public APIs.
> > > > > > > > > > That said, I do agree that a fine-grained reset/overwrite API
> > > > > > > > > > would be useful, and I'd be happy to review a KIP to add that
> > > > > > > > > > feature if anyone wants to tackle it!
> > > > > > > > >
> > > > > > > > > > 2. Keeping the two formats symmetrical is motivated mostly by
> > > > > > > > > > aesthetics and quality-of-life for programmatic interaction
> > > > > > > > > > with the API; it's not really a goal to hide the use of
> > > > > > > > > > consumer groups from users. I do agree that the format is a
> > > > > > > > > > little strange-looking for sink connectors, but it seemed like
> > > > > > > > > > it would be easier to work with for UIs, casual jq queries, and
> > > > > > > > > > CLIs than a more Kafka-specific alternative such as
> > > > > > > > > > {"<topic>-<partition>": "<offset>"}, and although it is a
> > > > > > > > > > little strange, I don't think it's any less readable or
> > > > > > > > > > intuitive. That said, I've made some tweaks to the format that
> > > > > > > > > > should make programmatic access even easier; specifically, I've
> > > > > > > > > > removed the "source" and "sink" wrapper fields and instead
> > > > > > > > > > moved them into a top-level object with a "type" and "offsets"
> > > > > > > > > > field, just like you suggested in point 3 (thanks!). We might
> > > > > > > > > > also consider changing the field names for sink offsets from
> > > > > > > > > > "topic", "partition", and "offset" to "Kafka topic", "Kafka
> > > > > > > > > > partition", and "Kafka offset" respectively, to reduce the
> > > > > > > > > > stuttering effect of having a "partition" field inside a
> > > > > > > > > > "partition" field and the same with an "offset" field;
> > > > > > > > > > thoughts? One final point--by equating source and sink offsets,
> > > > > > > > > > we probably make it easier for users to understand exactly what
> > > > > > > > > > a source offset is; anyone who's familiar with consumer offsets
> > > > > > > > > > can see from the response format that we identify a logical
> > > > > > > > > > partition as a combination of two entities (a topic and a
> > > > > > > > > > partition number); it should make it easier to grok what a
> > > > > > > > > > source offset is by seeing what the two formats have in common.
> > > > > > > > >
> > > > > > > > > 3. Great idea! Done.
> > > > > > > > >
> > > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the response
> > > > > > > > > > status if a rebalance is pending. I'd rather not add this to
> > > > > > > > > > the KIP as we may want to change it at some point and it
> > > > > > > > > > doesn't seem vital to establish it as part of the public
> > > > > > > > > > contract for the new endpoint right now. Also, small
> > > > > > > > > > point--yes, a 409 is useful to avoid forwarding requests to an
> > > > > > > > > > incorrect leader, but it's also useful to ensure that there
> > > > > > > > > > aren't any unresolved writes to the config topic that might
> > > > > > > > > > cause issues with the request (such as deleting the connector).
> > > > > > > > >
> > > > > > > > > > 5. That's a good point--it may be misleading to call a
> > > > > > > > > > connector STOPPED when it has zombie tasks lying around on the
> > > > > > > > > > cluster. I don't think it'd be appropriate to do this
> > > > > > > > > > synchronously while handling requests to the PUT
> > > > > > > > > > /connectors/{connector}/stop endpoint since we'd want to give
> > > > > > > > > > all currently-running tasks a chance to gracefully shut down,
> > > > > > > > > > though. I'm also not sure that this is a significant problem,
> > > > > > > > > > either. If the connector is resumed, then all zombie tasks will
> > > > > > > > > > be automatically fenced out by their successors on startup; if
> > > > > > > > > > it's deleted, then we'll have wasted effort by performing an
> > > > > > > > > > unnecessary round of fencing. It may be nice to guarantee that
> > > > > > > > > > source task resources will be deallocated after the connector
> > > > > > > > > > transitions to STOPPED, but realistically, it doesn't do much
> > > > > > > > > > to just fence out their producers, since tasks can be blocked
> > > > > > > > > > on a number of other operations such as key/value/header
> > > > > > > > > > conversion, transformation, and task polling. It may be a
> > > > > > > > > > little strange if data is produced to Kafka after the connector
> > > > > > > > > > has transitioned to STOPPED, but we can't provide the same
> > > > > > > > > > guarantees for sink connectors, since their tasks may be stuck
> > > > > > > > > > on a long-running SinkTask::put that emits data even after the
> > > > > > > > > > Connect framework has abandoned them after exhausting their
> > > > > > > > > > graceful shutdown timeout. Ultimately, I'd prefer to err on the
> > > > > > > > > > side of consistency and ease of implementation for now, but I
> > > > > > > > > > may be missing a case where a few extra records from a task
> > > > > > > > > > that's slow to shut down may cause serious issues--let me know.
> > > > > > > > >
> > > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED state
> > > > > > > > > > right now as it does serve a few purposes. Leaving tasks
> > > > > > > > > > idling-but-ready makes resuming them less disruptive across the
> > > > > > > > > > cluster, since a rebalance isn't necessary. It also reduces
> > > > > > > > > > latency to resume the connector, especially for ones that have
> > > > > > > > > > to do a lot of state gathering on initialization to, e.g., read
> > > > > > > > > > offsets from an external system.
> > > > > > > > >
> > > > > > > > > > 7. There should be no risk of mixed tasks after a downgrade,
> > > > > > > > > > thanks to the empty set of task configs that gets published to
> > > > > > > > > > the config topic. Both upgraded and downgraded workers will
> > > > > > > > > > render an empty set of tasks for the connector, and keep that
> > > > > > > > > > set of empty tasks until the connector is resumed. Does that
> > > > > > > > > > address your concerns?
> > > > > > > > >
> > > > > > > > > > You're also correct that the linked Jira ticket was wrong;
> > > > > > > > > > thanks for pointing that out! Yes, KAFKA-4107 is the intended
> > > > > > > > > > ticket, and I've updated the link in the KIP accordingly.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > >
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107
> > > > > > > > >
> > > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Chris,
> > > > > > > > > >
> > > > > > > > > > > Thanks a lot for this KIP, I think something like this has
> > > > > > > > > > > been long overdue for Kafka Connect :)
> > > > > > > > > >
> > > > > > > > > > Some thoughts and questions that I had -
> > > > > > > > > >
> > > > > > > > > > > 1. I'm wondering if you could elaborate a little more on the
> > > > > > > > > > > use case for the `DELETE /connectors/{connector}/offsets` API.
> > > > > > > > > > > I think we can all agree that a fine grained reset API that
> > > > > > > > > > > allows setting arbitrary offsets for partitions would be quite
> > > > > > > > > > > useful (which you talk about in the Future work section). But
> > > > > > > > > > > for the `DELETE /connectors/{connector}/offsets` API in its
> > > > > > > > > > > described form, it looks like it would only serve a seemingly
> > > > > > > > > > > niche use case where users want to avoid renaming connectors -
> > > > > > > > > > > because this new way of resetting offsets actually has more
> > > > > > > > > > > steps (i.e. stop the connector, reset offsets via the API,
> > > > > > > > > > > resume the connector) than simply deleting and re-creating the
> > > > > > > > > > > connector with a different name?
> > > > > > > > > >
> > > > > > > > > > > 2. The KIP talks about taking care that the response formats
> > > > > > > > > > > (presumably only talking about the new GET API here) are
> > > > > > > > > > > symmetrical for both source and sink connectors - is the end
> > > > > > > > > > > goal to have users of Kafka Connect not even be aware that sink
> > > > > > > > > > > connectors use Kafka consumers under the hood (i.e. have that
> > > > > > > > > > > as purely an implementation detail abstracted away from users)?
> > > > > > > > > > > While I understand the value of uniformity here, the response
> > > > > > > > > > > format for sink connectors currently looks a little odd with
> > > > > > > > > > > the "partition" field having "topic" and "partition" as
> > > > > > > > > > > sub-fields, especially to users familiar with Kafka semantics.
> > > > > > > > > > > Thoughts?
> > > > > > > > > >
> > > > > > > > > > > 3. Another little nitpick on the response format - why do we
> > > > > > > > > > > need "source" / "sink" as a field under "offsets"? Users can
> > > > > > > > > > > query the connector type via the existing `GET /connectors`
> > > > > > > > > > > API. If it's deemed important to let users know that the
> > > > > > > > > > > offsets they're seeing correspond to a source / sink connector,
> > > > > > > > > > > maybe we could have a top level field "type" in the response
> > > > > > > > > > > for the `GET /connectors/{connector}/offsets` API similar to
> > > > > > > > > > > the `GET /connectors` API?
> > > > > > > > > >
> > > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets` API, the
> > > > > > > > > > > KIP mentions that requests will be rejected if a rebalance is
> > > > > > > > > > > pending - presumably this is to avoid forwarding requests to a
> > > > > > > > > > > leader which may no longer be the leader after the pending
> > > > > > > > > > > rebalance? In this case, the API will return a `409 Conflict`
> > > > > > > > > > > response similar to some of the existing APIs, right?
> > > > > > > > > >
> > > > > > > > > > > 5. Regarding fencing out previously running tasks for a
> > > > > > > > > > > connector, do you think it would make more sense semantically
> > > > > > > > > > > to have this implemented in the stop endpoint where an empty
> > > > > > > > > > > set of tasks is generated, rather than the delete offsets
> > > > > > > > > > > endpoint? This would also give the new `STOPPED` state a higher
> > > > > > > > > > > confidence of sorts, with any zombie tasks being fenced off
> > > > > > > > > > > from continuing to produce data.
> > > > > > > > > >
> > > > > > > > > > > 6. Thanks for outlining the issues with the current state of
> > > > > > > > > > > the `PAUSED` state - I think a lot of users expect it to behave
> > > > > > > > > > > like the `STOPPED` state you outline in the KIP and are
> > > > > > > > > > > (unpleasantly) surprised when it doesn't. However, this does
> > > > > > > > > > > raise the question of what the usefulness of having two
> > > > > > > > > > > separate `PAUSED` and `STOPPED` states is. Do we want to
> > > > > > > > > > > continue supporting both these states in the future, or do you
> > > > > > > > > > > see the `STOPPED` state eventually causing the existing
> > > > > > > > > > > `PAUSED` state to be deprecated?
> > > > > > > > > >
> > > > > > > > > > > 7. I think the idea outlined in the KIP for handling a new
> > > > > > > > > > > state during cluster downgrades / rolling upgrades is quite
> > > > > > > > > > > clever, but do you think there could be any issues with having
> > > > > > > > > > > a mix of "paused" and "stopped" tasks for the same connector
> > > > > > > > > > > across workers in a cluster? At the very least, I think it
> > > > > > > > > > > would be fairly confusing to most users. I'm wondering if this
> > > > > > > > > > > can be avoided by stating clearly in the KIP that the new `PUT
> > > > > > > > > > > /connectors/{connector}/stop` can only be used on a cluster
> > > > > > > > > > > that is fully upgraded to an AK version newer than the one
> > > > > > > > > > > which ends up containing changes from this KIP and that if a
> > > > > > > > > > > cluster needs to be downgraded to an older version, the user
> > > > > > > > > > > should ensure that none of the connectors on the cluster are in
> > > > > > > > > > > a stopped state? With the existing implementation, it looks
> > > > > > > > > > > like an unknown/invalid target state record is basically just
> > > > > > > > > > > discarded (with an error message logged), so it doesn't seem to
> > > > > > > > > > > be a disastrous failure scenario that can bring down a worker.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yash
> > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ashwin,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your thoughts. Regarding your questions:
> > > > > > > > > > >
> > > > > > > > > > > > 1. The response would show the offsets that are visible to the
> > > > > > > > > > > > source connector, so it would combine the contents of the two
> > > > > > > > > > > > topics, giving priority to offsets present in the
> > > > > > > > > > > > connector-specific topic. I'm imagining a follow-up question
> > > > > > > > > > > > that some people may have in response to that is whether we'd
> > > > > > > > > > > > want to provide insight into the contents of a single topic at
> > > > > > > > > > > > a time. It may be useful to be able to see this information in
> > > > > > > > > > > > order to debug connector issues or verify that it's safe to
> > > > > > > > > > > > stop using a connector-specific offsets topic (either
> > > > > > > > > > > > explicitly, or implicitly via cluster downgrade). What do you
> > > > > > > > > > > > think about adding a URL query parameter that allows users to
> > > > > > > > > > > > dictate which view of the connector's offsets they are given in
> > > > > > > > > > > > the REST response, with options for the worker's global topic,
> > > > > > > > > > > > the connector-specific topic, and the combined view of them
> > > > > > > > > > > > that the connector and its tasks see (which would be the
> > > > > > > > > > > > default)? This may be too much for V1 but it feels like it's at
> > > > > > > > > > > > least worth exploring a bit.
> > > > > > > > > > >
> > > > > > > > > > > > 2. There is no option for this at the moment. Reset semantics
> > > > > > > > > > > > are extremely coarse-grained; for source connectors, we delete
> > > > > > > > > > > > all source offsets, and for sink connectors, we delete the
> > > > > > > > > > > > entire consumer group. I'm hoping this will be enough for V1
> > > > > > > > > > > > and that, if there's sufficient demand for it, we can introduce
> > > > > > > > > > > > a richer API for resetting or even modifying connector offsets
> > > > > > > > > > > > in a follow-up KIP.
> > > > > > > > > > >
> > > > > > > > > > > > 3. Good eye :) I think it's fine to keep the existing behavior
> > > > > > > > > > > > for the PAUSED state with the Connector instance, since the
> > > > > > > > > > > > primary purpose of the Connector is to generate task configs
> > > > > > > > > > > > and monitor the external system for changes. If there's no
> > > > > > > > > > > > chance for tasks to be running anyways, I don't see much value
> > > > > > > > > > > > in allowing paused connectors to generate new task configs,
> > > > > > > > > > > > especially since each time that happens a rebalance is
> > > > > > > > > > > > triggered and there's a non-zero cost to that. What do you
> > > > > > > > > > > > think?
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > >
> > > > > > > > > > > Chris
> > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin <apan...@confluent.io.invalid>
> > > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the KIP, Chris - I think this is a useful feature.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Can you please elaborate on the following in the KIP -
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. What would the response of GET
> > > > > > > > > > > > > /connectors/{connector}/offsets look like if the worker has
> > > > > > > > > > > > > both global and connector-specific offsets topics?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. How can we pass the reset options like shift-by,
> > > > > > > > > > > > > to-date-time, etc. using a REST API like DELETE
> > > > > > > > > > > > > /connectors/{connector}/offsets?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3. Today, the PAUSE operation on a connector invokes its stop
> > > > > > > > > > > > > method - will there be a change here to reduce confusion with
> > > > > > > > > > > > > the new proposed STOPPED state?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Ashwin
> > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I noticed a fairly large gap in the first version of this KIP
> > > > > > > > > > > > > > that I published last Friday, which has to do with
> > > > > > > > > > > > > > accommodating connectors that target different Kafka clusters
> > > > > > > > > > > > > > than the one that the Kafka Connect cluster uses for its
> > > > > > > > > > > > > > internal topics and source connectors with dedicated offsets
> > > > > > > > > > > > > > topics. I've since updated the KIP to address this gap, which
> > > > > > > > > > > > > > has substantially altered the design. Wanted to give a heads-up
> > > > > > > > > > > > > > to anyone that's already started reviewing.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Chris
> > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'd like to begin discussion on a KIP to add offsets support
> > > > > > > > > > > > > > > to the Kafka Connect REST API:
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
>
