Hi Yash,

Good question! This is actually a subtle source of asymmetry in the current
proposal. Requests to delete a consumer group with active members will
fail, so if there are zombie sink tasks that are still communicating with
Kafka, offset reset requests for that connector will also fail. It is
possible to use an admin client to remove all active members from the group
and then delete the group. However, this solution isn't as complete as the
zombie fencing that we can perform for exactly-once source tasks, since
removing consumers from a group doesn't prevent them from immediately
rejoining the group, which would either cause the group deletion request to
fail (if they rejoin before the group is deleted), or recreate the group
(if they rejoin after the group is deleted).

For ease of implementation, I'd prefer to leave the asymmetry in the API
for now and fail fast and clearly if there are still consumers active in
the sink connector's group. We can try to detect this case and provide a
helpful error message to the user explaining why the offset reset request
has failed and some steps they can take to try to resolve things (wait for
slow task shutdown to complete, restart zombie workers and/or workers with
blocked tasks on them). In the future we can possibly even revisit KIP-611
[1] or something like it to provide better insight into zombie tasks on a
worker so that it's easier to find which tasks have been abandoned but are
still running.

Let me know what you think; this is an important point to call out and if
we can reach some consensus on how to handle sink connector offset resets
w/r/t zombie tasks, I'll update the KIP with the details.

[1] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks

Cheers,

Chris

On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Chris,
>
> Thanks for the response and the explanations, I think you've answered
> pretty much all the questions I had meticulously!
>
>
> > if something goes wrong while resetting offsets, there's no
> > immediate impact--the connector will still be in the STOPPED
> >  state. The REST response for requests to reset the offsets
> > will clearly call out that the operation has failed, and if necessary,
> > we can probably also add a scary-looking warning message
> > stating that we can't guarantee which offsets have been successfully
> >  wiped and which haven't. Users can query the exact offsets of
> > the connector at this point to determine what will happen if/what they
> > resume it. And they can repeat attempts to reset the offsets as many
> >  times as they'd like until they get back a 2XX response, indicating
> > that it's finally safe to resume the connector. Thoughts?
>
> Yeah, I agree, the case that I mentioned earlier where a user would try to
> resume a stopped connector after a failed offset reset attempt without
> knowing that the offset reset attempt didn't fail cleanly is probably just
> an extreme edge case. I think as long as the response is verbose enough and
> self explanatory, we should be fine.
>
> Another question that I had was behavior w.r.t sink connector offset resets
> when there are zombie tasks/workers in the Connect cluster - the KIP
> mentions that for sink connectors offset resets will be done by deleting
> the consumer group. However, if there are zombie tasks which are still able
> to communicate with the Kafka cluster that the sink connector is consuming
> from, I think the consumer group will automatically get re-created and the
> zombie task may be able to commit offsets for the partitions that it is
> consuming from?
>
> Thanks,
> Yash
>
>
> On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > Thanks again for your thoughts! Responses to ongoing discussions inline
> > (easier to track context than referencing comment numbers):
> >
> > > However, this then leads me to wonder if we can make that explicit by
> > including "connect" or "connector" in the higher level field names? Or do
> > you think this isn't required given that we're talking about a Connect
> > specific REST API in the first place?
> >
> > I think "partition" and "offset" are fine as field names but I'm not
> hugely
> > opposed to adding "connector " as a prefix to them; would be interested
> in
> > others' thoughts.
> >
> > > I'm not sure I followed why the unresolved writes to the config topic
> > would be an issue - wouldn't the delete offsets request be added to the
> > herder's request queue and whenever it is processed, we'd anyway need to
> > check if all the prerequisites for the request are satisfied?
> >
> > Some requests are handled in multiple steps. For example, deleting a
> > connector (1) adds a request to the herder queue to write a tombstone to
> > the config topic (or, if the worker isn't the leader, forward the request
> > to the leader). (2) Once that tombstone is picked up, (3) a rebalance
> > ensues, and then after it's finally complete, (4) the connector and its
> > tasks are shut down. I probably could have used better terminology, but
> > what I meant by "unresolved writes to the config topic" was a case in
> > between steps (2) and (3)--where the worker has already read that
> tombstone
> > from the config topic and knows that a rebalance is pending, but hasn't
> > begun participating in that rebalance yet. In the DistributedHerder
> class,
> > this is done via the `checkRebalanceNeeded` method.
> >
> > > We can probably revisit this potential deprecation [of the PAUSED
> state]
> > in the future based on user feedback and how the adoption of the new
> > proposed stop endpoint looks like, what do you think?
> >
> > Yeah, revisiting in the future seems reasonable. 👍
> >
> > And responses to new comments here:
> >
> > 8. Yep, we'll start tracking offsets by connector. I don't believe this
> > should be too difficult, and suspect that the only reason we track raw
> byte
> > arrays instead of pre-deserializing offset topic information into
> something
> > more useful is because Connect originally had pluggable internal
> > converters. Now that we're hardcoded to use the JSON converter it should
> be
> > fine to track offsets on a per-connector basis as they're read from the
> > offsets topic.
> >
> > 9. I'm hesitant to introduce this type of feature right now because of
> all
> > of the gotchas that would come with it. In security-conscious
> environments,
> > it's possible that a sink connector's principal may have access to the
> > consumer group used by the connector, but the worker's principal may not.
> > There's also the case where source connectors have separate offsets
> topics,
> > or sink connectors have overridden consumer group IDs, or sink or source
> > connectors work against a different Kafka cluster than the one that their
> > worker uses. Overall, I'd rather provide a single API that works in all
> > cases rather than risk confusing and alienating users by trying to make
> > their lives easier in a subset of cases.
> >
> > 10. Hmm... I don't think the order of the writes matters too much here,
> but
> > we probably could start by deleting from the global topic first, that's
> > true. The reason I'm not hugely concerned about this case is that if
> > something goes wrong while resetting offsets, there's no immediate
> > impact--the connector will still be in the STOPPED state. The REST
> response
> > for requests to reset the offsets will clearly call out that the
> operation
> > has failed, and if necessary, we can probably also add a scary-looking
> > warning message stating that we can't guarantee which offsets have been
> > successfully wiped and which haven't. Users can query the exact offsets
> of
> > the connector at this point to determine what will happen if/what they
> > resume it. And they can repeat attempts to reset the offsets as many
> times
> > as they'd like until they get back a 2XX response, indicating that it's
> > finally safe to resume the connector. Thoughts?
> >
> > 11. I haven't thought too much about it. I think something like the
> > Monitorable* connectors would probably serve our needs here; we can
> > instantiate them on a running Connect cluster and then use various
> handles
> > to know how many times they've been polled, committed records, etc. If
> > necessary we can tweak those classes or even write our own. But anyways,
> > once that's all done, the test will be something like "create a
> connector,
> > wait for it to produce N records (each of which contains some kind of
> > predictable offset), and ensure that the offsets for it in the REST API
> > match up with the ones we'd expect from N records". Does that answer your
> > question?
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > 1. Thanks a lot for elaborating on this, I'm now convinced about the
> > > usefulness of the new offset reset endpoint. Regarding the follow-up
> KIP
> > > for a fine-grained offset write API, I'd be happy to take that on once
> > this
> > > KIP is finalized and I will definitely look forward to your feedback on
> > > that one!
> > >
> > > 2. Gotcha, the motivation makes more sense to me now. So the higher
> level
> > > partition field represents a Connect specific "logical partition" of
> > sorts
> > > - i.e. the source partition as defined by a connector for source
> > connectors
> > > and a Kafka topic + partition for sink connectors. I like the idea of
> > > adding a Kafka prefix to the lower level partition/offset (and topic)
> > > fields which basically makes it more clear (although implicitly) that
> the
> > > higher level partition/offset field is Connect specific and not the
> same
> > as
> > > what those terms represent in Kafka itself. However, this then leads me
> > to
> > > wonder if we can make that explicit by including "connect" or
> "connector"
> > > in the higher level field names? Or do you think this isn't required
> > given
> > > that we're talking about a Connect specific REST API in the first
> place?
> > >
> > > 3. Thanks, I think the response structure definitely looks better now!
> > >
> > > 4. Interesting, I'd be curious to learn why we might want to change
> this
> > in
> > > the future but that's probably out of scope for this discussion. I'm
> not
> > > sure I followed why the unresolved writes to the config topic would be
> an
> > > issue - wouldn't the delete offsets request be added to the herder's
> > > request queue and whenever it is processed, we'd anyway need to check
> if
> > > all the prerequisites for the request are satisfied?
> > >
> > > 5. Thanks for elaborating that just fencing out the producer still
> leaves
> > > many cases where source tasks remain hanging around and also that we
> > anyway
> > > can't have similar data production guarantees for sink connectors right
> > > now. I agree that it might be better to go with ease of implementation
> > and
> > > consistency for now.
> > >
> > > 6. Right, that does make sense but I still feel like the two states
> will
> > > end up being confusing to end users who might not be able to discern
> the
> > > (fairly low-level) differences between them (also the nuances of state
> > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED with the
> > > rebalancing implications as well). We can probably revisit this
> potential
> > > deprecation in the future based on user feedback and how the adoption
> of
> > > the new proposed stop endpoint looks like, what do you think?
> > >
> > > 7. Aha, that is completely my bad, I missed that the v1/v2 state is
> only
> > > applicable to the connector's target state and that we don't need to
> > worry
> > > about the tasks since we will have an empty set of tasks. I think I
> was a
> > > little confused by "pause the parts of the connector that they are
> > > assigned" from the KIP. Thanks for clarifying that!
> > >
> > >
> > > Some more thoughts and questions that I had -
> > >
> > > 8. Could you elaborate on what the implementation for offset reset for
> > > source connectors would look like? Currently, it doesn't look like we
> > track
> > > all the partitions for a source connector anywhere. Will we need to
> > > book-keep this somewhere in order to be able to emit a tombstone record
> > for
> > > each source partition?
> > >
> > > 9. The KIP describes the offset reset endpoint as only being usable on
> > > existing connectors that are in a `STOPPED` state. Why wouldn't we want
> > to
> > > allow resetting offsets for a deleted connector which seems to be a
> valid
> > > use case? Or do we plan to handle this use case only via the item
> > outlined
> > > in the future work section - "Automatically delete offsets with
> > > connectors"?
> > >
> > > 10. The KIP mentions that source offsets will be reset transactionally
> > for
> > > each topic (worker global offset topic and connector specific offset
> > topic
> > > if it exists). While it obviously isn't possible to atomically do the
> > > writes to two topics which may be on different Kafka clusters, I'm
> > > wondering about what would happen if the first transaction succeeds but
> > the
> > > second one fails. I think the order of the two transactions matters
> here
> > -
> > > if we successfully emit tombstones to the connector specific offset
> topic
> > > and fail to do so for the worker global offset topic, we'll presumably
> > fail
> > > the offset delete request because the KIP mentions that "A request to
> > reset
> > > offsets for a source connector will only be considered successful if
> the
> > > worker is able to delete all known offsets for that connector, on both
> > the
> > > worker's global offsets topic and (if one is used) the connector's
> > > dedicated offsets topic.". However, this will lead to the connector
> only
> > > being able to read potentially older offsets from the worker global
> > offset
> > > topic on resumption (based on the combined offset view presented as
> > > described in KIP-618 [1]). So, I think we should make sure that the
> > worker
> > > global offset topic tombstoning is attempted first, right? Note that in
> > the
> > > current implementation of `ConnectorOffsetBackingStore::set`, the
> > primary /
> > > connector specific offset store is written to first.
> > >
> > > 11. This probably isn't necessary to elaborate on in the KIP itself,
> but
> > I
> > > was wondering what the second offset test - "verify that that those
> > offsets
> > > reflect an expected level of progress for each connector (i.e., they
> are
> > > greater than or equal to a certain value depending on how the
> connectors
> > > are configured and how long they have been running)" - would look like?
> > >
> > >
> > > Thanks,
> > > Yash
> > >
> > > [1] -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > >
> > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton <chr...@aiven.io.invalid
> >
> > > wrote:
> > >
> > > > Hi Yash,
> > > >
> > > > Thanks for your detailed thoughts.
> > > >
> > > > 1. In KAFKA-4107 [1], the primary request is exactly what's proposed
> in
> > > the
> > > > KIP right now: a way to reset offsets for connectors. Sure, there's
> an
> > > > extra step of stopping the connector, but renaming a connector isn't
> as
> > > > convenient of an alternative as it may seem since in many cases you'd
> > > also
> > > > want to delete the older one, so the complete sequence of steps would
> > be
> > > > something like delete the old connector, rename it (possibly
> requiring
> > > > modifications to its config file, depending on which API is used),
> then
> > > > create the renamed variant. It's also just not a great user
> > > > experience--even if the practical impacts are limited (which, IMO,
> they
> > > are
> > > > not), people have been asking for years about why they have to employ
> > > this
> > > > kind of a workaround for a fairly common use case, and we don't
> really
> > > have
> > > > a good answer beyond "we haven't implemented something better yet".
> On
> > > top
> > > > of that, you may have external tooling that needs to be tweaked to
> > > handle a
> > > > new connector name, you may have strict authorization policies around
> > who
> > > > can access what connectors, you may have other ACLs attached to the
> > name
> > > of
> > > > the connector (which can be especially common in the case of sink
> > > > connectors, whose consumer group IDs are tied to their names by
> > default),
> > > > and leaving around state in the offsets topic that can never be
> cleaned
> > > up
> > > > presents a bit of a footgun for users. It may not be a silver bullet,
> > but
> > > > providing some mechanism to reset that state is a step in the right
> > > > direction and allows responsible users to more carefully administer
> > their
> > > > cluster without resorting to non-public APIs. That said, I do agree
> > that
> > > a
> > > > fine-grained reset/overwrite API would be useful, and I'd be happy to
> > > > review a KIP to add that feature if anyone wants to tackle it!
> > > >
> > > > 2. Keeping the two formats symmetrical is motivated mostly by
> > aesthetics
> > > > and quality-of-life for programmatic interaction with the API; it's
> not
> > > > really a goal to hide the use of consumer groups from users. I do
> agree
> > > > that the format is a little strange-looking for sink connectors, but
> it
> > > > seemed like it would be easier to work with for UIs, casual jq
> queries,
> > > and
> > > > CLIs than a more Kafka-specific alternative such as
> > > {"<topic>-<partition>":
> > > > "<offset>"}, and although it is a little strange, I don't think it's
> > any
> > > > less readable or intuitive. That said, I've made some tweaks to the
> > > format
> > > > that should make programmatic access even easier; specifically, I've
> > > > removed the "source" and "sink" wrapper fields and instead moved them
> > > into
> > > > a top-level object with a "type" and "offsets" field, just like you
> > > > suggested in point 3 (thanks!). We might also consider changing the
> > field
> > > > names for sink offsets from "topic", "partition", and "offset" to
> > "Kafka
> > > > topic", "Kafka partition", and "Kafka offset" respectively, to reduce
> > the
> > > > stuttering effect of having a "partition" field inside a "partition"
> > > field
> > > > and the same with an "offset" field; thoughts? One final point--by
> > > equating
> > > > source and sink offsets, we probably make it easier for users to
> > > understand
> > > > exactly what a source offset is; anyone who's familiar with consumer
> > > > offsets can see from the response format that we identify a logical
> > > > partition as a combination of two entities (a topic and a partition
> > > > number); it should make it easier to grok what a source offset is by
> > > seeing
> > > > what the two formats have in common.
> > > >
> > > > 3. Great idea! Done.
> > > >
> > > > 4. Yes, I'm thinking right now that a 409 will be the response status
> > if
> > > a
> > > > rebalance is pending. I'd rather not add this to the KIP as we may
> want
> > > to
> > > > change it at some point and it doesn't seem vital to establish it as
> > part
> > > > of the public contract for the new endpoint right now. Also, small
> > > > point--yes, a 409 is useful to avoid forwarding requests to an
> > incorrect
> > > > leader, but it's also useful to ensure that there aren't any
> unresolved
> > > > writes to the config topic that might cause issues with the request
> > (such
> > > > as deleting the connector).
> > > >
> > > > 5. That's a good point--it may be misleading to call a connector
> > STOPPED
> > > > when it has zombie tasks lying around on the cluster. I don't think
> > it'd
> > > be
> > > > appropriate to do this synchronously while handling requests to the
> PUT
> > > > /connectors/{connector}/stop since we'd want to give all
> > > currently-running
> > > > tasks a chance to gracefully shut down, though. I'm also not sure
> that
> > > this
> > > > is a significant problem, either. If the connector is resumed, then
> all
> > > > zombie tasks will be automatically fenced out by their successors on
> > > > startup; if it's deleted, then we'll have wasted effort by performing
> > an
> > > > unnecessary round of fencing. It may be nice to guarantee that source
> > > task
> > > > resources will be deallocated after the connector transitions to
> > STOPPED,
> > > > but realistically, it doesn't do much to just fence out their
> > producers,
> > > > since tasks can be blocked on a number of other operations such as
> > > > key/value/header conversion, transformation, and task polling. It may
> > be
> > > a
> > > > little strange if data is produced to Kafka after the connector has
> > > > transitioned to STOPPED, but we can't provide the same guarantees for
> > > sink
> > > > connectors, since their tasks may be stuck on a long-running
> > > SinkTask::put
> > > > that emits data even after the Connect framework has abandoned them
> > after
> > > > exhausting their graceful shutdown timeout. Ultimately, I'd prefer to
> > err
> > > > on the side of consistency and ease of implementation for now, but I
> > may
> > > be
> > > > missing a case where a few extra records from a task that's slow to
> > shut
> > > > down may cause serious issues--let me know.
> > > >
> > > > 6. I'm hesitant to propose deprecation of the PAUSED state right now
> as
> > > it
> > > > does serve a few purposes. Leaving tasks idling-but-ready makes
> > resuming
> > > > them less disruptive across the cluster, since a rebalance isn't
> > > necessary.
> > > > It also reduces latency to resume the connector, especially for ones
> > that
> > > > have to do a lot of state gathering on initialization to, e.g., read
> > > > offsets from an external system.
> > > >
> > > > 7. There should be no risk of mixed tasks after a downgrade, thanks
> to
> > > the
> > > > empty set of task configs that gets published to the config topic.
> Both
> > > > upgraded and downgraded workers will render an empty set of tasks for
> > the
> > > > connector, and keep that set of empty tasks until the connector is
> > > resumed.
> > > > Does that address your concerns?
> > > >
> > > > You're also correct that the linked Jira ticket was wrong; thanks for
> > > > pointing that out! Yes, KAFKA-4107 is the intended ticket, and I've
> > > updated
> > > > the link in the KIP accordingly.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107
> > > >
> > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > Thanks a lot for this KIP, I think something like this has been
> long
> > > > > overdue for Kafka Connect :)
> > > > >
> > > > > Some thoughts and questions that I had -
> > > > >
> > > > > 1. I'm wondering if you could elaborate a little more on the use
> case
> > > for
> > > > > the `DELETE /connectors/{connector}/offsets` API. I think we can
> all
> > > > agree
> > > > > that a fine grained reset API that allows setting arbitrary offsets
> > for
> > > > > partitions would be quite useful (which you talk about in the
> Future
> > > work
> > > > > section). But for the `DELETE /connectors/{connector}/offsets` API
> in
> > > its
> > > > > described form, it looks like it would only serve a seemingly niche
> > use
> > > > > case where users want to avoid renaming connectors - because this
> new
> > > way
> > > > > of resetting offsets actually has more steps (i.e. stop the
> > connector,
> > > > > reset offsets via the API, resume the connector) than simply
> deleting
> > > and
> > > > > re-creating the connector with a different name?
> > > > >
> > > > > 2. The KIP talks about taking care that the response formats
> > > (presumably
> > > > > only talking about the new GET API here) are symmetrical for both
> > > source
> > > > > and sink connectors - is the end goal to have users of Kafka
> Connect
> > > not
> > > > > even be aware that sink connectors use Kafka consumers under the
> hood
> > > > (i.e.
> > > > > have that as purely an implementation detail abstracted away from
> > > users)?
> > > > > While I understand the value of uniformity here, the response
> format
> > > for
> > > > > sink connectors currently looks a little odd with the "partition"
> > field
> > > > > having "topic" and "partition" as sub-fields, especially to users
> > > > familiar
> > > > > with Kafka semantics. Thoughts?
> > > > >
> > > > > 3. Another little nitpick on the response format - why do we need
> > > > "source"
> > > > > / "sink" as a field under "offsets"? Users can query the connector
> > type
> > > > via
> > > > > the existing `GET /connectors` API. If it's deemed important to let
> > > users
> > > > > know that the offsets they're seeing correspond to a source / sink
> > > > > connector, maybe we could have a top level field "type" in the
> > response
> > > > for
> > > > > the `GET /connectors/{connector}/offsets` API similar to the `GET
> > > > > /connectors` API?
> > > > >
> > > > > 4. For the `DELETE /connectors/{connector}/offsets` API, the KIP
> > > mentions
> > > > > that requests will be rejected if a rebalance is pending -
> presumably
> > > > this
> > > > > is to avoid forwarding requests to a leader which may no longer be
> > the
> > > > > leader after the pending rebalance? In this case, the API will
> > return a
> > > > > `409 Conflict` response similar to some of the existing APIs,
> right?
> > > > >
> > > > > 5. Regarding fencing out previously running tasks for a connector,
> do
> > > you
> > > > > think it would make more sense semantically to have this
> implemented
> > in
> > > > the
> > > > > stop endpoint where an empty set of tasks is generated, rather than
> > the
> > > > > delete offsets endpoint? This would also give the new `STOPPED`
> > state a
> > > > > higher confidence of sorts, with any zombie tasks being fenced off
> > from
> > > > > continuing to produce data.
> > > > >
> > > > > 6. Thanks for outlining the issues with the current state of the
> > > `PAUSED`
> > > > > state - I think a lot of users expect it to behave like the
> `STOPPED`
> > > > state
> > > > > you outline in the KIP and are (unpleasantly) surprised when it
> > > doesn't.
> > > > > However, this does beg the question of what the usefulness of
> having
> > > two
> > > > > separate `PAUSED` and `STOPPED` states is? Do we want to continue
> > > > > supporting both these states in the future, or do you see the
> > `STOPPED`
> > > > > state eventually causing the existing `PAUSED` state to be
> > deprecated?
> > > > >
> > > > > 7. I think the idea outlined in the KIP for handling a new state
> > during
> > > > > cluster downgrades / rolling upgrades is quite clever, but do you
> > think
> > > > > there could be any issues with having a mix of "paused" and
> "stopped"
> > > > tasks
> > > > > for the same connector across workers in a cluster? At the very
> > least,
> > > I
> > > > > think it would be fairly confusing to most users. I'm wondering if
> > this
> > > > can
> > > > > be avoided by stating clearly in the KIP that the new `PUT
> > > > > /connectors/{connector}/stop`
> > > > > can only be used on a cluster that is fully upgraded to an AK
> version
> > > > newer
> > > > > than the one which ends up containing changes from this KIP and
> that
> > > if a
> > > > > cluster needs to be downgraded to an older version, the user should
> > > > ensure
> > > > > that none of the connectors on the cluster are in a stopped state?
> > With
> > > > the
> > > > > existing implementation, it looks like an unknown/invalid target
> > state
> > > > > record is basically just discarded (with an error message logged),
> so
> > > it
> > > > > doesn't seem to be a disastrous failure scenario that can bring
> down
> > a
> > > > > worker.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton
> > <chr...@aiven.io.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Ashwin,
> > > > > >
> > > > > > Thanks for your thoughts. Regarding your questions:
> > > > > >
> > > > > > 1. The response would show the offsets that are visible to the
> > source
> > > > > > connector, so it would combine the contents of the two topics,
> > giving
> > > > > > priority to offsets present in the connector-specific topic. I'm
> > > > > imagining
> > > > > > a follow-up question that some people may have in response to
> that
> > is
> > > > > > whether we'd want to provide insight into the contents of a
> single
> > > > topic
> > > > > at
> > > > > > a time. It may be useful to be able to see this information in
> > order
> > > to
> > > > > > debug connector issues or verify that it's safe to stop using a
> > > > > > connector-specific offsets topic (either explicitly, or
> implicitly
> > > via
> > > > > > cluster downgrade). What do you think about adding a URL query
> > > > parameter
> > > > > > that allows users to dictate which view of the connector's
> offsets
> > > they
> > > > > are
> > > > > > given in the REST response, with options for the worker's global
> > > topic,
> > > > > the
> > > > > > connector-specific topic, and the combined view of them that the
> > > > > connector
> > > > > > and its tasks see (which would be the default)? This may be too
> > much
> > > > for
> > > > > V1
> > > > > > but it feels like it's at least worth exploring a bit.
> > > > > >
> > > > > > 2. There is no option for this at the moment. Reset semantics are
> > > > > extremely
> > > > > > coarse-grained; for source connectors, we delete all source
> > offsets,
> > > > and
> > > > > > for sink connectors, we delete the entire consumer group. I'm
> > hoping
> > > > this
> > > > > > will be enough for V1 and that, if there's sufficient demand for
> > it,
> > > we
> > > > > can
> > > > > > introduce a richer API for resetting or even modifying connector
> > > > offsets
> > > > > in
> > > > > > a follow-up KIP.
> > > > > >
> > > > > > 3. Good eye :) I think it's fine to keep the existing behavior
> for
> > > the
> > > > > > PAUSED state with the Connector instance, since the primary
> purpose
> > > of
> > > > > the
> > > > > > Connector is to generate task configs and monitor the external
> > system
> > > > for
> > > > > > changes. If there's no chance for tasks to be running anyways, I
> > > don't
> > > > > see
> > > > > > much value in allowing paused connectors to generate new task
> > > configs,
> > > > > > especially since each time that happens a rebalance is triggered
> > and
> > > > > > there's a non-zero cost to that. What do you think?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin
> > <apan...@confluent.io.invalid
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks for KIP Chris - I think this is a useful feature.
> > > > > > >
> > > > > > > Can you please elaborate on the following in the KIP -
> > > > > > >
> > > > > > > 1. How would the response of GET
> /connectors/{connector}/offsets
> > > look
> > > > > > like
> > > > > > > if the worker has both global and connector specific offsets
> > topic
> > > ?
> > > > > > >
> > > > > > > 2. How can we pass the reset options like shift-by ,
> to-date-time
> > > > etc.
> > > > > > > using a REST API like DELETE /connectors/{connector}/offsets ?
> > > > > > >
> > > > > > > 3. Today PAUSE operation on a connector invokes its stop
> method -
> > > > will
> > > > > > > there be a change here to reduce confusion with the new
> proposed
> > > > > STOPPED
> > > > > > > state ?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Ashwin
> > > > > > >
> > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton
> > > > <chr...@aiven.io.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I noticed a fairly large gap in the first version of this KIP
> > > that
> > > > I
> > > > > > > > published last Friday, which has to do with accommodating
> > > > connectors
> > > > > > > > that target different Kafka clusters than the one that the
> > Kafka
> > > > > > Connect
> > > > > > > > cluster uses for its internal topics and source connectors
> with
> > > > > > dedicated
> > > > > > > > offsets topics. I've since updated the KIP to address this
> gap,
> > > > which
> > > > > > has
> > > > > > > > substantially altered the design. Wanted to give a heads-up
> to
> > > > anyone
> > > > > > > > that's already started reviewing.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <
> chr...@aiven.io>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I'd like to begin discussion on a KIP to add offsets
> support
> > to
> > > > the
> > > > > > > Kafka
> > > > > > > > > Connect REST API:
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > >
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
> On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Yash,
> >
> > Thanks again for your thoughts! Responses to ongoing discussions inline
> > (easier to track context than referencing comment numbers):
> >
> > > However, this then leads me to wonder if we can make that explicit by
> > including "connect" or "connector" in the higher level field names? Or do
> > you think this isn't required given that we're talking about a Connect
> > specific REST API in the first place?
> >
> > I think "partition" and "offset" are fine as field names but I'm not
> hugely
> > opposed to adding "connector " as a prefix to them; would be interested
> in
> > others' thoughts.
> >
> > > I'm not sure I followed why the unresolved writes to the config topic
> > would be an issue - wouldn't the delete offsets request be added to the
> > herder's request queue and whenever it is processed, we'd anyway need to
> > check if all the prerequisites for the request are satisfied?
> >
> > Some requests are handled in multiple steps. For example, deleting a
> > connector (1) adds a request to the herder queue to write a tombstone to
> > the config topic (or, if the worker isn't the leader, forward the request
> > to the leader). (2) Once that tombstone is picked up, (3) a rebalance
> > ensues, and then after it's finally complete, (4) the connector and its
> > tasks are shut down. I probably could have used better terminology, but
> > what I meant by "unresolved writes to the config topic" was a case in
> > between steps (2) and (3)--where the worker has already read that
> tombstone
> > from the config topic and knows that a rebalance is pending, but hasn't
> > begun participating in that rebalance yet. In the DistributedHerder
> class,
> > this is done via the `checkRebalanceNeeded` method.
> >
> > > We can probably revisit this potential deprecation [of the PAUSED
> state]
> > in the future based on user feedback and how the adoption of the new
> > proposed stop endpoint looks like, what do you think?
> >
> > Yeah, revisiting in the future seems reasonable. 👍
> >
> > And responses to new comments here:
> >
> > 8. Yep, we'll start tracking offsets by connector. I don't believe this
> > should be too difficult, and suspect that the only reason we track raw
> byte
> > arrays instead of pre-deserializing offset topic information into
> something
> > more useful is because Connect originally had pluggable internal
> > converters. Now that we're hardcoded to use the JSON converter it should
> be
> > fine to track offsets on a per-connector basis as they're read from the
> > offsets topic.
> >
> > 9. I'm hesitant to introduce this type of feature right now because of
> all
> > of the gotchas that would come with it. In security-conscious
> environments,
> > it's possible that a sink connector's principal may have access to the
> > consumer group used by the connector, but the worker's principal may not.
> > There's also the case where source connectors have separate offsets
> topics,
> > or sink connectors have overridden consumer group IDs, or sink or source
> > connectors work against a different Kafka cluster than the one that their
> > worker uses. Overall, I'd rather provide a single API that works in all
> > cases rather than risk confusing and alienating users by trying to make
> > their lives easier in a subset of cases.
> >
> > 10. Hmm... I don't think the order of the writes matters too much here,
> but
> > we probably could start by deleting from the global topic first, that's
> > true. The reason I'm not hugely concerned about this case is that if
> > something goes wrong while resetting offsets, there's no immediate
> > impact--the connector will still be in the STOPPED state. The REST
> response
> > for requests to reset the offsets will clearly call out that the
> operation
> > has failed, and if necessary, we can probably also add a scary-looking
> > warning message stating that we can't guarantee which offsets have been
> > successfully wiped and which haven't. Users can query the exact offsets
> of
> > the connector at this point to determine what will happen if/what they
> > resume it. And they can repeat attempts to reset the offsets as many
> times
> > as they'd like until they get back a 2XX response, indicating that it's
> > finally safe to resume the connector. Thoughts?
> >
> > 11. I haven't thought too much about it. I think something like the
> > Monitorable* connectors would probably serve our needs here; we can
> > instantiate them on a running Connect cluster and then use various
> handles
> > to know how many times they've been polled, committed records, etc. If
> > necessary we can tweak those classes or even write our own. But anyways,
> > once that's all done, the test will be something like "create a
> connector,
> > wait for it to produce N records (each of which contains some kind of
> > predictable offset), and ensure that the offsets for it in the REST API
> > match up with the ones we'd expect from N records". Does that answer your
> > question?
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > 1. Thanks a lot for elaborating on this, I'm now convinced about the
> > > usefulness of the new offset reset endpoint. Regarding the follow-up
> KIP
> > > for a fine-grained offset write API, I'd be happy to take that on once
> > this
> > > KIP is finalized and I will definitely look forward to your feedback on
> > > that one!
> > >
> > > 2. Gotcha, the motivation makes more sense to me now. So the higher
> level
> > > partition field represents a Connect specific "logical partition" of
> > sorts
> > > - i.e. the source partition as defined by a connector for source
> > connectors
> > > and a Kafka topic + partition for sink connectors. I like the idea of
> > > adding a Kafka prefix to the lower level partition/offset (and topic)
> > > fields which basically makes it more clear (although implicitly) that
> the
> > > higher level partition/offset field is Connect specific and not the
> same
> > as
> > > what those terms represent in Kafka itself. However, this then leads me
> > to
> > > wonder if we can make that explicit by including "connect" or
> "connector"
> > > in the higher level field names? Or do you think this isn't required
> > given
> > > that we're talking about a Connect specific REST API in the first
> place?
> > >
> > > 3. Thanks, I think the response structure definitely looks better now!
> > >
> > > 4. Interesting, I'd be curious to learn why we might want to change
> this
> > in
> > > the future but that's probably out of scope for this discussion. I'm
> not
> > > sure I followed why the unresolved writes to the config topic would be
> an
> > > issue - wouldn't the delete offsets request be added to the herder's
> > > request queue and whenever it is processed, we'd anyway need to check
> if
> > > all the prerequisites for the request are satisfied?
> > >
> > > 5. Thanks for elaborating that just fencing out the producer still
> leaves
> > > many cases where source tasks remain hanging around and also that we
> > anyway
> > > can't have similar data production guarantees for sink connectors right
> > > now. I agree that it might be better to go with ease of implementation
> > and
> > > consistency for now.
> > >
> > > 6. Right, that does make sense but I still feel like the two states
> will
> > > end up being confusing to end users who might not be able to discern
> the
> > > (fairly low-level) differences between them (also the nuances of state
> > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED with the
> > > rebalancing implications as well). We can probably revisit this
> potential
> > > deprecation in the future based on user feedback and how the adoption
> of
> > > the new proposed stop endpoint looks like, what do you think?
> > >
> > > 7. Aha, that is completely my bad, I missed that the v1/v2 state is
> only
> > > applicable to the connector's target state and that we don't need to
> > worry
> > > about the tasks since we will have an empty set of tasks. I think I
> was a
> > > little confused by "pause the parts of the connector that they are
> > > assigned" from the KIP. Thanks for clarifying that!
> > >
> > >
> > > Some more thoughts and questions that I had -
> > >
> > > 8. Could you elaborate on what the implementation for offset reset for
> > > source connectors would look like? Currently, it doesn't look like we
> > track
> > > all the partitions for a source connector anywhere. Will we need to
> > > book-keep this somewhere in order to be able to emit a tombstone record
> > for
> > > each source partition?
> > >
> > > 9. The KIP describes the offset reset endpoint as only being usable on
> > > existing connectors that are in a `STOPPED` state. Why wouldn't we want
> > to
> > > allow resetting offsets for a deleted connector which seems to be a
> valid
> > > use case? Or do we plan to handle this use case only via the item
> > outlined
> > > in the future work section - "Automatically delete offsets with
> > > connectors"?
> > >
> > > 10. The KIP mentions that source offsets will be reset transactionally
> > for
> > > each topic (worker global offset topic and connector specific offset
> > topic
> > > if it exists). While it obviously isn't possible to atomically do the
> > > writes to two topics which may be on different Kafka clusters, I'm
> > > wondering about what would happen if the first transaction succeeds but
> > the
> > > second one fails. I think the order of the two transactions matters
> here
> > -
> > > if we successfully emit tombstones to the connector specific offset
> topic
> > > and fail to do so for the worker global offset topic, we'll presumably
> > fail
> > > the offset delete request because the KIP mentions that "A request to
> > reset
> > > offsets for a source connector will only be considered successful if
> the
> > > worker is able to delete all known offsets for that connector, on both
> > the
> > > worker's global offsets topic and (if one is used) the connector's
> > > dedicated offsets topic.". However, this will lead to the connector
> only
> > > being able to read potentially older offsets from the worker global
> > offset
> > > topic on resumption (based on the combined offset view presented as
> > > described in KIP-618 [1]). So, I think we should make sure that the
> > worker
> > > global offset topic tombstoning is attempted first, right? Note that in
> > the
> > > current implementation of `ConnectorOffsetBackingStore::set`, the
> > primary /
> > > connector specific offset store is written to first.
> > >
> > > 11. This probably isn't necessary to elaborate on in the KIP itself,
> but
> > I
> > > was wondering what the second offset test - "verify that that those
> > offsets
> > > reflect an expected level of progress for each connector (i.e., they
> are
> > > greater than or equal to a certain value depending on how the
> connectors
> > > are configured and how long they have been running)" - would look like?
> > >
> > >
> > > Thanks,
> > > Yash
> > >
> > > [1] -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > >
> > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton <chr...@aiven.io.invalid
> >
> > > wrote:
> > >
> > > > Hi Yash,
> > > >
> > > > Thanks for your detailed thoughts.
> > > >
> > > > 1. In KAFKA-4107 [1], the primary request is exactly what's proposed
> in
> > > the
> > > > KIP right now: a way to reset offsets for connectors. Sure, there's
> an
> > > > extra step of stopping the connector, but renaming a connector isn't
> as
> > > > convenient of an alternative as it may seem since in many cases you'd
> > > also
> > > > want to delete the older one, so the complete sequence of steps would
> > be
> > > > something like delete the old connector, rename it (possibly
> requiring
> > > > modifications to its config file, depending on which API is used),
> then
> > > > create the renamed variant. It's also just not a great user
> > > > experience--even if the practical impacts are limited (which, IMO,
> they
> > > are
> > > > not), people have been asking for years about why they have to employ
> > > this
> > > > kind of a workaround for a fairly common use case, and we don't
> really
> > > have
> > > > a good answer beyond "we haven't implemented something better yet".
> On
> > > top
> > > > of that, you may have external tooling that needs to be tweaked to
> > > handle a
> > > > new connector name, you may have strict authorization policies around
> > who
> > > > can access what connectors, you may have other ACLs attached to the
> > name
> > > of
> > > > the connector (which can be especially common in the case of sink
> > > > connectors, whose consumer group IDs are tied to their names by
> > default),
> > > > and leaving around state in the offsets topic that can never be
> cleaned
> > > up
> > > > presents a bit of a footgun for users. It may not be a silver bullet,
> > but
> > > > providing some mechanism to reset that state is a step in the right
> > > > direction and allows responsible users to more carefully administer
> > their
> > > > cluster without resorting to non-public APIs. That said, I do agree
> > that
> > > a
> > > > fine-grained reset/overwrite API would be useful, and I'd be happy to
> > > > review a KIP to add that feature if anyone wants to tackle it!
> > > >
> > > > 2. Keeping the two formats symmetrical is motivated mostly by
> > aesthetics
> > > > and quality-of-life for programmatic interaction with the API; it's
> not
> > > > really a goal to hide the use of consumer groups from users. I do
> agree
> > > > that the format is a little strange-looking for sink connectors, but
> it
> > > > seemed like it would be easier to work with for UIs, casual jq
> queries,
> > > and
> > > > CLIs than a more Kafka-specific alternative such as
> > > {"<topic>-<partition>":
> > > > "<offset>"}, and although it is a little strange, I don't think it's
> > any
> > > > less readable or intuitive. That said, I've made some tweaks to the
> > > format
> > > > that should make programmatic access even easier; specifically, I've
> > > > removed the "source" and "sink" wrapper fields and instead moved them
> > > into
> > > > a top-level object with a "type" and "offsets" field, just like you
> > > > suggested in point 3 (thanks!). We might also consider changing the
> > field
> > > > names for sink offsets from "topic", "partition", and "offset" to
> > "Kafka
> > > > topic", "Kafka partition", and "Kafka offset" respectively, to reduce
> > the
> > > > stuttering effect of having a "partition" field inside a "partition"
> > > field
> > > > and the same with an "offset" field; thoughts? One final point--by
> > > equating
> > > > source and sink offsets, we probably make it easier for users to
> > > understand
> > > > exactly what a source offset is; anyone who's familiar with consumer
> > > > offsets can see from the response format that we identify a logical
> > > > partition as a combination of two entities (a topic and a partition
> > > > number); it should make it easier to grok what a source offset is by
> > > seeing
> > > > what the two formats have in common.
> > > >
> > > > 3. Great idea! Done.
> > > >
> > > > 4. Yes, I'm thinking right now that a 409 will be the response status
> > if
> > > a
> > > > rebalance is pending. I'd rather not add this to the KIP as we may
> want
> > > to
> > > > change it at some point and it doesn't seem vital to establish it as
> > part
> > > > of the public contract for the new endpoint right now. Also, small
> > > > point--yes, a 409 is useful to avoid forwarding requests to an
> > incorrect
> > > > leader, but it's also useful to ensure that there aren't any
> unresolved
> > > > writes to the config topic that might cause issues with the request
> > (such
> > > > as deleting the connector).
> > > >
> > > > 5. That's a good point--it may be misleading to call a connector
> > STOPPED
> > > > when it has zombie tasks lying around on the cluster. I don't think
> > it'd
> > > be
> > > > appropriate to do this synchronously while handling requests to the
> PUT
> > > > /connectors/{connector}/stop since we'd want to give all
> > > currently-running
> > > > tasks a chance to gracefully shut down, though. I'm also not sure
> that
> > > this
> > > > is a significant problem, either. If the connector is resumed, then
> all
> > > > zombie tasks will be automatically fenced out by their successors on
> > > > startup; if it's deleted, then we'll have wasted effort by performing
> > an
> > > > unnecessary round of fencing. It may be nice to guarantee that source
> > > task
> > > > resources will be deallocated after the connector transitions to
> > STOPPED,
> > > > but realistically, it doesn't do much to just fence out their
> > producers,
> > > > since tasks can be blocked on a number of other operations such as
> > > > key/value/header conversion, transformation, and task polling. It may
> > be
> > > a
> > > > little strange if data is produced to Kafka after the connector has
> > > > transitioned to STOPPED, but we can't provide the same guarantees for
> > > sink
> > > > connectors, since their tasks may be stuck on a long-running
> > > SinkTask::put
> > > > that emits data even after the Connect framework has abandoned them
> > after
> > > > exhausting their graceful shutdown timeout. Ultimately, I'd prefer to
> > err
> > > > on the side of consistency and ease of implementation for now, but I
> > may
> > > be
> > > > missing a case where a few extra records from a task that's slow to
> > shut
> > > > down may cause serious issues--let me know.
> > > >
> > > > 6. I'm hesitant to propose deprecation of the PAUSED state right now
> as
> > > it
> > > > does serve a few purposes. Leaving tasks idling-but-ready makes
> > resuming
> > > > them less disruptive across the cluster, since a rebalance isn't
> > > necessary.
> > > > It also reduces latency to resume the connector, especially for ones
> > that
> > > > have to do a lot of state gathering on initialization to, e.g., read
> > > > offsets from an external system.
> > > >
> > > > 7. There should be no risk of mixed tasks after a downgrade, thanks
> to
> > > the
> > > > empty set of task configs that gets published to the config topic.
> Both
> > > > upgraded and downgraded workers will render an empty set of tasks for
> > the
> > > > connector, and keep that set of empty tasks until the connector is
> > > resumed.
> > > > Does that address your concerns?
> > > >
> > > > You're also correct that the linked Jira ticket was wrong; thanks for
> > > > pointing that out! Yes, KAFKA-4107 is the intended ticket, and I've
> > > updated
> > > > the link in the KIP accordingly.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107
> > > >
> > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > Thanks a lot for this KIP, I think something like this has been
> long
> > > > > overdue for Kafka Connect :)
> > > > >
> > > > > Some thoughts and questions that I had -
> > > > >
> > > > > 1. I'm wondering if you could elaborate a little more on the use
> case
> > > for
> > > > > the `DELETE /connectors/{connector}/offsets` API. I think we can
> all
> > > > agree
> > > > > that a fine grained reset API that allows setting arbitrary offsets
> > for
> > > > > partitions would be quite useful (which you talk about in the
> Future
> > > work
> > > > > section). But for the `DELETE /connectors/{connector}/offsets` API
> in
> > > its
> > > > > described form, it looks like it would only serve a seemingly niche
> > use
> > > > > case where users want to avoid renaming connectors - because this
> new
> > > way
> > > > > of resetting offsets actually has more steps (i.e. stop the
> > connector,
> > > > > reset offsets via the API, resume the connector) than simply
> deleting
> > > and
> > > > > re-creating the connector with a different name?
> > > > >
> > > > > 2. The KIP talks about taking care that the response formats
> > > (presumably
> > > > > only talking about the new GET API here) are symmetrical for both
> > > source
> > > > > and sink connectors - is the end goal to have users of Kafka
> Connect
> > > not
> > > > > even be aware that sink connectors use Kafka consumers under the
> hood
> > > > (i.e.
> > > > > have that as purely an implementation detail abstracted away from
> > > users)?
> > > > > While I understand the value of uniformity here, the response
> format
> > > for
> > > > > sink connectors currently looks a little odd with the "partition"
> > field
> > > > > having "topic" and "partition" as sub-fields, especially to users
> > > > familiar
> > > > > with Kafka semantics. Thoughts?
> > > > >
> > > > > 3. Another little nitpick on the response format - why do we need
> > > > "source"
> > > > > / "sink" as a field under "offsets"? Users can query the connector
> > type
> > > > via
> > > > > the existing `GET /connectors` API. If it's deemed important to let
> > > users
> > > > > know that the offsets they're seeing correspond to a source / sink
> > > > > connector, maybe we could have a top level field "type" in the
> > response
> > > > for
> > > > > the `GET /connectors/{connector}/offsets` API similar to the `GET
> > > > > /connectors` API?
> > > > >
> > > > > 4. For the `DELETE /connectors/{connector}/offsets` API, the KIP
> > > mentions
> > > > > that requests will be rejected if a rebalance is pending -
> presumably
> > > > this
> > > > > is to avoid forwarding requests to a leader which may no longer be
> > the
> > > > > leader after the pending rebalance? In this case, the API will
> > return a
> > > > > `409 Conflict` response similar to some of the existing APIs,
> right?
> > > > >
> > > > > 5. Regarding fencing out previously running tasks for a connector,
> do
> > > you
> > > > > think it would make more sense semantically to have this
> implemented
> > in
> > > > the
> > > > > stop endpoint where an empty set of tasks is generated, rather than
> > the
> > > > > delete offsets endpoint? This would also give the new `STOPPED`
> > state a
> > > > > higher confidence of sorts, with any zombie tasks being fenced off
> > from
> > > > > continuing to produce data.
> > > > >
> > > > > 6. Thanks for outlining the issues with the current state of the
> > > `PAUSED`
> > > > > state - I think a lot of users expect it to behave like the
> `STOPPED`
> > > > state
> > > > > you outline in the KIP and are (unpleasantly) surprised when it
> > > doesn't.
> > > > > However, this does beg the question of what the usefulness of
> having
> > > two
> > > > > separate `PAUSED` and `STOPPED` states is? Do we want to continue
> > > > > supporting both these states in the future, or do you see the
> > `STOPPED`
> > > > > state eventually causing the existing `PAUSED` state to be
> > deprecated?
> > > > >
> > > > > 7. I think the idea outlined in the KIP for handling a new state
> > during
> > > > > cluster downgrades / rolling upgrades is quite clever, but do you
> > think
> > > > > there could be any issues with having a mix of "paused" and
> "stopped"
> > > > tasks
> > > > > for the same connector across workers in a cluster? At the very
> > least,
> > > I
> > > > > think it would be fairly confusing to most users. I'm wondering if
> > this
> > > > can
> > > > > be avoided by stating clearly in the KIP that the new `PUT
> > > > > /connectors/{connector}/stop`
> > > > > can only be used on a cluster that is fully upgraded to an AK
> version
> > > > newer
> > > > > than the one which ends up containing changes from this KIP and
> that
> > > if a
> > > > > cluster needs to be downgraded to an older version, the user should
> > > > ensure
> > > > > that none of the connectors on the cluster are in a stopped state?
> > With
> > > > the
> > > > > existing implementation, it looks like an unknown/invalid target
> > state
> > > > > record is basically just discarded (with an error message logged),
> so
> > > it
> > > > > doesn't seem to be a disastrous failure scenario that can bring
> down
> > a
> > > > > worker.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton
> > <chr...@aiven.io.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Ashwin,
> > > > > >
> > > > > > Thanks for your thoughts. Regarding your questions:
> > > > > >
> > > > > > 1. The response would show the offsets that are visible to the
> > source
> > > > > > connector, so it would combine the contents of the two topics,
> > giving
> > > > > > priority to offsets present in the connector-specific topic. I'm
> > > > > imagining
> > > > > > a follow-up question that some people may have in response to
> that
> > is
> > > > > > whether we'd want to provide insight into the contents of a
> single
> > > > topic
> > > > > at
> > > > > > a time. It may be useful to be able to see this information in
> > order
> > > to
> > > > > > debug connector issues or verify that it's safe to stop using a
> > > > > > connector-specific offsets topic (either explicitly, or
> implicitly
> > > via
> > > > > > cluster downgrade). What do you think about adding a URL query
> > > > parameter
> > > > > > that allows users to dictate which view of the connector's
> offsets
> > > they
> > > > > are
> > > > > > given in the REST response, with options for the worker's global
> > > topic,
> > > > > the
> > > > > > connector-specific topic, and the combined view of them that the
> > > > > connector
> > > > > > and its tasks see (which would be the default)? This may be too
> > much
> > > > for
> > > > > V1
> > > > > > but it feels like it's at least worth exploring a bit.
> > > > > >
> > > > > > 2. There is no option for this at the moment. Reset semantics are
> > > > > extremely
> > > > > > coarse-grained; for source connectors, we delete all source
> > offsets,
> > > > and
> > > > > > for sink connectors, we delete the entire consumer group. I'm
> > hoping
> > > > this
> > > > > > will be enough for V1 and that, if there's sufficient demand for
> > it,
> > > we
> > > > > can
> > > > > > introduce a richer API for resetting or even modifying connector
> > > > offsets
> > > > > in
> > > > > > a follow-up KIP.
> > > > > >
> > > > > > 3. Good eye :) I think it's fine to keep the existing behavior
> for
> > > the
> > > > > > PAUSED state with the Connector instance, since the primary
> purpose
> > > of
> > > > > the
> > > > > > Connector is to generate task configs and monitor the external
> > system
> > > > for
> > > > > > changes. If there's no chance for tasks to be running anyways, I
> > > don't
> > > > > see
> > > > > > much value in allowing paused connectors to generate new task
> > > configs,
> > > > > > especially since each time that happens a rebalance is triggered
> > and
> > > > > > there's a non-zero cost to that. What do you think?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin
> > <apan...@confluent.io.invalid
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks for KIP Chris - I think this is a useful feature.
> > > > > > >
> > > > > > > Can you please elaborate on the following in the KIP -
> > > > > > >
> > > > > > > 1. How would the response of GET
> /connectors/{connector}/offsets
> > > look
> > > > > > like
> > > > > > > if the worker has both global and connector specific offsets
> > topic
> > > ?
> > > > > > >
> > > > > > > 2. How can we pass the reset options like shift-by ,
> to-date-time
> > > > etc.
> > > > > > > using a REST API like DELETE /connectors/{connector}/offsets ?
> > > > > > >
> > > > > > > 3. Today PAUSE operation on a connector invokes its stop
> method -
> > > > will
> > > > > > > there be a change here to reduce confusion with the new
> proposed
> > > > > STOPPED
> > > > > > > state ?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Ashwin
> > > > > > >
> > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton
> > > > <chr...@aiven.io.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I noticed a fairly large gap in the first version of this KIP
> > > that
> > > > I
> > > > > > > > published last Friday, which has to do with accommodating
> > > > connectors
> > > > > > > > that target different Kafka clusters than the one that the
> > Kafka
> > > > > > Connect
> > > > > > > > cluster uses for its internal topics and source connectors
> with
> > > > > > dedicated
> > > > > > > > offsets topics. I've since updated the KIP to address this
> gap,
> > > > which
> > > > > > has
> > > > > > > > substantially altered the design. Wanted to give a heads-up
> to
> > > > anyone
> > > > > > > > that's already started reviewing.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <
> chr...@aiven.io>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I'd like to begin discussion on a KIP to add offsets
> support
> > to
> > > > the
> > > > > > > Kafka
> > > > > > > > > Connect REST API:
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > >
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to