Hey Yash,

Thanks for your comments.

1) Hmm, the question is how you qualify a partition as stale or old.
Let's say a connector has implemented updateOffsets and, for a certain
partition for which no records are received, it updates its offsets.
Technically, that offset can't be termed stale anymore. Even though I
can't think of a side effect at this point that would justify disallowing
offset deletion via this method, my opinion is to use a proper mechanism
like the ones introduced in KIP-875 to delete offsets. Moreover, if I also
consider the option presented in point #2, for simplicity's sake it seems
better not to add this feature at this point. If we feel it's really
needed and users are requesting it, we can add support for it later on.

2) I get the point now. I can't think of cases where updating offsets for
source partitions that are already about to be committed would be needed.
As with point #1, we can always add it back later if needed. For now, I
have removed that part from the KIP.

3) Yes, because the offset commit happens on a different thread, ordering
guarantees might be harder to ensure if we do it from the other thread. The
currently proposed mechanism, even though it gets invoked multiple times
between offset commits, keeps things simpler to reason about.
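
A rough sketch of the narrowed-down contract from points 1) and 2) could
look like the following. It is only an illustration and not the code in the
PR: OffsetMergeSketch and merge are made-up names, and both rejecting a null
offset with an exception and ignoring entries for partitions that already
have a pending offset are assumptions; the KIP may settle on something
different.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: OffsetMergeSketch and merge(...) are hypothetical names,
// not part of the Connect runtime or of the PR.
class OffsetMergeSketch {

    // Combines the offsets that are about to be committed with the extra
    // offsets returned by a task's updateOffsets implementation.
    // P = source partition type, O = source offset type.
    static <P, O> Map<P, O> merge(Map<P, O> toBeCommitted, Map<P, O> fromTask) {
        // Copy so that the map handed to the task is never mutated.
        Map<P, O> result = new HashMap<>(toBeCommitted);
        if (fromTask == null) {
            // null means "nothing to add": commit exactly what was planned.
            return result;
        }
        for (Map.Entry<P, O> entry : fromTask.entrySet()) {
            if (entry.getValue() == null) {
                // Assumption: tombstones are rejected; deletion goes through KIP-875.
                throw new IllegalArgumentException(
                        "Tombstone offset not allowed for partition " + entry.getKey());
            }
            // Assumption: an offset that is already about to be committed wins,
            // so only partitions without a pending offset are added.
            result.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> toBeCommitted = new HashMap<>();
        toBeCommitted.put("table-a", 42L);

        Map<String, Long> fromTask = new HashMap<>();
        fromTask.put("table-a", 7L);   // ignored, table-a already has a pending offset
        fromTask.put("table-b", 100L); // added, no records were produced for table-b

        System.out.println(merge(toBeCommitted, fromTask));
    }
}
```

With this shape the framework only has two cases to handle (null and
non-null), and the worst case for an already-present partition is that the
committed offset is one commit out of date.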

Let me know how things look now. If it all looks OK, I will go ahead and
create a vote thread for it.

Thanks!
Sagar.

On Tue, Jul 25, 2023 at 5:15 PM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Sagar,
>
> Thanks for the updates. I had a few more follow up questions:
>
> > I have added that a better way of doing that would be
> > via KIP-875. Also, I didn't want to include any mechanisms
> > for users to meddle with the offsets topic. Allowing tombstone
> > records via this method would be akin to publishing tombstone
> > records directly to the offsets topic which is not recommended
> > generally.
>
> KIP-875 would allow a way for cluster administrators and / or users to do
> so manually externally whereas allowing tombstones in
> SourceTask::updateOffsets would enable connectors to clean up offsets for
> old / stale partitions without user intervention right? I'm not sure I
> follow what you mean by "I didn't want to include any mechanisms for users
> to meddle with the offsets topic" here? Furthermore, I'm not sure why
> publishing tombstone records directly to the offsets topic would not be
> recommended? Isn't that currently the only way to manually clean up offsets
> for a source connector?
>
> > It could be useful in a scenario where the offset of a partition
> > doesn't update for some period of time. In such cases, the
> > connector can do some kind of state tracking and update the
> > offsets after the time period elapses.
>
> I'm not sure I follow? In this case, won't the offsets argument passed
> to SourceTask::updateOffsets *not* contain the source partition which
> hasn't had an update for a long period of time? Wouldn't it make more sense
> to reduce the surface of the API as Chris suggested and only allow adding
> new partition offset pairs to the about to be committed offsets (since
> there don't seem to be any use cases outlined for allowing connectors to
> update offsets for source partitions that are already about to have an
> offset be committed for)?
>
> > All the records returned by the previous poll invocation
> >  got processed successfully
>
> Thanks for this clarification in the KIP, it looks like it does address the
> offsets ordering issue. As to Chris' point about invoking
> SourceTask::updateOffsets less frequently by calling it before offsets are
> committed rather than in every poll loop iteration - I guess that would
> make it a lot more tricky to address the ordering issue?
>
>
> Thanks,
> Yash
>
> On Thu, Jul 20, 2023 at 9:50 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hey All,
> >
> > Please let me know how the KIP looks now. Is it at a stage where I can
> > start with the Voting phase? Of course I am still open to
> > feedback/suggestions but planning to start the Vote for it.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Jul 11, 2023 at 10:00 PM Sagar <sagarmeansoc...@gmail.com>
> wrote:
> >
> > > Hi Yash/Chris,
> > >
> > > Thanks for the feedback! I have updated the KIP with the suggestions
> > > provided. I would also update the PR with the suggestions.
> > >
> > > Also, I was hoping that this could make it to the 3.6 release given
> that
> > > it would benefit source connectors which have some of the problems
> listed
> > > in the Motivation Section.
> > >
> > > Responses Inline:
> > >
> > > Yash:
> > >
> > > 1) In the proposed changes section where you talk about modifying the
> > >> offsets, could you please clarify that tasks shouldn't modify the
> > offsets
> > >> map that is passed as an argument? Currently, the distinction between
> > the
> > >> offsets map passed as an argument and the offsets map that is returned
> > is
> > >> not very clear in numerous places.
> > >
> > >
> > >
> > > Added
> > >
> > > 2) The default return value of Optional.empty() seems to be fairly
> > >> non-intuitive considering that the return value is supposed to be the
> > >> offsets that are to be committed. Can we consider simply returning the
> > >> offsets argument itself by default instead?
> > >
> > >
> > >
> > > > Chris is suggesting returning null for the default case. I am thinking
> > > > of making null the default return value. If the returned map is null,
> > > > there won't be any further processing; otherwise we will continue with
> > > > the existing logic.
> > >
> > > 3) The KIP states that "It is also possible that a task might choose to
> > >> send a tombstone record as an offset. This is not recommended and to
> > >> prevent connectors shooting themselves in the foot due to this" -
> could
> > >> you
> > >> please clarify why this is not recommended / supported?
> > >
> > >
> > >
> > > > I have added that a better way of doing that would be via KIP-875.
> > > > Also, I didn't want to include any mechanisms for users to meddle with
> > > > the offsets topic. Allowing tombstone records via this method would be
> > > > akin to publishing tombstone records directly to the offsets topic
> > > > which is not recommended generally.
> > >
> > > 4) The KIP states that "If a task returns an Optional of a null object
> or
> > >> an Optional of an empty map, even for such cases the behaviour would
> > would
> > >> be disabled." - since this is an optional API that source task
> > >> implementations don't necessarily need to implement, I don't think I
> > fully
> > >> follow why the return type of the proposed "updateOffsets" method is
> an
> > >> Optional? Can we not simply use the Map as the return type instead?
> > >
> > >
> > >
> > > Yeah, I updated the return type to be a Map.
> > >
> > >
> > > 5) The KIP states that "The offsets passed to the updateOffsets  method
> > >> would be the offset from the latest source record amongst all source
> > >> records per partition. This way, if the source offset for a given
> source
> > >> partition is updated, that offset is the one that gets committed for
> the
> > >> source partition." - we should clarify that the "latest" offset refers
> > to
> > >> the offsets that are about to be committed, and not the latest offsets
> > >> returned from SourceTask::poll so far (see related discussion in
> > >> https://issues.apache.org/jira/browse/KAFKA-15091 and
> > >> https://issues.apache.org/jira/browse/KAFKA-5716).
> > >
> > >
> > >
> > > Done
> > >
> > >
> > > 6) We haven't used the terminology of "Atleast Once Semantics"
> elsewhere
> > in
> > >> Connect since the framework itself does not (and cannot) make any
> > >> guarantees on the delivery semantics. Depending on the source
> connector
> > >> and
> > >> the source system, both at-least once and at-most once semantics (for
> > >> example - a source system where reads are destructive) are possible.
> We
> > >> should avoid introducing this terminology in the KIP and instead refer
> > to
> > >> this scenario as exactly-once support being disabled.
> > >
> > >
> > >
> > > Done
> > >
> > >
> > > 7) Similar to the above point, we should remove the use of the term
> > >> "Exactly Once Semantics" and instead refer to exactly-once support
> being
> > >> enabled since the framework can't guarantee exactly-once semantics for
> > all
> > >> possible source connectors (for example - a message queue source
> > connector
> > >> where offsets are essentially managed in the source system via an ack
> > >> mechanism).
> > >
> > >
> > > Done
> > >
> > > 8) In a previous attempt to fix this gap in functionality, a
> significant
> > >> concern was raised on offsets ordering guarantees when we retry
> sending
> > a
> > >> batch of records (ref -
> > >> https://github.com/apache/kafka/pull/5553/files#r213329307). It
> doesn't
> > >> look like this KIP addresses that concern either? In the case where
> > >> exactly-once support is disabled - if we update the committableOffsets
> > >> with
> > >> the offsets provided by the task through the new updateOffsets method,
> > >> these offsets could be committed before older "regular" offsets are
> > >> committed due to producer retries which could then lead to an
> > >> inconsistency
> > >> if the send operation eventually succeeds.
> > >
> > >
> > >
> > >
> > > Thanks for bringing this up. I went through the comment shared above.
> If
> > > you see the implementation
> > > that I have in the PR, in EOS-disabled case, updateOffsets is invoked
> > only
> > > when toSend is null. Refer
> > > here:
> > >
> >
> https://github.com/apache/kafka/pull/13899/files#diff-a3107b56382b6ec950dc9d19d21f188c21d4bf41853e0505d60d3bf87adab6a9R324-R330
> > >
> > >
> > > Which means that we invoke updateOffsets only when
> > > 1) Either the last poll invocation didn't return any records or
> > > 2) All the records returned by the previous poll invocation got
> processed
> > > successfully
> > > 3) First iteration of task because toSend would be null initially.
> > >
> > >
> > > IIUC the concern expressed in the link shared by you and the solution
> > > proposed there, it seems that's what is being proposed
> > >
> > >
> > >  What if your new block of code were only performed if sendRecords()
> > >> succeeded
> > >
> > >
> > >
> > >  Even for this there are concerns expressed but those don't seem to be
> > > related to offsets ordering guarantees. WDYT?
> > >
> > >
> > > 9) The KIP states that when exactly-once support is enabled, the new
> > >> SourceTask::updateOffsets method will be invoked only when an offset
> > flush
> > >> is attempted. If the connector is configured to use a connector
> > specified
> > >> transaction boundary rather than a poll or interval based boundary,
> > isn't
> > >> it possible that we don't call SourceTask::updateOffsets until there
> are
> > >> actual records that are also being returned through poll (which would
> > >> defeat the primary motivation of the KIP)? Or are we making the
> > assumption
> > >> that the connector defined transaction boundary should handle this
> case
> > >> appropriately if needed (i.e. source tasks should occasionally request
> > for
> > >> a transaction commit via their transaction context if they want
> offsets
> > to
> > >> be committed without producing records)? If so, I think we should
> > >> explicitly call that out in the KIP.
> > >
> > >
> > >
> > > That's a great point. I didn't consider this case. I have updated the
> > KIP.
> > >
> > > 10) The Javadoc for SourceTask::updateOffsets in the section on public
> > >> interfaces also has the same issue with the definition of latest
> offsets
> > >> that I've mentioned above (latest offsets from poll versus latest
> > offsets
> > >> that are about to be committed).
> > >
> > >
> > > Done
> > >
> > > 11) The Javadoc for SourceTask::updateOffsets also introduces the same
> > >> confusion w.r.t updating offsets that I've mentioned above (modifying
> > the
> > >> offsets map argument versus returning a modified copy of the offsets
> > map).
> > >
> > >
> > >
> > > I have modified the verbiage and even the meaning of the return type as
> > > suggested by Chris.
> > >
> > > 12) In the section on compatibility, we should explicitly mention that
> > >> connectors which implement the new method will still be compatible
> with
> > >> older Connect runtimes where the method will simply not be invoked.
> > >
> > >
> > > Done
> > >
> > >
> > > Chris:
> > >
> > > 1. (Nit) Can we move the "Public Interfaces" section before the
> "Proposed
> > >> Changes" section? It's nice to have a summary of the
> > user/developer-facing
> > >> changes first since that answers many of the questions that I had
> while
> > >> reading the "Proposed Changes" section. I'd bet that this is also why
> we
> > >> use that ordering in the KIP template.
> > >
> > >
> > >
> > > Done
> > >
> > > 2. Why are we invoking SourceTask::updateOffsets so frequently when
> > >> exactly-once support is disabled? Wouldn't it be simpler both for our
> > >> implementation and for connector developers if we only invoked it
> > directly
> > >> before committing offsets, instead of potentially several times
> between
> > >> offset commits, especially since that would also mirror the behavior
> > with
> > >> exactly-once support enabled?
> > >
> > >
> > >
> > > Hmm the idea was to keep the changes bounded within the SourceTask
> loop.
> > > Since the EOS-disabled case
> > > uses a separate thread  to commit offsets, I thought it's easier to
> have
> > > the updateOffsets invoked in
> > > the same loop and have it update the committableOffsets. The committer
> > > thread will keep doing what it
> > > does today. I felt this is easier to reason about. WDYT?
> > >
> > >
> > > 3. Building off of point 2, we wouldn't need to specify any more detail
> > >> than that "SourceTask::updateOffsets will be invoked directly before
> > >> committing offsets, with the to-be-committed offsets". There would be
> no
> > >> need to distinguish between when exactly-once support is enabled or
> > >> disabled.
> > >
> > >
> > >
> > > Yeah I have added the fact that updateOffsets would be invoked before
> > > committing offsets with about to be committed offsets.
> > > I have still left the EOS enabled/disabled intact because there are
> > > differences that I wanted to highlight like honouring
> > > Transaction boundaries and another edge case with Connector transaction
> > > boundary mode that Yash had brought up.
> > >
> > >
> > > 4. Some general stylistic feedback: we shouldn't mention the names of
> > > >> internal classes or methods in KIPs. KIPs are for discussing
> high-level
> > > >> design proposals. Internal names and APIs may change over time, and
> are
> > >> not
> > >> very helpful to readers who are not already familiar with the code
> base.
> > >> Instead, we should describe changes in behavior, not code.
> > >
> > >
> > >
> > > > Yeah, I generally avoid delving into the details, but in this case I
> > > > felt I needed to explain a bit more why I am proposing what I am
> > > > proposing. I have made the edits.
> > >
> > > 5. Why return a complete map of to-be-committed offsets instead of a
> map
> > of
> > >> just the offsets that the connector wants to change? This seems
> > especially
> > >> intuitive since we automatically re-insert source partitions that have
> > >> been
> > >> removed by the connector.
> > >
> > >
> > >
> > > Makes sense. I updated the KIP accordingly.
> > >
> > > > 6. I don't think we need to return an Optional from
> > >> SourceTask::updateOffsets. Developers can return null instead of
> > >> Optional.empty(), and since the framework will have to handle null
> > return
> > >> values either way, this would reduce the number of cases for us to
> > handle
> > >> from three (Optional.of(...), Optional.empty(), null) to two (null,
> > >> non-null).
> > >
> > >
> > >
> > > I see. I didn't want to have explicit null checks but then I realised
> > > connect does have explicit null
> > > checks. Edited.
> > >
> > >
> > > 7. Why disallow tombstone records? If an upstream resource disappears,
> > then
> > >> wouldn't a task want to emit a tombstone record without having to also
> > >> emit
> > >> an accompanying source record? This could help prevent an
> > >> infinitely-growing offsets topic, although with KIP-875 coming out in
> > the
> > >> next release, perhaps we can leave this out for now and let Connect
> > users
> > >> and cluster administrators do this work manually instead of letting
> > >> connector developers automate it.
> > >
> > >
> > >
> > > Even before I considered KIP-875's effects, my thought was to not
> meddle
> > > too much with the inner
> > > workings of the offsets topic. I think even today users can produce an
> > > offset record to the offsets
> > > topic to drop an unwanted partition but that should be used as a last
> > > resort. I didn't want to introduce
> > > any such mechanisms via this proposal. And with KIP-875 coming in, it
> > > makes all the more sense to not do
> > > it and have the offsets deleted in a more standardised way. The last
> part
> > > about KIP-875 is what I have mentioned
> > > in the KIP.
> > >
> > >
> > > 8. Is the information on multiple offsets topics for exactly-once
> > >> connectors relevant to this KIP? If not, we should remove it.
> > >
> > >
> > > Removed.
> > >
> > >
> > > 9. It seems like most of the use cases that motivate this KIP only
> > require
> > >> being able to add a new source partition/source offset pair to the
> > >> to-be-committed offsets. Do we need to allow connector developers to
> > >> modify
> > >> source offsets for already-present source partitions at all? If we
> > reduce
> > >> the surface of the API, then the worst case is still just that the
> > offsets
> > >> we commit are at most one commit out-of-date.
> > >
> > >
> > > It could be useful in a scenario where the offset of a partition
> doesn't
> > > update for some period of time. In
> > > such cases, the connector can do some kind of state tracking and update
> > > the offsets after the time period elapses.
> > >
> > > I had mentioned an example of this scenario in an earlier e-mail:
> > >
> > >
> > > There's also a case at times with CDC source connectors which are REST
> > Api
> > >> / Web Service based(Zendesk Source Connector for example) . These
> > >> connectors typically use timestamps from the responses as offsets. If
> > >> there's a long period of inactivity wherein the API invocations don't
> > >> return any data, then the offsets won't move and the connector would
> > keep
> > >> using the same timestamp that it received from the last non-empty
> > response.
> > >> If this period of inactivity keeps growing, and the API imposes any
> > limits
> > >> on how far back we can go in terms of window start, then this could
> > >> potentially be a problem. In this case even though the connector was
> > caught
> > >> up with all the responses, it may need to snapshot again. In this case
> > >> updating offsets can easily help since all the connector needs to do
> is
> > to
> > >> move the timestamp which would move the offset inherently.
> > >
> > >
> > >
> > >
> > > 10. (Nit) The "Motivation" section states that "offsets are written
> > >> periodically by the connect framework to an offsets topic". This is
> only
> > >> true in distributed mode; in standalone mode, we write offsets to a
> > local
> > >> file.
> > >
> > >
> > >
> > > Ack.
> > >
> > > On Wed, Jul 5, 2023 at 8:47 PM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >
> > >> Hi Sagar,
> > >>
> > >> Thanks for updating the KIP! The latest draft seems simpler and more
> > >> focused, which I think is a win for users and developers alike. Here
> are
> > >> my
> > >> thoughts on the current draft:
> > >>
> > >> 1. (Nit) Can we move the "Public Interfaces" section before the
> > "Proposed
> > >> Changes" section? It's nice to have a summary of the
> > user/developer-facing
> > >> changes first since that answers many of the questions that I had
> while
> > >> reading the "Proposed Changes" section. I'd bet that this is also why
> we
> > >> use that ordering in the KIP template.
> > >>
> > >> 2. Why are we invoking SourceTask::updateOffsets so frequently when
> > >> exactly-once support is disabled? Wouldn't it be simpler both for our
> > >> implementation and for connector developers if we only invoked it
> > directly
> > >> before committing offsets, instead of potentially several times
> between
> > >> offset commits, especially since that would also mirror the behavior
> > with
> > >> exactly-once support enabled?
> > >>
> > >> 3. Building off of point 2, we wouldn't need to specify any more
> detail
> > >> than that "SourceTask::updateOffsets will be invoked directly before
> > >> committing offsets, with the to-be-committed offsets". There would be
> no
> > >> need to distinguish between when exactly-once support is enabled or
> > >> disabled.
> > >>
> > >> 4. Some general stylistic feedback: we shouldn't mention the names of
> > > >> internal classes or methods in KIPs. KIPs are for discussing
> high-level
> > > >> design proposals. Internal names and APIs may change over time, and
> are
> > >> not
> > >> very helpful to readers who are not already familiar with the code
> base.
> > >> Instead, we should describe changes in behavior, not code.
> > >>
> > >> 5. Why return a complete map of to-be-committed offsets instead of a
> map
> > >> of
> > >> just the offsets that the connector wants to change? This seems
> > especially
> > >> intuitive since we automatically re-insert source partitions that have
> > >> been
> > >> removed by the connector.
> > >>
> > > >> 6. I don't think we need to return an Optional from
> > >> SourceTask::updateOffsets. Developers can return null instead of
> > >> Optional.empty(), and since the framework will have to handle null
> > return
> > >> values either way, this would reduce the number of cases for us to
> > handle
> > >> from three (Optional.of(...), Optional.empty(), null) to two (null,
> > >> non-null).
> > >>
> > >> 7. Why disallow tombstone records? If an upstream resource disappears,
> > >> then
> > >> wouldn't a task want to emit a tombstone record without having to also
> > >> emit
> > >> an accompanying source record? This could help prevent an
> > >> infinitely-growing offsets topic, although with KIP-875 coming out in
> > the
> > >> next release, perhaps we can leave this out for now and let Connect
> > users
> > >> and cluster administrators do this work manually instead of letting
> > >> connector developers automate it.
> > >>
> > >> 8. Is the information on multiple offsets topics for exactly-once
> > >> connectors relevant to this KIP? If not, we should remove it.
> > >>
> > >> 9. It seems like most of the use cases that motivate this KIP only
> > require
> > >> being able to add a new source partition/source offset pair to the
> > >> to-be-committed offsets. Do we need to allow connector developers to
> > >> modify
> > >> source offsets for already-present source partitions at all? If we
> > reduce
> > >> the surface of the API, then the worst case is still just that the
> > offsets
> > >> we commit are at most one commit out-of-date.
> > >>
> > >> 10. (Nit) The "Motivation" section states that "offsets are written
> > >> periodically by the connect framework to an offsets topic". This is
> only
> > >> true in distributed mode; in standalone mode, we write offsets to a
> > local
> > >> file.
> > >>
> > >> Cheers,
> > >>
> > >> Chris
> > >>
> > >> On Tue, Jul 4, 2023 at 8:42 AM Yash Mayya <yash.ma...@gmail.com>
> wrote:
> > >>
> > >> > Hi Sagar,
> > >> >
> > >> > Thanks for your continued work on this KIP! Here are my thoughts on
> > your
> > >> > updated proposal:
> > >> >
> > >> > 1) In the proposed changes section where you talk about modifying
> the
> > >> > offsets, could you please clarify that tasks shouldn't modify the
> > >> offsets
> > >> > map that is passed as an argument? Currently, the distinction
> between
> > >> the
> > >> > offsets map passed as an argument and the offsets map that is
> returned
> > >> is
> > >> > not very clear in numerous places.
> > >> >
> > >> > 2) The default return value of Optional.empty() seems to be fairly
> > >> > non-intuitive considering that the return value is supposed to be
> the
> > >> > offsets that are to be committed. Can we consider simply returning
> the
> > >> > offsets argument itself by default instead?
> > >> >
> > >> > 3) The KIP states that "It is also possible that a task might choose
> > to
> > >> > send a tombstone record as an offset. This is not recommended and to
> > >> > prevent connectors shooting themselves in the foot due to this" -
> > could
> > >> you
> > >> > please clarify why this is not recommended / supported?
> > >> >
> > >> > 4) The KIP states that "If a task returns an Optional of a null
> object
> > >> or
> > >> > an Optional of an empty map, even for such cases the behaviour would
> > >> would
> > >> > be disabled." - since this is an optional API that source task
> > >> > implementations don't necessarily need to implement, I don't think I
> > >> fully
> > >> > follow why the return type of the proposed "updateOffsets" method is
> > an
> > >> > Optional? Can we not simply use the Map as the return type instead?
> > >> >
> > >> > 5) The KIP states that "The offsets passed to the updateOffsets
> > method
> > >> > would be the offset from the latest source record amongst all source
> > >> > records per partition. This way, if the source offset for a given
> > source
> > >> > partition is updated, that offset is the one that gets committed for
> > the
> > >> > source partition." - we should clarify that the "latest" offset
> refers
> > >> to
> > >> > the offsets that are about to be committed, and not the latest
> offsets
> > >> > returned from SourceTask::poll so far (see related discussion in
> > >> > https://issues.apache.org/jira/browse/KAFKA-15091 and
> > >> > https://issues.apache.org/jira/browse/KAFKA-5716).
> > >> >
> > >> > 6) We haven't used the terminology of "Atleast Once Semantics"
> > >> elsewhere in
> > >> > Connect since the framework itself does not (and cannot) make any
> > >> > guarantees on the delivery semantics. Depending on the source
> > connector
> > >> and
> > >> > the source system, both at-least once and at-most once semantics
> (for
> > >> > example - a source system where reads are destructive) are possible.
> > We
> > >> > should avoid introducing this terminology in the KIP and instead
> refer
> > >> to
> > >> > this scenario as exactly-once support being disabled.
> > >> >
> > >> > 7) Similar to the above point, we should remove the use of the term
> > >> > "Exactly Once Semantics" and instead refer to exactly-once support
> > being
> > >> > enabled since the framework can't guarantee exactly-once semantics
> for
> > >> all
> > >> > possible source connectors (for example - a message queue source
> > >> connector
> > >> > where offsets are essentially managed in the source system via an
> ack
> > >> > mechanism).
> > >> >
> > >> > 8) In a previous attempt to fix this gap in functionality, a
> > significant
> > >> > concern was raised on offsets ordering guarantees when we retry
> > sending
> > >> a
> > >> > batch of records (ref -
> > >> > https://github.com/apache/kafka/pull/5553/files#r213329307). It
> > doesn't
> > >> > look like this KIP addresses that concern either? In the case where
> > >> > exactly-once support is disabled - if we update the
> committableOffsets
> > >> with
> > >> > the offsets provided by the task through the new updateOffsets
> method,
> > >> > these offsets could be committed before older "regular" offsets are
> > >> > committed due to producer retries which could then lead to an
> > >> inconsistency
> > >> > if the send operation eventually succeeds.
> > >> >
> > >> > 9) The KIP states that when exactly-once support is enabled, the new
> > >> > SourceTask::updateOffsets method will be invoked only when an offset
> > >> flush
> > >> > is attempted. If the connector is configured to use a connector
> > >> specified
> > >> > transaction boundary rather than a poll or interval based boundary,
> > >> isn't
> > >> > it possible that we don't call SourceTask::updateOffsets until there
> > are
> > >> > actual records that are also being returned through poll (which
> would
> > >> > defeat the primary motivation of the KIP)? Or are we making the
> > >> assumption
> > >> > that the connector defined transaction boundary should handle this
> > case
> > >> > appropriately if needed (i.e. source tasks should occasionally
> request
> > >> for
> > >> > a transaction commit via their transaction context if they want
> > offsets
> > >> to
> > >> > be committed without producing records)? If so, I think we should
> > >> > explicitly call that out in the KIP.
> > >> >
> > >> > 10) The Javadoc for SourceTask::updateOffsets in the section on
> public
> > >> > interfaces also has the same issue with the definition of latest
> > offsets
> > >> > that I've mentioned above (latest offsets from poll versus latest
> > >> offsets
> > >> > that are about to be committed).
> > >> >
> > >> > 11) The Javadoc for SourceTask::updateOffsets also introduces the
> same
> > >> > confusion w.r.t updating offsets that I've mentioned above
> (modifying
> > >> the
> > >> > offsets map argument versus returning a modified copy of the offsets
> > >> map).
> > >> >
> > >> > 12) In the section on compatibility, we should explicitly mention
> that
> > >> > connectors which implement the new method will still be compatible
> > with
> > >> > older Connect runtimes where the method will simply not be invoked.
> > >> >
> > >> >
> > >> > Thanks,
> > >> > Yash
> > >> >
> > >> > On Wed, Jun 21, 2023 at 10:25 PM Sagar <sagarmeansoc...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > Hi All,
> > >> > >
> > >> > > I have created this PR:
> https://github.com/apache/kafka/pull/13899
> > >> which
> > >> > > implements the approach outlined in the latest version of the
> KIP. I
> > >> > > thought I could use this to validate the approach based on my
> > >> > understanding
> > >> > > while the KIP itself gets reviewed. I can always change the
> > >> > implementation
> > >> > > once we move to a final decision on the KIP.
> > >> > >
> > >> > > Thanks!
> > >> > > Sagar.
> > >> > >
> > >> > >
> > >> > > On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com>
> > >> wrote:
> > >> > >
> > >> > > > Hey All,
> > >> > > >
> > >> > > > Bumping this discussion thread again to see how the modified KIP
> > >> looks
> > >> > > > like.
> > >> > > >
> > >> > > > Thanks!
> > >> > > > Sagar.
> > >> > > >
> > >> > > > On Mon, May 29, 2023 at 8:12 PM Sagar <
> sagarmeansoc...@gmail.com>
> > >> > wrote:
> > >> > > >
> > >> > > >> Hi,
> > >> > > >>
> > >> > > >> Bumping this thread again for further reviews.
> > >> > > >>
> > >> > > >> Thanks!
> > >> > > >> Sagar.
> > >> > > >>
> > >> > > >> On Fri, May 12, 2023 at 3:38 PM Sagar <
> sagarmeansoc...@gmail.com
> > >
> > >> > > wrote:
> > >> > > >>
> > >> > > >>> Hi All,
> > >> > > >>>
> > >> > > >>> Thanks for the comments/reviews. I have updated the KIP
> > >> > > >>>
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >> > > >>> with a newer approach which shelves the need for an explicit
> > >> topic.
> > >> > > >>>
> > >> > > >>> Please review again and let me know what you think.
> > >> > > >>>
> > >> > > >>> Thanks!
> > >> > > >>> Sagar.
> > >> > > >>>
> > >> > > >>>
> > >> > > >>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <
> > yash.ma...@gmail.com>
> > >> > > wrote:
> > >> > > >>>
> > >> > > >>>> Hi Sagar,
> > >> > > >>>>
> > >> > > >>>> Thanks for the KIP! I have a few questions and comments:
> > >> > > >>>>
> > >> > > >>>> 1) I agree with Chris' point about the separation of a
> > connector
> > >> > > >>>> heartbeat
> > >> > > >>>> mechanism and allowing source connectors to generate offsets
> > >> without
> > >> > > >>>> producing data. What is the purpose of the heartbeat topic
> here
> > >> and
> > >> > > are
> > >> > > >>>> there any concrete use cases for downstream consumers on this
> > >> topic?
> > >> > > Why
> > >> > > >>>> can't we instead simply introduce a mechanism to retrieve a
> > list
> > >> of
> > >> > > >>>> source
> > >> > > >>>> partition / source offset pairs from the source tasks?
> > >> > > >>>>
> > >> > > >>>> 2) With the currently described mechanism, the new
> > >> > > >>>> "SourceTask::produceHeartbeatRecords" method returns a
> > >> > > >>>> "List<SourceRecord>"
> > >> > > >>>> - what happens with the topic in each of these source
> records?
> > >> Chris
> > >> > > >>>> pointed this out above, but it doesn't seem to have been
> > >> addressed?
> > >> > > The
> > >> > > >>>> "SourceRecord" class also has a bunch of other fields which
> > will
> > >> be
> > >> > > >>>> irrelevant here (partition, key / value schema, key / value
> > data,
> > >> > > >>>> timestamp, headers). In fact, it seems like only the source
> > >> > partition
> > >> > > >>>> and
> > >> > > >>>> source offset are relevant here, so we should either
> introduce
> > a
> > >> new
> > >> > > >>>> abstraction or simply use a data structure like a mapping
> from
> > >> > source
> > >> > > >>>> partitions to source offsets (adds to the above point)?
> > >> > > >>>>
> > >> > > >>>> 3) I'm not sure I fully follow why the heartbeat timer /
> > >> interval is
> > >> > > >>>> needed? What are the downsides of
> > >> > > >>>> calling "SourceTask::produceHeartbeatRecords" in every
> > execution
> > >> > loop
> > >> > > >>>> (similar to the existing "SourceTask::poll" method)? Is this
> > >> only to
> > >> > > >>>> prevent the generation of a lot of offset records? Since
> > >> Connect's
> > >> > > >>>> offsets
> > >> > > >>>> topics are log compacted (and source partitions are used as
> > keys
> > >> for
> > >> > > >>>> each
> > >> > > >>>> source offset), I'm not sure if such concerns are valid and
> > such
> > >> a
> > >> > > >>>> heartbeat timer / interval mechanism is required?
> > >> > > >>>>
> > >> > > >>>> 4) The first couple of rejected alternatives state that the
> use
> > >> of a
> > >> > > >>>> null
> > >> > > >>>> topic / key / value are preferably avoided - but the current
> > >> > proposal
> > >> > > >>>> would
> > >> > > >>>> also likely require connectors to use such workarounds (null
> > >> topic
> > >> > > when
> > >> > > >>>> the
> > >> > > >>>> heartbeat topic is configured at a worker level and always
> for
> > >> the
> > >> > > key /
> > >> > > >>>> value)?
> > >> > > >>>>
> > >> > > >>>> 5) The third rejected alternative talks about subclassing the
> > >> > > >>>> "SourceRecord" class - this presumably means allowing
> > connectors
> > >> to
> > >> > > pass
> > >> > > >>>> special offset only records via the existing poll mechanism?
> > Why
> > >> was
> > >> > > >>>> this
> > >> > > >>>> considered a more invasive option? Was it because of the
> > backward
> > >> > > >>>> compatibility issues that would be introduced for plugins
> using
> > >> the
> > >> > > new
> > >> > > >>>> public API class that still need to be deployed onto older
> > >> Connect
> > >> > > >>>> workers?
> > >> > > >>>>
> > >> > > >>>> Thanks,
> > >> > > >>>> Yash
> > >> > > >>>>
> > >> > > >>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <
> > sagarmeansoc...@gmail.com
> > >> >
> > >> > > >>>> wrote:
> > >> > > >>>>
> > >> > > >>>> > One thing I forgot to mention in my previous email was that
> > the
> > >> > > >>>> reason I
> > >> > > >>>> > chose to include the opt-in behaviour via configs was that
> > the
> > >> > users
> > >> > > >>>> of the
> > >> > > >>>> > connector know their workload patterns. If the workload is
> > such
> > >> > that
> > >> > > >>>> the
> > >> > > >>>> >  connector would receive regular valid updates then there’s
> > >> > ideally
> > >> > > >>>> no need
> > >> > > >>>> > for moving offsets since it would update automatically.
> > >> > > >>>> >
> > >> > > >>>> > This way they aren’t forced to use this feature and can use
> > it
> > >> > only
> > >> > > >>>> when
> > >> > > >>>> > the workload is expected to be batchy or not frequent.
> > >> > > >>>> >
> > >> > > >>>> > Thanks!
> > >> > > >>>> > Sagar.
> > >> > > >>>> >
> > >> > > >>>> >
> > >> > > >>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar <
> > >> sagarmeansoc...@gmail.com>
> > >> > > >>>> wrote:
> > >> > > >>>> >
> > >> > > >>>> > > Hi Chris,
> > >> > > >>>> > >
> > >> > > >>>> > > Thanks for following up on the response. Sharing my
> > thoughts
> > >> > > >>>> further:
> > >> > > >>>> > >
> > >> > > >>>> > > If we want to add support for connectors to emit offsets
> > >> without
> > >> > > >>>> > >> accompanying source records, we could (and IMO should)
> do
> > >> that
> > >> > > >>>> without
> > >> > > >>>> > >> requiring users to manually enable that feature by
> > adjusting
> > >> > > >>>> worker or
> > >> > > >>>> > >> connector configurations.
> > >> > > >>>> > >
> > >> > > >>>> > >
> > >> > > >>>> > > With the current KIP design, I have tried to implement
> this
> > >> in
> > >> > an
> > >> > > >>>> opt-in
> > >> > > >>>> > > manner via configs. I guess what you are trying to say is
> > >> that
> > >> > > this
> > >> > > >>>> > doesn't
> > >> > > >>>> > > need a config of its own and instead could be part of
> the
> > >> poll
> > >> > ->
> > >> > > >>>> > > transform etc -> produce -> commit cycle. That way, the
> > users
> > >> > > don't
> > >> > > >>>> need
> > >> > > >>>> > to
> > >> > > >>>> > > set any config and if the connector supports moving
> offsets
> > >> w/o
> > >> > > >>>> producing
> > >> > > >>>> > > SourceRecords, it should happen automatically. Is that
> > >> correct?
> > >> > If
> > >> > > >>>> that
> > >> > > >>>> > > is the concern, then I can think of not exposing a config
> > and
> > >> > try
> > >> > > >>>> to make
> > >> > > >>>> > > this process automatic. That should ease the load on
> > >> > connector
> > >> > > >>>> users,
> > >> > > >>>> > > but your point about cognitive load on Connector
> > developers,
> > >> I
> > >> > am
> > >> > > >>>> still
> > >> > > >>>> > not
> > >> > > >>>> > > sure how to address that. The offsets are privy to a
> > >> connector
> > >> > and
> > >> > > >>>> the
> > >> > > >>>> > > framework at best can provide hooks to the tasks to
> update
> > >> their
> > >> > > >>>> offsets.
> > >> > > >>>> > > Connector developers would still have to consider all
> cases
> > >> > before
> > >> > > >>>> > updating
> > >> > > >>>> > > offsets.  And if I ignore the heartbeat topic and
> heartbeat
> > >> > > >>>> interval ms
> > >> > > >>>> > > configs, then what the KIP proposes currently isn't much
> > >> > different
> > >> > > >>>> in
> > >> > > >>>> > that
> > >> > > >>>> > > regard. Just that it produces a List of SourceRecord
> which
> > >> can
> > >> > be
> > >> > > >>>> changed
> > >> > > >>>> > > to a Map of SourcePartition and their offsets if you
> think
> > >> that
> > >> > > >>>> would
> > >> > > >>>> > > simplify things. Are there other cases in your mind which
> > >> need
> > >> > > >>>> > addressing?
> > >> > > >>>> > >
> > >> > > >>>> > > Here's my take on the use cases:
> > >> > > >>>> > >
> > >> > > >>>> > >    1. Regarding the example about SMTs with Object
> Storage
> > >> based
> > >> > > >>>> > >    connectors, it was one of the scenarios identified. We
> > >> have
> > >> > > some
> > >> > > >>>> > connectors
> > >> > > >>>> > >    that rely on the offsets topic to check if the next
> > batch
> > >> of
> > >> > > >>>> files
> > >> > > >>>> > should
> > >> > > >>>> > >    be processed and because of filtering of the last
> record
> > >> from
> > >> > > the
> > >> > > >>>> > files,
> > >> > > >>>> > >    the EOF supposedly is never reached and the connector
> > >> can't
> > >> > > >>>> commit
> > >> > > >>>> > offsets
> > >> > > >>>> > >    for that source partition (file). If there was a
> > mechanism
> > >> to
> > >> > > >>>> update
> > >> > > >>>> > offsets
> > >> > > >>>> > >    for such a source file, then with some moderately
> > complex
> > >> > state
> > >> > > >>>> > tracking,
> > >> > > >>>> > >    the connector can mark that file as processed and
> > proceed.
> > >> > > >>>> > >    2. There's another use case with the same class of
> > >> connectors
> > >> > > >>>> where if
> > >> > > >>>> > >    a file is malformed, then the connector couldn't
> produce
> > >> any
> > >> > > >>>> offsets
> > >> > > >>>> > >    because the file couldn't get processed completely. To
> > >> handle
> > >> > > >>>> such
> > >> > > >>>> > cases,
> > >> > > >>>> > >    the connector developers have introduced a dev/null
> sort
> > >> of
> > >> > > topic
> > >> > > >>>> > where
> > >> > > >>>> > >    they produce a record to this corrupted file topic and
> > >> move
> > >> > the
> > >> > > >>>> offset
> > >> > > >>>> > >    somehow. This topic ideally isn't needed, and a
> > >> mechanism
> > >> > > to
> > >> > > >>>> > update
> > >> > > >>>> > >    offsets would have helped in this case as well.
> > >> > > >>>> > >    3. Coming to CDC based connectors,
> > >> > > >>>> > >       1. We had a similar issue with Oracle CDC source
> > >> connector
> > >> > > and
> > >> > > >>>> > >       needed to employ the same heartbeat mechanism to
> get
> > >> > around
> > >> > > >>>> it.
> > >> > > >>>> > >       2. MongoDB CDC source Connector has employed the
> > same
> > >> > > >>>> heartbeat
> > >> > > >>>> > >       mechanism. Check `heartbeat.interval.ms` here (
> > >> > > >>>> > >
> > >> > > >>>> >
> > >> > > >>>>
> > >> > >
> > >> >
> > >>
> >
> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
> > >> > > >>>> > >       ).
> > >> > > >>>> > >       3. Another CDC connector for ScyllaDB employs a
> > similar
> > >> > > >>>> mechanism.
> > >> > > >>>> > >
> > >> > > >>>> >
> > >> > > >>>>
> > >> > >
> > >> >
> > >>
> >
> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
> > >> > > >>>> > >       4. For CDC based connectors, you could argue that
> > these
> > >> > > >>>> connectors
> > >> > > >>>> > >       have been able to solve this error then why do we
> > need
> > >> > > >>>> framework
> > >> > > >>>> > level
> > >> > > >>>> > >       support. But the point I am trying to make is that
> > this
> > >> > > >>>> limitation
> > >> > > >>>> > from the
> > >> > > >>>> > >       framework is forcing CDC connector developers to
> > >> implement
> > >> > > >>>> > per-connector
> > >> > > >>>> > >       solutions/hacks (at times). And there could always
> be
> > >> more
> > >> > > CDC
> > >> > > >>>> > connectors in
> > >> > > >>>> > >       the pipeline forcing them to take a similar route
> as
> > >> well.
> > >> > > >>>> > >    4. There's also a case at times with CDC source
> > connectors
> > >> > > which
> > >> > > >>>> are
> > >> > > >>>> > >    REST API / Web Service based (Zendesk Source Connector
> > for
> > >> > > >>>> example).
> > >> > > >>>> > These
> > >> > > >>>> > >    connectors typically use timestamps from the responses
> > as
> > >> > > >>>> offsets. If
> > >> > > >>>> > >    there's a long period of inactivity wherein the API
> > >> > invocations
> > >> > > >>>> don't
> > >> > > >>>> > >    return any data, then the offsets won't move and the
> > >> > connector
> > >> > > >>>> would
> > >> > > >>>> > keep
> > >> > > >>>> > >    using the same timestamp that it received from the
> last
> > >> > > non-empty
> > >> > > >>>> > response.
> > >> > > >>>> > >    If this period of inactivity keeps growing, and the
> API
> > >> > imposes
> > >> > > >>>> any
> > >> > > >>>> > limits
> > >> > > >>>> > >    on how far back we can go in terms of window start,
> then
> > >> this
> > >> > > >>>> could
> > >> > > >>>> > >    potentially be a problem. In this case even though the
> > >> > > connector
> > >> > > >>>> was
> > >> > > >>>> > caught
> > >> > > >>>> > >    up with all the responses, it may need to snapshot
> > again.
> > >> In
> > >> > > >>>> this case
> > >> > > >>>> > >    updating offsets can easily help since all the
> connector
> > >> > needs
> > >> > > >>>> to do
> > >> > > >>>> > is to
> > >> > > >>>> > >    move the timestamp which would move the offset
> > inherently.
> > >> > > >>>> > >
> > >> > > >>>> > > I still believe that this is something the framework
> should
> > >> > > support
> > >> > > >>>> OOB
> > >> > > >>>> > > irrespective of whether the connectors have been able to
> > get
> > >> > > around
> > >> > > >>>> this
> > >> > > >>>> > > restriction or not.
> > >> > > >>>> > >
> > >> > > >>>> > > Lastly, about your comments here:
> > >> > > >>>> > >
> > >> > > >>>> > > I'm also not sure that it's worth preserving the current
> > >> > behavior
> > >> > > >>>> that
> > >> > > >>>> > >> offsets for records that have been filtered out via SMT
> > are
> > >> not
> > >> > > >>>> > committed.
> > >> > > >>>> > >
> > >> > > >>>> > >
> > >> > > >>>> > > Let me know if we need a separate JIRA to track this?
> This
> > >> > somehow
> > >> > > >>>> didn't
> > >> > > >>>> > > look related to this discussion.
> > >> > > >>>> > >
> > >> > > >>>> > > Thanks!
> > >> > > >>>> > > Sagar.
> > >> > > >>>> > >
> > >> > > >>>> > >
> > >> > > >>>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton
> > >> > > >>>> <chr...@aiven.io.invalid>
> > >> > > >>>> > > wrote:
> > >> > > >>>> > >
> > >> > > >>>> > >> Hi Sagar,
> > >> > > >>>> > >>
> > >> > > >>>> > >> I'm sorry, I'm still not convinced that this design
> solves
> > >> the
> > >> > > >>>> > problem(s)
> > >> > > >>>> > >> it sets out to solve in the best way possible. I tried
> to
> > >> > > >>>> highlight this
> > >> > > >>>> > >> in
> > >> > > >>>> > >> my last email:
> > >> > > >>>> > >>
> > >> > > >>>> > >> > In general, it seems like we're trying to solve two
> > >> > completely
> > >> > > >>>> > different
> > >> > > >>>> > >> problems with this single KIP: adding framework-level
> > >> support
> > >> > for
> > >> > > >>>> > emitting
> > >> > > >>>> > >> heartbeat records for source connectors, and allowing
> > source
> > >> > > >>>> connectors
> > >> > > >>>> > to
> > >> > > >>>> > >> emit offsets without also emitting source records. I
> don't
> > >> mind
> > >> > > >>>> > addressing
> > >> > > >>>> > >> the two at the same time if the result is elegant and
> > >> doesn't
> > >> > > >>>> compromise
> > >> > > >>>> > >> on
> > >> > > >>>> > >> the solution for either problem, but that doesn't seem
> to
> > be
> > >> > the
> > >> > > >>>> case
> > >> > > >>>> > >> here.
> > >> > > >>>> > >> Of the two problems, could we describe one as the
> primary
> > >> and
> > >> > one
> > >> > > >>>> as the
> > >> > > >>>> > >> secondary? If so, we might consider dropping the
> secondary
> > >> > > problem
> > >> > > >>>> from
> > >> > > >>>> > >> this KIP and addressing it separately.
> > >> > > >>>> > >>
> > >> > > >>>> > >> If we wanted to add support for heartbeat records, we
> > could
> > >> > (and
> > >> > > >>>> IMO
> > >> > > >>>> > >> should) do that without requiring connectors to
> implement
> > >> any
> > >> > new
> > >> > > >>>> > methods
> > >> > > >>>> > >> and only require adjustments to worker or connector
> > >> > > configurations
> > >> > > >>>> by
> > >> > > >>>> > >> users
> > >> > > >>>> > >> in order to enable that feature.
> > >> > > >>>> > >>
> > >> > > >>>> > >> If we want to add support for connectors to emit offsets
> > >> > without
> > >> > > >>>> > >> accompanying source records, we could (and IMO should)
> do
> > >> that
> > >> > > >>>> without
> > >> > > >>>> > >> requiring users to manually enable that feature by
> > adjusting
> > >> > > >>>> worker or
> > >> > > >>>> > >> connector configurations.
> > >> > > >>>> > >>
> > >> > > >>>> > >>
> > >> > > >>>> > >> I'm also not sure that it's worth preserving the current
> > >> > behavior
> > >> > > >>>> that
> > >> > > >>>> > >> offsets for records that have been filtered out via SMT
> > are
> > >> not
> > >> > > >>>> > committed.
> > >> > > >>>> > >> I can't think of a case where this would be useful and
> > there
> > >> > are
> > >> > > >>>> > obviously
> > >> > > >>>> > >> plenty where it isn't. There's also a slight discrepancy
> > in
> > >> how
> > >> > > >>>> these
> > >> > > >>>> > >> kinds
> > >> > > >>>> > >> of records are treated by the Connect runtime now; if a
> > >> record
> > >> > is
> > >> > > >>>> > dropped
> > >> > > >>>> > >> because of an SMT, then its offset isn't committed, but
> if
> > >> it's
> > >> > > >>>> dropped
> > >> > > >>>> > >> because exactly-once support is enabled and the
> connector
> > >> chose
> > >> > > to
> > >> > > >>>> abort
> > >> > > >>>> > >> the batch containing the record, then its offset is
> still
> > >> > > >>>> committed.
> > >> > > >>>> > After
> > >> > > >>>> > >> thinking carefully about the aborted transaction
> behavior,
> > >> we
> > >> > > >>>> realized
> > >> > > >>>> > >> that
> > >> > > >>>> > >> it was fine to commit the offsets for those records,
> and I
> > >> > > believe
> > >> > > >>>> that
> > >> > > >>>> > >> the
> > >> > > >>>> > >> same logic can be applied to any record that we're done
> > >> trying
> > >> > to
> > >> > > >>>> send
> > >> > > >>>> > to
> > >> > > >>>> > >> Kafka (regardless of whether it was sent correctly,
> > dropped
> > >> due
> > >> > > to
> > >> > > >>>> > >> producer
> > >> > > >>>> > >> error, filtered via SMT, etc.).
> > >> > > >>>> > >>
> > >> > > >>>> > >> I also find the file-based source connector example a
> > little
> > >> > > >>>> confusing.
> > >> > > >>>> > >> What about that kind of connector causes the offset for
> > the
> > >> > last
> > >> > > >>>> record
> > >> > > >>>> > of
> > >> > > >>>> > >> a file to be treated differently? Is there anything
> > >> different
> > >> > > about
> > >> > > >>>> > >> filtering that record via SMT vs. dropping it altogether
> > >> > because
> > >> > > >>>> of an
> > >> > > >>>> > >> asynchronous producer error with "errors.tolerance" set
> to
> > >> > "all"?
> > >> > > >>>> And
> > >> > > >>>> > >> finally, how would such a connector use the design
> > proposed
> > >> > here?
> > >> > > >>>> > >>
> > >> > > >>>> > >> Finally, I don't disagree that if there are other
> > legitimate
> > >> > use
> > >> > > >>>> cases
> > >> > > >>>> > >> that
> > >> > > >>>> > >> would be helped by addressing KAFKA-3821, we should try
> to
> > >> > solve
> > >> > > >>>> that
> > >> > > >>>> > >> issue
> > >> > > >>>> > >> in the Kafka Connect framework instead of requiring
> > >> individual
> > >> > > >>>> > connectors
> > >> > > >>>> > >> to implement their own solutions. But the cognitive load
> > >> added
> > >> > by
> > >> > > >>>> the
> > >> > > >>>> > >> design proposed here, for connector developers and
> Connect
> > >> > > cluster
> > >> > > >>>> > >> administrators alike, costs too much to justify by
> > pointing
> > >> to
> > >> > an
> > >> > > >>>> > >> already-solved problem encountered by a single group of
> > >> > > connectors
> > >> > > >>>> > (i.e.,
> > >> > > >>>> > >> Debezium). This is why I think it's crucial that we
> > identify
> > >> > > >>>> realistic
> > >> > > >>>> > >> cases where this feature would actually be useful, and
> > right
> > >> > > now, I
> > >> > > >>>> > don't
> > >> > > >>>> > >> think any have been provided (at least, not ones that
> have
> > >> > > already
> > >> > > >>>> been
> > >> > > >>>> > >> addressed or could be addressed with much simpler
> > changes).
> > >> > > >>>> > >>
> > >> > > >>>> > >> Cheers,
> > >> > > >>>> > >>
> > >> > > >>>> > >> Chris
> > >> > > >>>> > >>
> > >> > > >>>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar <
> > >> > sagarmeansoc...@gmail.com
> > >> > > >
> > >> > > >>>> > wrote:
> > >> > > >>>> > >>
> > >> > > >>>> > >> > Hi Chris,
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > Thanks for your detailed feedback!
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > nits: I have taken care of them now. Thanks for
> pointing
> > >> > those
> > >> > > >>>> out.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > non-nits:
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > 6) It seems (based on both the KIP and discussion on
> > >> > > KAFKA-3821)
> > >> > > >>>> that
> > >> > > >>>> > >> the
> > >> > > >>>> > >> > > only use case for being able to emit offsets without
> > >> also
> > >> > > >>>> emitting
> > >> > > >>>> > >> source
> > >> > > >>>> > >> > > records that's been identified so far is for CDC
> > source
> > >> > > >>>> connectors
> > >> > > >>>> > >> like
> > >> > > >>>> > >> > > Debezium.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > I am aware of at least one more case where the non
> > >> production
> > >> > of
> > >> > > >>>> > offsets
> > >> > > >>>> > >> > (due to non-production of records) leads to the
> failure
> > >> of
> > >> > > >>>> connectors
> > >> > > >>>> > >> when
> > >> > > >>>> > >> > the source purges the records of interest. This
> happens
> > in
> > >> > File
> > >> > > >>>> based
> > >> > > >>>> > >> > source connectors (like S3/blob storage) in which, if
> > the
> > >> > last
> > >> > > >>>> record
> > >> > > >>>> > >> from
> > >> > > >>>> > >> > a file is filtered due to an SMT, then that
> particular
> > >> file
> > >> > is
> > >> > > >>>> never
> > >> > > >>>> > >> > committed to the source partition and eventually when
> > the
> > >> > file
> > >> > > is
> > >> > > >>>> > >> deleted
> > >> > > >>>> > >> > from the source and the connector is restarted due to
> > some
> > >> > > >>>> reason, it
> > >> > > >>>> > >> > fails.
> > >> > > >>>> > >> > Moreover, I feel the reason this support should be
> there
> > >> in
> > >> > the
> > >> > > >>>> Kafka
> > >> > > >>>> > >> > Connect framework is because this is a restriction of
> > the
> > >> > > >>>> framework
> > >> > > >>>> > and
> > >> > > >>>> > >> > today the framework provides no support for getting
> > around
> > >> > this
> > >> > > >>>> > >> limitation.
> > >> > > >>>> > >> > Every connector has its own way of handling offsets
> and
> > >> > having
> > >> > > >>>> each
> > >> > > >>>> > >> > connector handle this restriction in its own way can
> > make
> > >> it
> > >> > > >>>> complex.
> > >> > > >>>> > >> > Whether we choose to do it the way this KIP prescribes
> > or
> > >> any
> > >> > > >>>> other
> > >> > > >>>> > way
> > >> > > >>>> > >> is
> > >> > > >>>> > >> > up for debate but IMHO, the framework should provide a
> > >> way of
> > >> > > >>>> > >> > getting around this limitation.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > 7. If a task produces heartbeat records and source
> > records
> > >> > that
> > >> > > >>>> use
> > >> > > >>>> > the
> > >> > > >>>> > >> > > same source partition, which offset will ultimately
> be
> > >> > > >>>> committed?
> > >> > > >>>> > >> >
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > The idea is to add the records returned by the
> > >> > > >>>> > `produceHeartbeatRecords`
> > >> > > >>>> > >> > to the same `toSend` list within
> > >> > > >>>> `AbstractWorkerSourceTask#execute`.
> > >> > > >>>> > >> The
> > >> > > >>>> > >> > `produceHeartbeatRecords` would be invoked before we
> > make
> > >> the
> > >> > > >>>> `poll`
> > >> > > >>>> > >> call.
> > >> > > >>>> > >> > Hence, the offsets committed would be in the same
> order
> > in
> > >> > > which
> > >> > > >>>> they
> > >> > > >>>> > >> would
> > >> > > >>>> > >> > be written. Note that, the onus is on the Connector
> > >> > > >>>> implementation to
> > >> > > >>>> > >> not
> > >> > > >>>> > >> > return records which can lead to data loss or data
> going
> > >> out
> > >> > of
> > >> > > >>>> order.
> > >> > > >>>> > >> The
> > >> > > >>>> > >> > framework would just commit based on whatever is
> > supplied.
> > >> > > Also,
> > >> > > >>>> > AFAIK,
> > >> > > >>>> > >> 2
> > >> > > >>>> > >> > `normal` source records can also produce the same
> source
> > >> > > >>>> partitions
> > >> > > >>>> > and
> > >> > > >>>> > >> > they are committed in the order in which they are
> > written.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > 8. The SourceTask::produceHeartbeatRecords method
> > returns
> > >> a
> > >> > > >>>> > >> > > List<SourceRecord>, and users can control the
> > heartbeat
> > >> > topic
> > >> > > >>>> for a
> > >> > > >>>> > >> > > connector via the (connector- or worker-level)
> > >> > > >>>> > >> "heartbeat.records.topic"
> > >> > > >>>> > >> > > property. Since every constructor for the
> SourceRecord
> > >> > class
> > >> > > >>>> [2]
> > >> > > >>>> > >> > requires a
> > >> > > >>>> > >> > > topic to be supplied, what will happen to that
> topic?
> > >> Will
> > >> > it
> > >> > > >>>> be
> > >> > > >>>> > >> ignored?
> > >> > > >>>> > >> > > If so, I think we should look for a cleaner
> solution.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > Sorry, I couldn't quite follow which topic will be
> > >> ignored in
> > >> > > >>>> this
> > >> > > >>>> > case.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > 9. A large concern raised in the discussion for
> > KAFKA-3821
> > >> > was
> > >> > > >>>> the
> > >> > > >>>> > >> allowing
> > >> > > >>>> > >> > > connectors to control the ordering of these special
> > >> > > >>>> "offsets-only"
> > >> > > >>>> > >> > > emissions and the regular source records returned
> from
> > >> > > >>>> > >> SourceTask::poll.
> > >> > > >>>> > >> > > Are we choosing to ignore that concern? If so, can
> you
> > >> add
> > >> > > >>>> this to
> > >> > > >>>> > the
> > >> > > >>>> > >> > > rejected alternatives section along with a
> rationale?
> > >> > > >>>> > >> >
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > One thing to note is that for every connector, the
> > >> > > condition
> > >> > > >>>> to
> > >> > > >>>> > emit
> > >> > > >>>> > >> > the heartbeat record is totally up to the connector.
> For
> > >> > > >>>> example, for
> > >> > > >>>> > a
> > >> > > >>>> > >> > connector which is tracking transactions for an
> ordered
> > >> log,
> > >> > if
> > >> > > >>>> there
> > >> > > >>>> > >> are
> > >> > > >>>> > >> > open transactions, it might not need to emit heartbeat
> > >> > records
> > >> > > >>>> when
> > >> > > >>>> > the
> > >> > > >>>> > >> > timer expires while for file based connectors, if the
> > same
> > >> > file
> > >> > > >>>> is
> > >> > > >>>> > being
> > >> > > >>>> > >> > processed again and again due to an SMT or some other
> > >> > reasons,
> > >> > > >>>> then it
> > >> > > >>>> > >> can
> > >> > > >>>> > >> > choose to emit that partition. The uber point here is
> > that
> > >> > > every
> > >> > > >>>> > >> connector
> > >> > > >>>> > >> > has its own requirements and the framework can't
> really
> > >> make
> > >> > > an
> > >> > > >>>> > >> assumption
> > >> > > >>>> > >> > about it. What the KIP is trying to do is to provide a
> > >> > > mechanism
> > >> > > >>>> to
> > >> > > >>>> > the
> > >> > > >>>> > >> > connector to commit new offsets. With this approach,
> as
> > >> far
> > >> > as
> > >> > > I
> > >> > > >>>> can
> > >> > > >>>> > >> think
> > >> > > >>>> > >> > so far, there doesn't seem to be a case of out of
> order
> > >> > > >>>> processing. If
> > >> > > >>>> > >> you
> > >> > > >>>> > >> > have other concerns/thoughts I would be happy to know
> > >> them.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > 10. If, sometime in the future, we wanted to add
> > >> > > framework-level
> > >> > > >>>> > support
> > >> > > >>>> > >> > > for sending heartbeat records that doesn't require
> > >> > connectors
> > >> > > >>>> to
> > >> > > >>>> > >> > implement
> > >> > > >>>> > >> > > any new APIs...
> > >> > > >>>> > >> >
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > The main purpose of producing heartbeat records is to
> be
> > >> able
> > >> > > to
> > >> > > >>>> emit
> > >> > > >>>> > >> > offsets w/o any new records. We are using heartbeat
> > >> records
> > >> > to
> > >> > > >>>> solve
> > >> > > >>>> > the
> > >> > > >>>> > >> > primary concern of offsets getting stalled. The reason
> > to
> > >> do
> > >> > > >>>> that was
> > >> > > >>>> > >> once
> > >> > > >>>> > >> > we get SourceRecords, then the rest of the code is
> > >> already in
> > >> > > >>>> place to
> > >> > > >>>> > >> > write it to a topic of interest and commit offsets and
> > >> that
> > >> > > >>>> seemed the
> > >> > > >>>> > >> most
> > >> > > >>>> > >> > non invasive in terms of framework level changes. If
> in
> > >> the
> > >> > > >>>> future we
> > >> > > >>>> > >> want
> > >> > > >>>> > >> > to do a framework-only heartbeat record support, then
> > this
> > >> > > would
> > >> > > >>>> > create
> > >> > > >>>> > >> > confusion as you pointed out. Do you think the choice
> of
> > >> the
> > >> > > name
> > >> > > >>>> > >> heartbeat
> > >> > > >>>> > >> > records is creating confusion in this case? Maybe we
> can
> > >> call
> > >> > > >>>> these
> > >> > > >>>> > >> special
> > >> > > >>>> > >> > records something else (not sure what at this point)
> > which
> > >> > > would
> > >> > > >>>> then
> > >> > > >>>> > >> > decouple the 2 logically and implementation wise as
> > well?
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > Thanks!
> > >> > > >>>> > >> > Sagar.
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton
> > >> > > >>>> <chr...@aiven.io.invalid
> > >> > > >>>> > >
> > >> > > >>>> > >> > wrote:
> > >> > > >>>> > >> >
> > >> > > >>>> > >> > > Hi Sagar,
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > Thanks for the KIP! I have some thoughts.
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > Nits:
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira
> > >> ticket on
> > >> > > >>>> the KIP?
> > >> > > >>>> > >> Or
> > >> > > >>>> > >> > is
> > >> > > >>>> > >> > > there a different ticket that should be associated
> > with
> > >> it?
> > >> > > >>>> > >> > > 2. The current state is listed as "Draft".
> Considering
> > >> it's
> > >> > > >>>> been
> > >> > > >>>> > >> brought
> > >> > > >>>> > >> > up
> > >> > > >>>> > >> > > for discussion, maybe the KIP should be updated to
> > >> > > >>>> "Discussion"?
> > >> > > >>>> > >> > > 3. Can you add a link for the discussion thread to
> the
> > >> KIP?
> > >> > > >>>> > >> > > 4. The KIP states that "In this process, offsets are
> > >> > written
> > >> > > at
> > >> > > >>>> > >> regular
> > >> > > >>>> > >> > > intervals(driven by `offset.flush.interval.ms`)".
> > This
> > >> > isn't
> > >> > > >>>> > strictly
> > >> > > >>>> > >> > > accurate since, when exactly-once support is
> enabled,
> > >> > offset
> > >> > > >>>> commits
> > >> > > >>>> > >> can
> > >> > > >>>> > >> > > also be performed for each record batch (which is
> the
> > >> > > default)
> > >> > > >>>> or
> > >> > > >>>> > when
> > >> > > >>>> > >> > > explicitly requested by the task instance (if the
> > >> connector
> > >> > > >>>> > implements
> > >> > > >>>> > >> > the
> > >> > > >>>> > >> > > API to define its own transactions and the user has
> > >> > > configured
> > >> > > >>>> it to
> > >> > > >>>> > >> do
> > >> > > >>>> > >> > > so). Maybe better to just say "Offsets are written
> > >> > > >>>> periodically"?
> > >> > > >>>> > >> > > 5. The description for the (per-connector)
> > >> > > >>>> "heartbeat.records.topic
> > >> > > >>>> > "
> > >> > > >>>> > >> > > property states that it is "Only applicable in
> > >> distributed
> > >> > > >>>> mode; in
> > >> > > >>>> > >> > > standalone mode, setting this property will have no
> > >> > effect".
> > >> > > >>>> Is this
> > >> > > >>>> > >> > > correct?
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > Non-nits:
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > 6. It seems (based on both the KIP and discussion on
> > >> > > >>>> > >> > > KAFKA-3821) that the only use case for being able
> > >> > > >>>> > >> > > to emit offsets without also emitting source
> > >> > > >>>> > >> > > records that's been identified so far is for CDC
> > >> > > >>>> > >> > > source connectors like Debezium. But Debezium
> > >> > > >>>> > >> > > already has support for this exact feature
> > >> > > >>>> > >> > > (emitting heartbeat records that include offsets
> > >> > > >>>> > >> > > that cannot be associated with other, "regular"
> > >> > > >>>> > >> > > source records). Why should we add this feature to
> > >> > > >>>> > >> > > Kafka Connect when the problem it addresses is
> > >> > > >>>> > >> > > already solved in the set of connectors that (it
> > >> > > >>>> > >> > > seems) would have any need for it, and the size of
> > >> > > >>>> > >> > > that set is extremely small? If there are other
> > >> > > >>>> > >> > > practical use cases for connectors that would
> > >> > > >>>> > >> > > benefit from this feature, please let me know.
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > 7. If a task produces heartbeat records and source
> > >> > > >>>> > >> > > records that use the same source partition, which
> > >> > > >>>> > >> > > offset will ultimately be committed?
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > 8. The SourceTask::produceHeartbeatRecords method
> > >> > > >>>> > >> > > returns a List<SourceRecord>, and users can control
> > >> > > >>>> > >> > > the heartbeat topic for a connector via the
> > >> > > >>>> > >> > > (connector- or worker-level)
> > >> > > >>>> > >> > > "heartbeat.records.topic" property. Since every
> > >> > > >>>> > >> > > constructor for the SourceRecord class [2] requires
> > >> > > >>>> > >> > > a topic to be supplied, what will happen to that
> > >> > > >>>> > >> > > topic? Will it be ignored? If so, I think we should
> > >> > > >>>> > >> > > look for a cleaner solution.
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > 9. A large concern raised in the discussion for
> > >> > > >>>> > >> > > KAFKA-3821 was allowing connectors to control the
> > >> > > >>>> > >> > > ordering of these special "offsets-only" emissions
> > >> > > >>>> > >> > > and the regular source records returned from
> > >> > > >>>> > >> > > SourceTask::poll. Are we choosing to ignore that
> > >> > > >>>> > >> > > concern? If so, can you add this to the rejected
> > >> > > >>>> > >> > > alternatives section along with a rationale?
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > 10. If, sometime in the future, we wanted to add
> > >> > > >>>> > >> > > framework-level support for sending heartbeat
> > >> > > >>>> > >> > > records that doesn't require connectors to
> > >> > > >>>> > >> > > implement any new APIs (e.g.,
> > >> > > >>>> > >> > > SourceTask::produceHeartbeatRecords), a lot of this
> > >> > > >>>> > >> > > would paint us into a corner design-wise. We'd have
> > >> > > >>>> > >> > > to think carefully about which property names would
> > >> > > >>>> > >> > > be used, how to account for connectors that have
> > >> > > >>>> > >> > > already implemented the
> > >> > > >>>> > >> > > SourceTask::produceHeartbeatRecords method, etc. In
> > >> > > >>>> > >> > > general, it seems like we're trying to solve two
> > >> > > >>>> > >> > > completely different problems with this single KIP:
> > >> > > >>>> > >> > > adding framework-level support for emitting
> > >> > > >>>> > >> > > heartbeat records for source connectors, and
> > >> > > >>>> > >> > > allowing source connectors to emit offsets without
> > >> > > >>>> > >> > > also emitting source records. I don't mind
> > >> > > >>>> > >> > > addressing the two at the same time if the result
> > >> > > >>>> > >> > > is elegant and doesn't compromise on the solution
> > >> > > >>>> > >> > > for either problem, but that doesn't seem to be the
> > >> > > >>>> > >> > > case here. Of the two problems, could we describe
> > >> > > >>>> > >> > > one as the primary and one as the secondary? If so,
> > >> > > >>>> > >> > > we might consider dropping the secondary problem
> > >> > > >>>> > >> > > from this KIP and addressing it separately.
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
> > >> > > >>>> > >> > > [2] - https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > Cheers,
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > Chris
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> > > > Hi John,
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > Thanks for taking a look at the KIP!
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > The point about stream time not advancing in case
> > >> > > >>>> > >> > > > of infrequent updates is an interesting one. I can
> > >> > > >>>> > >> > > > imagine that if the upstream producer to a Kafka
> > >> > > >>>> > >> > > > Streams application is a source connector which
> > >> > > >>>> > >> > > > isn't sending records frequently (due to the
> > >> > > >>>> > >> > > > nature of the data ingestion, for example), then
> > >> > > >>>> > >> > > > the downstream stream processing can run into the
> > >> > > >>>> > >> > > > issues you described above.
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > Which also brings me to the second point you made
> > >> > > >>>> > >> > > > about how this would be used by downstream
> > >> > > >>>> > >> > > > consumers. IIUC, you are referring to the
> > >> > > >>>> > >> > > > consumers of the newly added topic, i.e., the
> > >> > > >>>> > >> > > > heartbeat topic. In my mind, the heartbeat topic
> > >> > > >>>> > >> > > > is an internal topic (similar to the
> > >> > > >>>> > >> > > > offsets/config/status topics in Connect), the main
> > >> > > >>>> > >> > > > purpose of which is to trick the framework into
> > >> > > >>>> > >> > > > producing records to the offsets topic and
> > >> > > >>>> > >> > > > advancing the offsets. Since every connector could
> > >> > > >>>> > >> > > > have a different definition of offsets (LSN,
> > >> > > >>>> > >> > > > BinLogID, etc., for example), the logic to
> > >> > > >>>> > >> > > > determine what the heartbeat records should be
> > >> > > >>>> > >> > > > would have to reside in the actual connector.
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > Now that I think of it, it could very well be
> > >> > > >>>> > >> > > > consumed by downstream consumers / Streams or
> > >> > > >>>> > >> > > > Flink applications and be further used for some
> > >> > > >>>> > >> > > > decision making. A very crude example: if the
> > >> > > >>>> > >> > > > heartbeat records sent to the new heartbeat topic
> > >> > > >>>> > >> > > > include timestamps, then the downstream Streams
> > >> > > >>>> > >> > > > application can use that timestamp to close any
> > >> > > >>>> > >> > > > time windows. Having said that, it still appears
> > >> > > >>>> > >> > > > to me that it's outside the scope of the Connect
> > >> > > >>>> > >> > > > framework and is something which is difficult to
> > >> > > >>>> > >> > > > generalise because of the variety of sources and
> > >> > > >>>> > >> > > > the definitions of offsets.
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > But, I would still be more than happy to add this
> > >> > > >>>> > >> > > > example if you think it can be useful in getting a
> > >> > > >>>> > >> > > > better understanding of the idea and also its
> > >> > > >>>> > >> > > > utility beyond Connect. Please let me know!
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > Thanks!
> > >> > > >>>> > >> > > > Sagar.
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <vvcep...@apache.org> wrote:
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > > > > Thanks for the KIP, Sagar!
> > >> > > >>>> > >> > > > >
> > >> > > >>>> > >> > > > > At first glance, this seems like a very useful
> > >> > > >>>> > >> > > > > feature.
> > >> > > >>>> > >> > > > >
> > >> > > >>>> > >> > > > > A common pain point in Streams is when upstream
> > >> > > >>>> > >> > > > > producers don't send regular updates and stream
> > >> > > >>>> > >> > > > > time cannot advance. This causes
> > >> > > >>>> > >> > > > > stream-time-driven operations to appear to hang,
> > >> > > >>>> > >> > > > > like time windows not closing, suppressions not
> > >> > > >>>> > >> > > > > firing, etc.
> > >> > > >>>> > >> > > > >
> > >> > > >>>> > >> > > > > From your KIP, I have a good idea of how the
> > >> > > >>>> > >> > > > > feature would be integrated into Connect, and it
> > >> > > >>>> > >> > > > > sounds good to me. I don't quite see how
> > >> > > >>>> > >> > > > > downstream clients, such as a downstream Streams
> > >> > > >>>> > >> > > > > or Flink application, or users of the Consumer
> > >> > > >>>> > >> > > > > would make use of this feature. Could you add
> > >> > > >>>> > >> > > > > some examples of that nature?
> > >> > > >>>> > >> > > > >
> > >> > > >>>> > >> > > > > Thank you,
> > >> > > >>>> > >> > > > > -John
> > >> > > >>>> > >> > > > >
> > >> > > >>>> > >> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> > >> > > >>>> > >> > > > > > Hi All,
> > >> > > >>>> > >> > > > > >
> > >> > > >>>> > >> > > > > > Bumping the thread again.
> > >> > > >>>> > >> > > > > >
> > >> > > >>>> > >> > > > > > Sagar.
> > >> > > >>>> > >> > > > > >
> > >> > > >>>> > >> > > > > >
> > >> > > >>>> > >> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >> > > >>>> > >> > > > > >
> > >> > > >>>> > >> > > > > >> Hi All,
> > >> > > >>>> > >> > > > > >>
> > >> > > >>>> > >> > > > > >> Bumping this discussion thread again.
> > >> > > >>>> > >> > > > > >>
> > >> > > >>>> > >> > > > > >> Thanks!
> > >> > > >>>> > >> > > > > >> Sagar.
> > >> > > >>>> > >> > > > > >>
> > >> > > >>>> > >> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >> > > >>>> > >> > > > > >>
> > >> > > >>>> > >> > > > > >>> Hi All,
> > >> > > >>>> > >> > > > > >>>
> > >> > > >>>> > >> > > > > >>> I wanted to create a discussion thread for KIP-910:
> > >> > > >>>> > >> > > > > >>>
> > >> > > >>>> > >> > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >> > > >>>> > >> > > > > >>>
> > >> > > >>>> > >> > > > > >>> Thanks!
> > >> > > >>>> > >> > > > > >>> Sagar.
> > >> > > >>>> > >> > > > > >>>
> > >> > > >>>> > >> > > > > >>
> > >> > > >>>> > >> > > > >
> > >> > > >>>> > >> > > >
> > >> > > >>>> > >> > >
> > >> > > >>>> > >> >
> > >> > > >>>> > >>
> > >> > > >>>> > >
> > >> > > >>>> >
> > >> > > >>>>
> > >> > > >>>
> > >> > >
> > >> >
> > >>
> > >
> >
>
