Hey All, Bumping this discussion thread again to see how the modified KIP looks.
Thanks! Sagar. On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com> wrote: > Hi, > > Bumping this thread again for further reviews. > > Thanks! > Sagar. > > On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com> wrote: > >> Hi All, >> >> Thanks for the comments/reviews. I have updated the KIP >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records >> with a newer approach which shelves the need for an explicit topic. >> >> Please review again and let me know what you think. >> >> Thanks! >> Sagar. >> >> >> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com> wrote: >> >>> Hi Sagar, >>> >>> Thanks for the KIP! I have a few questions and comments: >>> >>> 1) I agree with Chris' point about the separation of a connector >>> heartbeat >>> mechanism and allowing source connectors to generate offsets without >>> producing data. What is the purpose of the heartbeat topic here and are >>> there any concrete use cases for downstream consumers on this topic? Why >>> can't we instead simply introduce a mechanism to retrieve a list of >>> source >>> partition / source offset pairs from the source tasks? >>> >>> 2) With the currently described mechanism, the new >>> "SourceTask::produceHeartbeatRecords" method returns a >>> "List<SourceRecord>" >>> - what happens with the topic in each of these source records? Chris >>> pointed this out above, but it doesn't seem to have been addressed? The >>> "SourceRecord" class also has a bunch of other fields which will be >>> irrelevant here (partition, key / value schema, key / value data, >>> timestamp, headers). In fact, it seems like only the source partition and >>> source offset are relevant here, so we should either introduce a new >>> abstraction or simply use a data structure like a mapping from source >>> partitions to source offsets (adds to the above point)? >>> >>> 3) I'm not sure I fully follow why the heartbeat timer / interval is >>> needed? What are the downsides of >>> calling "SourceTask::produceHeartbeatRecords" in every execution loop >>> (similar to the existing "SourceTask::poll" method)? Is this only to >>> prevent the generation of a lot of offset records? Since Connect's >>> offsets >>> topics are log compacted (and source partitions are used as keys for each >>> source offset), I'm not sure if such concerns are valid and such a >>> heartbeat timer / interval mechanism is required? >>> >>> 4) The first couple of rejected alternatives state that the use of a null >>> topic / key / value are preferably avoided - but the current proposal >>> would >>> also likely require connectors to use such workarounds (null topic when >>> the >>> heartbeat topic is configured at a worker level and always for the key / >>> value)? >>> >>> 5) The third rejected alternative talks about subclassing the >>> "SourceRecord" class - this presumably means allowing connectors to pass >>> special offset only records via the existing poll mechanism? Why was this >>> considered a more invasive option? Was it because of the backward >>> compatibility issues that would be introduced for plugins using the new >>> public API class that still need to be deployed onto older Connect >>> workers? 
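(For readers following points 2 and 4 above, a rough sketch of the two API shapes being debated is below. It is illustrative only: the class, the method name "offsetsToCommit", the placeholder topic string, and the partition/offset field names are assumptions, not taken from the KIP.)

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceRecord;

    public class OffsetOnlyShapes {

        // Shape currently in the KIP: offset-only records ride along as SourceRecords
        // whose topic/key/value carry no real payload. Every SourceRecord constructor
        // requires a topic, so a placeholder must be supplied and then presumably
        // ignored or overridden by "heartbeat.records.topic" -- which is the concern
        // raised in questions 2 and 4.
        public List<SourceRecord> produceHeartbeatRecords() {
            Map<String, String> partition = Collections.singletonMap("filename", "orders-2023-04.csv");
            Map<String, Long> offset = Collections.singletonMap("position", 1024L);
            return Collections.singletonList(
                    new SourceRecord(partition, offset, "placeholder-heartbeat-topic", null, null));
        }

        // Alternative shape suggested above: hand back only source partition ->
        // source offset pairs, so no topic, key, or value is involved at all.
        // (Hypothetical method name; not part of the KIP as written.)
        public Map<Map<String, Object>, Map<String, Object>> offsetsToCommit() {
            Map<String, Object> partition = new HashMap<>();
            partition.put("filename", "orders-2023-04.csv");
            Map<String, Object> offset = new HashMap<>();
            offset.put("position", 1024L);
            return Collections.singletonMap(partition, offset);
        }
    }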
>>> >>> Thanks, >>> Yash >>> >>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com> wrote: >>> >>> > One thing I forgot to mention in my previous email was that the reason >>> I >>> > chose to include the opt-in behaviour via configs was that the users >>> of the >>> > connector know their workload patterns. If the workload is such that >>> the >>> > connector would receive regular valid updates then there’s ideally no >>> need >>> > for moving offsets since it would update automatically. >>> > >>> > This way they aren’t forced to use this feature and can use it only >>> when >>> > the workload is expected to be batchy or not frequent. >>> > >>> > Thanks! >>> > Sagar. >>> > >>> > >>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar <sagarmeansoc...@gmail.com> >>> wrote: >>> > >>> > > Hi Chris, >>> > > >>> > > Thanks for following up on the response. Sharing my thoughts further: >>> > > >>> > > If we want to add support for connectors to emit offsets without >>> > >> accompanying source records, we could (and IMO should) do that >>> without >>> > >> requiring users to manually enable that feature by adjusting worker >>> or >>> > >> connector configurations. >>> > > >>> > > >>> > > With the current KIP design, I have tried to implement this in an >>> opt-in >>> > > manner via configs. I guess what you are trying to say is that this >>> > doesn't >>> > > need a config of it's own and instead could be part of the poll -> >>> > > transform etc -> produce -> commit cycle. That way, the users don't >>> need >>> > to >>> > > set any config and if the connector supports moving offsets w/o >>> producing >>> > > SourceRecords, it should happen automatically. Is that correct? If >>> that >>> > > is the concern, then I can think of not exposing a config and try to >>> make >>> > > this process automatically. That should ease the load on connector >>> users, >>> > > but your point about cognitive load on Connector developers, I am >>> still >>> > not >>> > > sure how to address that. The offsets are privy to a connector and >>> the >>> > > framework at best can provide hooks to the tasks to update their >>> offsets. >>> > > Connector developers would still have to consider all cases before >>> > updating >>> > > offsets. And if I ignore the heartbeat topic and heartbeat interval >>> ms >>> > > configs, then what the KIP proposes currently isn't much different in >>> > that >>> > > regard. Just that it produces a List of SourceRecord which can be >>> changed >>> > > to a Map of SourcePartition and their offsets if you think that would >>> > > simplify things. Are there other cases in your mind which need >>> > addressing? >>> > > >>> > > Here's my take on the usecases: >>> > > >>> > > 1. Regarding the example about SMTs with Object Storage based >>> > > connectors, it was one of the scenarios identified. We have some >>> > connectors >>> > > that rely on the offsets topic to check if the next batch of files >>> > should >>> > > be processed and because of filtering of the last record from the >>> > files, >>> > > the eof supposedly is never reached and the connector can't >>> commit >>> > offsets >>> > > for that source partition(file). If there was a mechanism to >>> update >>> > offsets >>> > > for such a source file, then with some moderately complex state >>> > tracking, >>> > > the connector can mark that file as processed and proceed. >>> > > 2. 
There's another use case with the same class of connectors >>> where if >>> > > a file is malformed, then the connector couldn't produce any >>> offsets >>> > > because the file couldn't get processed completely. To handle such >>> > cases, >>> > > the connector developers have introduced a dev/null sort of topic >>> > where >>> > > they produce a record to this corrupted file topic and move the >>> offset >>> > > somehow. This topic ideally isn't needed and with a mechanism to >>> > update >>> > > offsets would have helped in this case as well. >>> > > 3. Coming to CDC based connectors, >>> > > 1. We had a similar issue with Oracle CDC source connector and >>> > > needed to employ the same heartbeat mechanism to get around it. >>> > > 2. MongoDB CDC source Connector has employed the same >>> heartbeat >>> > > mechanism Check `heartbeat.interval.ms` here ( >>> > > >>> > >>> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/ >>> > > ). >>> > > 3. Another CDC connector for ScyllaDB employs a similar >>> mechanism. >>> > > >>> > >>> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat >>> > > 4. For CDC based connectors, you could argue that these >>> connectors >>> > > have been able to solve this error then why do we need >>> framework >>> > level >>> > > support. But the point I am trying to make is that this >>> limitation >>> > from the >>> > > framework is forcing CDC connector developers to implement >>> > per-connector >>> > > solutions/hacks(at times). And there could always be more CDC >>> > connectors in >>> > > the pipeline forcing them to take a similar route as well. >>> > > 4. There's also a case at times with CDC source connectors which >>> are >>> > > REST Api / Web Service based(Zendesk Source Connector for >>> example) . >>> > These >>> > > connectors typically use timestamps from the responses as >>> offsets. If >>> > > there's a long period of inactivity wherein the API invocations >>> don't >>> > > return any data, then the offsets won't move and the connector >>> would >>> > keep >>> > > using the same timestamp that it received from the last non-empty >>> > response. >>> > > If this period of inactivity keeps growing, and the API imposes >>> any >>> > limits >>> > > on how far back we can go in terms of window start, then this >>> could >>> > > potentially be a problem. In this case even though the connector >>> was >>> > caught >>> > > up with all the responses, it may need to snapshot again. In this >>> case >>> > > updating offsets can easily help since all the connector needs to >>> do >>> > is to >>> > > move the timestamp which would move the offset inherently. >>> > > >>> > > I still believe that this is something the framework should support >>> OOB >>> > > irrespective of whether the connectors have been able to get around >>> this >>> > > restriction or not. >>> > > >>> > > Lastly, about your comments here: >>> > > >>> > > I'm also not sure that it's worth preserving the current behavior >>> that >>> > >> offsets for records that have been filtered out via SMT are not >>> > committed. >>> > > >>> > > >>> > > Let me know if we need a separate JIRA to track this? This somehow >>> didn't >>> > > look related to this discussion. >>> > > >>> > > Thanks! >>> > > Sagar. 
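(To make the web-service / timestamp-as-offset case in point 4 above a bit more concrete, here is a rough, hypothetical sketch. The connector, the fetchSince() call, and the partition/offset field names are all assumptions, not taken from any real connector.)

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;

    // Hypothetical web-service poller; "lastSeenTimestamp" doubles as the source offset.
    public class TimestampOffsetPoller {

        private long lastSeenTimestamp;   // only moves forward when a record is produced

        public List<SourceRecord> poll() {
            List<ApiEvent> events = fetchSince(lastSeenTimestamp);   // remote call (assumed)
            if (events.isEmpty()) {
                // Long idle period: nothing is returned, so no SourceRecord is emitted
                // and the committed offset stays pinned at the old timestamp. If the API
                // only allows queries N days back, a later restart forces a re-snapshot.
                // A framework hook to advance the offset without records would avoid this.
                return Collections.emptyList();
            }
            List<SourceRecord> records = new ArrayList<>();
            for (ApiEvent e : events) {
                lastSeenTimestamp = e.timestamp;
                records.add(new SourceRecord(
                        Collections.singletonMap("endpoint", "tickets"),
                        Collections.singletonMap("timestamp", lastSeenTimestamp),
                        "tickets-topic", Schema.STRING_SCHEMA, e.payload));
            }
            return records;
        }

        // Placeholders standing in for the connector's HTTP client and response type.
        private List<ApiEvent> fetchSince(long ts) { return Collections.emptyList(); }
        private static class ApiEvent { long timestamp; String payload; }
    }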
>>> > > >>> > > >>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton >>> <chr...@aiven.io.invalid> >>> > > wrote: >>> > > >>> > >> Hi Sagar, >>> > >> >>> > >> I'm sorry, I'm still not convinced that this design solves the >>> > problem(s) >>> > >> it sets out to solve in the best way possible. I tried to highlight >>> this >>> > >> in >>> > >> my last email: >>> > >> >>> > >> > In general, it seems like we're trying to solve two completely >>> > different >>> > >> problems with this single KIP: adding framework-level support for >>> > emitting >>> > >> heartbeat records for source connectors, and allowing source >>> connectors >>> > to >>> > >> emit offsets without also emitting source records. I don't mind >>> > addressing >>> > >> the two at the same time if the result is elegant and doesn't >>> compromise >>> > >> on >>> > >> the solution for either problem, but that doesn't seem to be the >>> case >>> > >> here. >>> > >> Of the two problems, could we describe one as the primary and one >>> as the >>> > >> secondary? If so, we might consider dropping the secondary problem >>> from >>> > >> this KIP and addressing it separately. >>> > >> >>> > >> If we wanted to add support for heartbeat records, we could (and IMO >>> > >> should) do that without requiring connectors to implement any new >>> > methods >>> > >> and only require adjustments to worker or connector configurations >>> by >>> > >> users >>> > >> in order to enable that feature. >>> > >> >>> > >> If we want to add support for connectors to emit offsets without >>> > >> accompanying source records, we could (and IMO should) do that >>> without >>> > >> requiring users to manually enable that feature by adjusting worker >>> or >>> > >> connector configurations. >>> > >> >>> > >> >>> > >> I'm also not sure that it's worth preserving the current behavior >>> that >>> > >> offsets for records that have been filtered out via SMT are not >>> > committed. >>> > >> I can't think of a case where this would be useful and there are >>> > obviously >>> > >> plenty where it isn't. There's also a slight discrepancy in how >>> these >>> > >> kinds >>> > >> of records are treated by the Connect runtime now; if a record is >>> > dropped >>> > >> because of an SMT, then its offset isn't committed, but if it's >>> dropped >>> > >> because exactly-once support is enabled and the connector chose to >>> abort >>> > >> the batch containing the record, then its offset is still committed. >>> > After >>> > >> thinking carefully about the aborted transaction behavior, we >>> realized >>> > >> that >>> > >> it was fine to commit the offsets for those records, and I believe >>> that >>> > >> the >>> > >> same logic can be applied to any record that we're done trying to >>> send >>> > to >>> > >> Kafka (regardless of whether it was sent correctly, dropped due to >>> > >> producer >>> > >> error, filtered via SMT, etc.). >>> > >> >>> > >> I also find the file-based source connector example a little >>> confusing. >>> > >> What about that kind of connector causes the offset for the last >>> record >>> > of >>> > >> a file to be treated differently? Is there anything different about >>> > >> filtering that record via SMT vs. dropping it altogether because of >>> an >>> > >> asynchronous producer error with "errors.tolerance" set to "all"? >>> And >>> > >> finally, how would such a connector use the design proposed here? 
>>> > >> >>> > >> Finally, I don't disagree that if there are other legitimate use >>> cases >>> > >> that >>> > >> would be helped by addressing KAFKA-3821, we should try to solve >>> that >>> > >> issue >>> > >> in the Kafka Connect framework instead of requiring individual >>> > connectors >>> > >> to implement their own solutions. But the cognitive load added by >>> the >>> > >> design proposed here, for connector developers and Connect cluster >>> > >> administrators alike, costs too much to justify by pointing to an >>> > >> already-solved problem encountered by a single group of connectors >>> > (i.e., >>> > >> Debezium). This is why I think it's crucial that we identify >>> realistic >>> > >> cases where this feature would actually be useful, and right now, I >>> > don't >>> > >> think any have been provided (at least, not ones that have already >>> been >>> > >> addressed or could be addressed with much simpler changes). >>> > >> >>> > >> Cheers, >>> > >> >>> > >> Chris >>> > >> >>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar <sagarmeansoc...@gmail.com> >>> > wrote: >>> > >> >>> > >> > Hi Chris, >>> > >> > >>> > >> > Thanks for your detailed feedback! >>> > >> > >>> > >> > nits: I have taken care of them now. Thanks for pointing those >>> out. >>> > >> > >>> > >> > non-nits: >>> > >> > >>> > >> > 6) It seems (based on both the KIP and discussion on KAFKA-3821) >>> that >>> > >> the >>> > >> > > only use case for being able to emit offsets without also >>> emitting >>> > >> source >>> > >> > > records that's been identified so far is for CDC source >>> connectors >>> > >> like >>> > >> > > Debezium. >>> > >> > >>> > >> > >>> > >> > I am aware of atleast one more case where the non production of >>> > offsets >>> > >> > (due to non production of records ) leads to the failure of >>> connectors >>> > >> when >>> > >> > the source purges the records of interest. This happens in File >>> based >>> > >> > source connectors (like s3/blob storage ) in which if the last >>> record >>> > >> from >>> > >> > a file is fiterterd due to an SMT, then that particular file is >>> never >>> > >> > committed to the source partition and eventually when the file is >>> > >> deleted >>> > >> > from the source and the connector is restarted due to some >>> reason, it >>> > >> > fails. >>> > >> > Moreover, I feel the reason this support should be there in the >>> Kafka >>> > >> > Connect framework is because this is a restriction of the >>> framework >>> > and >>> > >> > today the framework provides no support for getting around this >>> > >> limitation. >>> > >> > Every connector has it's own way of handling offsets and having >>> each >>> > >> > connector handle this restriction in its own way can make it >>> complex. >>> > >> > Whether we choose to do it the way this KIP prescribes or any >>> other >>> > way >>> > >> is >>> > >> > up for debate but IMHO, the framework should provide a way of >>> > >> > getting around this limitation. >>> > >> > >>> > >> > 7. If a task produces heartbeat records and source records that >>> use >>> > the >>> > >> > > same source partition, which offset will ultimately be >>> committed? >>> > >> > >>> > >> > >>> > >> > The idea is to add the records returned by the >>> > `produceHeartbeatRecords` >>> > >> > to the same `toSend` list within >>> `AbstractWorkerSourceTask#execute`. >>> > >> The >>> > >> > `produceHeartbeatRecords` would be invoked before we make the >>> `poll` >>> > >> call. 
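(Paraphrasing the ordering described above, the worker side of the proposal would look roughly like the pseudocode below. This is illustrative only -- it is not the actual AbstractWorkerSourceTask code, and names like heartbeatIntervalElapsed() and sendRecords() are made up.)

    // Sketch of one iteration of the worker's send loop under the proposal:
    while (!stopping) {
        List<SourceRecord> toSend = new ArrayList<>();

        // 1. Offset-only ("heartbeat") records are collected first, if the
        //    heartbeat timer has fired for this task...
        if (heartbeatIntervalElapsed()) {
            toSend.addAll(task.produceHeartbeatRecords());
        }

        // 2. ...followed by the regular records returned from poll().
        toSend.addAll(task.poll());

        // 3. Records are dispatched (and their offsets eventually committed) in list
        //    order; the framework does not reorder or validate them, so the connector
        //    must take care not to emit offsets that go backwards or skip data.
        sendRecords(toSend);
    }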
>>> > >> > Hence, the offsets committed would be in the same order in which >>> they >>> > >> would >>> > >> > be written. Note that, the onus is on the Connector >>> implementation to >>> > >> not >>> > >> > return records which can lead to data loss or data going out of >>> order. >>> > >> The >>> > >> > framework would just commit based on whatever is supplied. Also, >>> > AFAIK, >>> > >> 2 >>> > >> > `normal` source records can also produce the same source >>> partitions >>> > and >>> > >> > they are committed in the order in which they are written. >>> > >> > >>> > >> > 8. The SourceTask::produceHeartbeatRecords method returns a >>> > >> > > List<SourceRecord>, and users can control the heartbeat topic >>> for a >>> > >> > > connector via the (connector- or worker-level) >>> > >> "heartbeat.records.topic" >>> > >> > > property. Since every constructor for the SourceRecord class [2] >>> > >> > requires a >>> > >> > > topic to be supplied, what will happen to that topic? Will it be >>> > >> ignored? >>> > >> > > If so, I think we should look for a cleaner solution. >>> > >> > >>> > >> > >>> > >> > Sorry, I couldn't quite follow which topic will be ignored in this >>> > case. >>> > >> > >>> > >> > 9. A large concern raised in the discussion for KAFKA-3821 was the >>> > >> allowing >>> > >> > > connectors to control the ordering of these special >>> "offsets-only" >>> > >> > > emissions and the regular source records returned from >>> > >> SourceTask::poll. >>> > >> > > Are we choosing to ignore that concern? If so, can you add this >>> to >>> > the >>> > >> > > rejected alternatives section along with a rationale? >>> > >> > >>> > >> > >>> > >> > One thing to note is that the for every connector, the condition >>> to >>> > emit >>> > >> > the heartbeat record is totally up to the connector, For example, >>> for >>> > a >>> > >> > connector which is tracking transactions for an ordered log, if >>> there >>> > >> are >>> > >> > open transactions, it might not need to emit heartbeat records >>> when >>> > the >>> > >> > timer expires while for file based connectors, if the same file is >>> > being >>> > >> > processed again and again due to an SMT or some other reasons, >>> then it >>> > >> can >>> > >> > choose to emit that partition. The uber point here is that every >>> > >> connector >>> > >> > has it's own requirements and the framework can't really make an >>> > >> assumption >>> > >> > about it. What the KIP is trying to do is to provide a mechanism >>> to >>> > the >>> > >> > connector to commit new offsets. With this approach, as far as I >>> can >>> > >> think >>> > >> > so far, there doesn't seem to be a case of out of order >>> processing. If >>> > >> you >>> > >> > have other concerns/thoughts I would be happy to know them. >>> > >> > >>> > >> > 10. If, sometime in the future, we wanted to add framework-level >>> > support >>> > >> > > for sending heartbeat records that doesn't require connectors to >>> > >> > implement >>> > >> > > any new APIs... >>> > >> > >>> > >> > >>> > >> > The main purpose of producing heartbeat records is to be able to >>> emit >>> > >> > offsets w/o any new records. We are using heartbeat records to >>> solve >>> > the >>> > >> > primary concern of offsets getting stalled. 
The reason to do that >>> was >>> > >> once >>> > >> > we get SourceRecords, then the rest of the code is already in >>> place to >>> > >> > write it to a topic of interest and commit offsets and that >>> seemed the >>> > >> most >>> > >> > non invasive in terms of framework level changes. If in the >>> future we >>> > >> want >>> > >> > to do a framework-only heartbeat record support, then this would >>> > create >>> > >> > confusion as you pointed out. Do you think the choice of the name >>> > >> heartbeat >>> > >> > records is creating confusion in this case? Maybe we can call >>> these >>> > >> special >>> > >> > records something else (not sure what at this point) which would >>> then >>> > >> > decouple the 2 logically and implementation wise as well? >>> > >> > >>> > >> > Thanks! >>> > >> > Sagar. >>> > >> > >>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton >>> <chr...@aiven.io.invalid >>> > > >>> > >> > wrote: >>> > >> > >>> > >> > > Hi Sagar, >>> > >> > > >>> > >> > > Thanks for the KIP! I have some thoughts. >>> > >> > > >>> > >> > > Nits: >>> > >> > > >>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on the >>> KIP? >>> > >> Or >>> > >> > is >>> > >> > > there a different ticket that should be associated with it? >>> > >> > > 2. The current state is listed as "Draft". Considering it's been >>> > >> brought >>> > >> > up >>> > >> > > for discussion, maybe the KIP should be updated to "Discussion"? >>> > >> > > 3. Can you add a link for the discussion thread to the KIP? >>> > >> > > 4. The KIP states that "In this process, offsets are written at >>> > >> regular >>> > >> > > intervals(driven by `offset.flush.interval.ms`)". This isn't >>> > strictly >>> > >> > > accurate since, when exactly-once support is enabled, offset >>> commits >>> > >> can >>> > >> > > also be performed for each record batch (which is the default) >>> or >>> > when >>> > >> > > explicitly requested by the task instance (if the connector >>> > implements >>> > >> > the >>> > >> > > API to define its own transactions and the user has configured >>> it to >>> > >> do >>> > >> > > so). Maybe better to just say "Offsets are written >>> periodically"? >>> > >> > > 5. The description for the (per-connector) >>> "heartbeat.records.topic >>> > " >>> > >> > > property states that it is "Only applicable in distributed >>> mode; in >>> > >> > > standalone mode, setting this property will have no effect". Is >>> this >>> > >> > > correct? >>> > >> > > >>> > >> > > Non-nits: >>> > >> > > >>> > >> > > 6. It seems (based on both the KIP and discussion on KAFKA-3821) >>> > that >>> > >> the >>> > >> > > only use case for being able to emit offsets without also >>> emitting >>> > >> source >>> > >> > > records that's been identified so far is for CDC source >>> connectors >>> > >> like >>> > >> > > Debezium. But Debezium already has support for this exact >>> feature >>> > >> > (emitting >>> > >> > > heartbeat records that include offsets that cannot be associated >>> > with >>> > >> > > other, "regular" source records). Why should we add this >>> feature to >>> > >> Kafka >>> > >> > > Connect when the problem it addresses is already solved in the >>> set >>> > >> > > connectors that (it seems) would have any need for it, and the >>> size >>> > of >>> > >> > that >>> > >> > > set is extremely small? If there are other practical use cases >>> for >>> > >> > > connectors that would benefit from this feature, please let me >>> know. >>> > >> > > >>> > >> > > 7. 
If a task produces heartbeat records and source records that >>> use >>> > >> the >>> > >> > > same source partition, which offset will ultimately be >>> committed? >>> > >> > > >>> > >> > > 8. The SourceTask::produceHeartbeatRecords method returns a >>> > >> > > List<SourceRecord>, and users can control the heartbeat topic >>> for a >>> > >> > > connector via the (connector- or worker-level) >>> > >> "heartbeat.records.topic" >>> > >> > > property. Since every constructor for the SourceRecord class [2] >>> > >> > requires a >>> > >> > > topic to be supplied, what will happen to that topic? Will it be >>> > >> ignored? >>> > >> > > If so, I think we should look for a cleaner solution. >>> > >> > > >>> > >> > > 9. A large concern raised in the discussion for KAFKA-3821 was >>> the >>> > >> > allowing >>> > >> > > connectors to control the ordering of these special >>> "offsets-only" >>> > >> > > emissions and the regular source records returned from >>> > >> SourceTask::poll. >>> > >> > > Are we choosing to ignore that concern? If so, can you add this >>> to >>> > the >>> > >> > > rejected alternatives section along with a rationale? >>> > >> > > >>> > >> > > 10. If, sometime in the future, we wanted to add framework-level >>> > >> support >>> > >> > > for sending heartbeat records that doesn't require connectors to >>> > >> > implement >>> > >> > > any new APIs (e.g., SourceTask::produceHeartbeatRecords), a lot >>> of >>> > >> this >>> > >> > > would paint us into a corner design-wise. We'd have to think >>> > carefully >>> > >> > > about which property names would be used, how to account for >>> > >> connectors >>> > >> > > that have already implemented the >>> > SourceTask::produceHeartbeatRecords >>> > >> > > method, etc. In general, it seems like we're trying to solve two >>> > >> > completely >>> > >> > > different problems with this single KIP: adding framework-level >>> > >> support >>> > >> > for >>> > >> > > emitting heartbeat records for source connectors, and allowing >>> > source >>> > >> > > connectors to emit offsets without also emitting source >>> records. I >>> > >> don't >>> > >> > > mind addressing the two at the same time if the result is >>> elegant >>> > and >>> > >> > > doesn't compromise on the solution for either problem, but that >>> > >> doesn't >>> > >> > > seem to be the case here. Of the two problems, could we >>> describe one >>> > >> as >>> > >> > the >>> > >> > > primary and one as the secondary? If so, we might consider >>> dropping >>> > >> the >>> > >> > > secondary problm from this KIP and addressing it separately. >>> > >> > > >>> > >> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821 >>> > >> > > [2] - >>> > >> > > >>> > >> > > >>> > >> > >>> > >> >>> > >>> https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html >>> > >> > > >>> > >> > > Cheers, >>> > >> > > >>> > >> > > Chris >>> > >> > > >>> > >> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar < >>> sagarmeansoc...@gmail.com> >>> > >> > wrote: >>> > >> > > >>> > >> > > > Hi John, >>> > >> > > > >>> > >> > > > Thanks for taking. look at the KIP! >>> > >> > > > >>> > >> > > > The point about stream time not advancing in case of >>> infrequent >>> > >> updates >>> > >> > > is >>> > >> > > > an interesting one. 
I can imagine if the upstream producer to >>> a >>> > >> Kafka >>> > >> > > > Streams application is a Source Connector which isn't sending >>> > >> records >>> > >> > > > frequently(due to the nature of the data ingestion for >>> example), >>> > >> then >>> > >> > the >>> > >> > > > downstream stream processing can land into the issues you >>> > described >>> > >> > > above. >>> > >> > > > >>> > >> > > > Which also brings me to the second point you made about how >>> this >>> > >> would >>> > >> > be >>> > >> > > > used by downstream consumers. IIUC, you are referring to the >>> > >> consumers >>> > >> > of >>> > >> > > > the newly added topic i.e the heartbeat topic. In my mind, the >>> > >> > heartbeat >>> > >> > > > topic is an internal topic (similar to offsets/config/status >>> topic >>> > >> in >>> > >> > > > connect), the main purpose of which is to trick the framework >>> to >>> > >> > produce >>> > >> > > > records to the offsets topic and advance the offsets. Since >>> every >>> > >> > > connector >>> > >> > > > could have a different definition of offsets(LSN, BinLogID >>> etc for >>> > >> > > > example), that logic to determine what the heartbeat records >>> > should >>> > >> be >>> > >> > > > would have to reside in the actual connector. >>> > >> > > > >>> > >> > > > Now that I think of it, it could very well be consumed by >>> > downstream >>> > >> > > > consumers/ Streams or Flink Applications and be further used >>> for >>> > >> some >>> > >> > > > decision making. A very crude example could be let's say if >>> the >>> > >> > heartbeat >>> > >> > > > records sent to the new heartbeat topic include timestamps, >>> then >>> > the >>> > >> > > > downstream streams application can use that timestamp to >>> close any >>> > >> time >>> > >> > > > windows. Having said that, it still appears to me that it's >>> > outside >>> > >> the >>> > >> > > > scope of the Connect framework and is something which is >>> difficult >>> > >> to >>> > >> > > > generalise because of the variety of Sources and the >>> definitions >>> > of >>> > >> > > > offsets. >>> > >> > > > >>> > >> > > > But, I would still be more than happy to add this example if >>> you >>> > >> think >>> > >> > it >>> > >> > > > can be useful in getting a better understanding of the idea >>> and >>> > also >>> > >> > its >>> > >> > > > utility beyond connect. Please let me know! >>> > >> > > > >>> > >> > > > Thanks! >>> > >> > > > Sagar. >>> > >> > > > >>> > >> > > > >>> > >> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler < >>> vvcep...@apache.org >>> > > >>> > >> > > wrote: >>> > >> > > > >>> > >> > > > > Thanks for the KIP, Sagar! >>> > >> > > > > >>> > >> > > > > At first glance, this seems like a very useful feature. >>> > >> > > > > >>> > >> > > > > A common pain point in Streams is when upstream producers >>> don't >>> > >> send >>> > >> > > > > regular updates and stream time cannot advance. This causes >>> > >> > > > > stream-time-driven operations to appear to hang, like time >>> > windows >>> > >> > not >>> > >> > > > > closing, suppressions not firing, etc. >>> > >> > > > > >>> > >> > > > > From your KIP, I have a good idea of how the feature would >>> be >>> > >> > > integrated >>> > >> > > > > into connect, and it sounds good to me. I don't quite see >>> how >>> > >> > > downstream >>> > >> > > > > clients, such as a downstream Streams or Flink application, >>> or >>> > >> users >>> > >> > of >>> > >> > > > the >>> > >> > > > > Consumer would make use of this feature. 
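(Picking up the "crude example" mentioned above, a downstream Streams application might use such a heartbeat topic roughly as sketched below. Topic names are assumptions, and whether the heartbeat topic is meant for external consumption at all is exactly the open question in this thread.)

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class HeartbeatAwareTopology {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> orders = builder.stream("orders");
            KStream<String, String> heartbeats = builder.stream("connector-heartbeats");

            // Merging the heartbeat stream means its record timestamps are observed by
            // downstream time-driven processors, so stream time keeps advancing (windows
            // close, suppressions fire) even while "orders" itself is idle. Heartbeat
            // values carry no payload and would be dropped after the time-sensitive step.
            KStream<String, String> merged = orders.merge(heartbeats);
            merged.to("orders-with-heartbeats");

            System.out.println(builder.build().describe());
        }
    }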
Could you add some >>> > >> examples >>> > >> > of >>> > >> > > > > that nature? >>> > >> > > > > >>> > >> > > > > Thank you, >>> > >> > > > > -John >>> > >> > > > > >>> > >> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote: >>> > >> > > > > > Hi All, >>> > >> > > > > > >>> > >> > > > > > Bumping the thread again. >>> > >> > > > > > >>> > >> > > > > > Sagar. >>> > >> > > > > > >>> > >> > > > > > >>> > >> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar < >>> > >> sagarmeansoc...@gmail.com> >>> > >> > > > wrote: >>> > >> > > > > > >>> > >> > > > > >> Hi All, >>> > >> > > > > >> >>> > >> > > > > >> Bumping this discussion thread again. >>> > >> > > > > >> >>> > >> > > > > >> Thanks! >>> > >> > > > > >> Sagar. >>> > >> > > > > >> >>> > >> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar < >>> > >> sagarmeansoc...@gmail.com> >>> > >> > > > wrote: >>> > >> > > > > >> >>> > >> > > > > >>> Hi All, >>> > >> > > > > >>> >>> > >> > > > > >>> I wanted to create a discussion thread for KIP-910: >>> > >> > > > > >>> >>> > >> > > > > >>> >>> > >> > > > > >>> >>> > >> > > > > >>> > >> > > > >>> > >> > > >>> > >> > >>> > >> >>> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records >>> > >> > > > > >>> >>> > >> > > > > >>> Thanks! >>> > >> > > > > >>> Sagar. >>> > >> > > > > >>> >>> > >> > > > > >> >>> > >> > > > > >>> > >> > > > >>> > >> > > >>> > >> > >>> > >> >>> > > >>> > >>> >>