[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410487#comment-16410487
 ] 

Gunnar Morling commented on KAFKA-3821:
---------------------------------------

Hey [~ewencp], thanks for your extensive reply! I agree this discussion 
shouldn't be centered around saving or not saving that one allocation. I just 
added this to point out that the receiver approach leaves Kafka Connect more 
freedom in terms of implementing this. I.e. that way, in theory, you could even 
do crazy things such as storing incoming records off-heap. But yeah, most 
likely you'd just use a list yourselves.

But that's not why I was suggesting this. The more substantial advantage I see 
is that it allows further methods to be added down the road without breaking 
existing implementors of this contract. [~rhauch] e.g. mentioned methods for TX 
handling. Assuming these calls must be done in the correct ordering when 
submitting records, such methods couldn't really be added to 
{{ConnectorSourceTask}} itself. It's the same for offset handling which is the 
original use case we're after here.

On the offset handling itself, there are two aspects to this. One is that we'd 
like to submit an offset once a snapshot is completed. Currently, we're doing 
what you described with

{quote}
would another option be having Debezium read 1 record forward to determine 
before returning the record and/or constructing its offset whether this is the 
final record of the snapshot
{quote}

But this leads back to the complexity of this contract for implementors. Having a 
dedicated way for submitting the offset once the snapshot is done is arguably 
simpler to implement (and reason about when reading the code) than doing this 
"read forward" and delayed processing of it.

In terms of uniqueness of offsets, they _are_ unique also during snapshotting 
(binlog position), but we need a way to say that a snapshot has been completed.
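
To make the receiver idea a bit more concrete, here is a rough sketch of how signalling snapshot completion could look. All names in it ({{RecordReceiver}}, {{ReceiverSourceTask}}, the {{offset()}} method, the "pos" and "snapshot" offset keys) are made up for illustration and are not part of the Kafka Connect API; the real signatures would be up to the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical receiver that the framework would hand to the task.
// Tasks only call it, so the framework could add methods here later
// without breaking existing task implementations.
interface RecordReceiver {
    void record(String topic, String value, Map<String, Object> offset);

    // Submits an offset without writing a record to any topic.
    void offset(Map<String, Object> offset);
}

// Hypothetical task contract: records are pushed into the receiver
// instead of being returned as a list.
interface ReceiverSourceTask {
    void poll(RecordReceiver receiver);
}

// Stand-in for the framework side; it only logs the calls in the
// order in which the task made them.
final class LoggingReceiver implements RecordReceiver {
    final List<String> calls = new ArrayList<>();

    @Override
    public void record(String topic, String value, Map<String, Object> offset) {
        calls.add("record " + value + " -> " + topic + " " + describe(offset));
    }

    @Override
    public void offset(Map<String, Object> offset) {
        calls.add("offset " + describe(offset));
    }

    private static String describe(Map<String, Object> offset) {
        return "pos=" + offset.get("pos") + " snapshot=" + offset.get("snapshot");
    }
}

// Emits two snapshot rows, then marks the snapshot as completed with
// an offset-only call; no "read one record forward" logic is needed.
final class SnapshotTask implements ReceiverSourceTask {
    @Override
    public void poll(RecordReceiver receiver) {
        Map<String, Object> snapshotting = Map.of("pos", 100L, "snapshot", true);
        receiver.record("customers", "row-1", snapshotting);
        receiver.record("customers", "row-2", snapshotting);
        receiver.offset(Map.of("pos", 100L, "snapshot", false));
    }
}
```

Since the receiver sees the calls in order, the framework knows the completion offset comes after both rows, which is the ordering guarantee mentioned above.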

I see though how that could be considered as a "style thing" mostly. There's 
another case though where we'd benefit - substantially - from being able to 
submit offsets explicitly. This is where a connector supports whitelisting of 
captured tables and no changes are done to the whitelisted tables in a while (a 
[comment 
above|https://issues.apache.org/jira/browse/KAFKA-3821?focusedCommentId=15973506&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15973506]
 touches on this). In this case the offset in the event source progresses (e.g. 
MySQL's binlog position), but as no record is of interest as per the whitelist 
config, we have no way to ever submit these offsets. Then, after a connector 
restart, we'd be forced to re-read much larger portions of the binlog than 
actually required.

We currently work around this in Debezium by regularly emitting records to a 
heartbeat topic. This allows us to submit these offsets even if no changes to 
the whitelisted tables are applied.
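
For comparison, this is roughly how the whitelist case could be handled with an explicit offset call instead of a heartbeat topic. Again only a sketch: {{BinlogEvent}}, {{OffsetAwareReceiver}} and {{WhitelistFilter}} are hypothetical names, and a plain long stands in for the binlog position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical binlog event: source table, payload and binlog position.
final class BinlogEvent {
    final String table;
    final String payload;
    final long position;

    BinlogEvent(String table, String payload, long position) {
        this.table = table;
        this.payload = payload;
        this.position = position;
    }
}

// Hypothetical receiver with an offset-only method.
interface OffsetAwareReceiver {
    void record(String table, String payload, long position);

    void offset(long position);
}

// Stand-in for the framework side; it records the calls it receives.
final class CapturingReceiver implements OffsetAwareReceiver {
    final List<String> calls = new ArrayList<>();

    @Override
    public void record(String table, String payload, long position) {
        calls.add("record " + table + ":" + payload + "@" + position);
    }

    @Override
    public void offset(long position) {
        calls.add("offset@" + position);
    }
}

final class WhitelistFilter {
    private final Set<String> whitelist;

    WhitelistFilter(Set<String> whitelist) {
        this.whitelist = whitelist;
    }

    // Forwards whitelisted events as records. If the batch ends with
    // filtered-out events, the last position seen is submitted as an
    // offset-only update, so a restart does not re-read that part of
    // the binlog.
    void process(List<BinlogEvent> batch, OffsetAwareReceiver receiver) {
        long lastSeen = -1;
        long lastEmitted = -1;
        for (BinlogEvent event : batch) {
            lastSeen = event.position;
            if (whitelist.contains(event.table)) {
                receiver.record(event.table, event.payload, event.position);
                lastEmitted = event.position;
            }
        }
        if (lastSeen > lastEmitted) {
            receiver.offset(lastSeen);
        }
    }
}
```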

Now the original idea above was to emit specific subclasses of {{SourceRecord}} 
for such "offset-only" records, but I wanted to bring up the receiver parameter 
idea because a) it feels less hackish to me and b) it opens the door for further 
API additions as described above.

I hope this makes sense; what's proposed is the best that comes to my mind right 
now for the issues we're trying to resolve (while keeping the API reasonably 
abstract so as to cater for other use cases, too). Happy about any alternative 
proposals of course. Thanks for this great discussion!

> Allow Kafka Connect source tasks to produce offset without writing to topics
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-3821
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3821
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.1
>            Reporter: Randall Hauch
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
