[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172876#comment-16172876
 ] 

Per Steffensen edited comment on KAFKA-5716 at 9/20/17 11:13 AM:
-----------------------------------------------------------------

bq. The idiomatic pattern for source connectors is to rely upon the source 
partition and source offsets included in each source record, and to have 
Connect periodically commit those offsets to offset storage (e.g., Kafka in 
distributed mode) as well as during graceful shutdown of the connector. Connect 
then uses the committed offsets upon restart to inform the connector of the 
last persisted offsets the connector provided, and then the connector can 
restart from that point.

Yes, I completely get that.

bq. In this scenario, the connector doesn't need to acknowledge or be notified 
of the committed offsets since Connect is completely managing the offsets for 
the connector.

I guess it depends.

bq. I'd be interested in hearing more about your use case, where IIUC your 
connector needs to notify the source system that progress has completed. Can 
you describe the source system and why you're using this approach?

I believe it would take too much explanation here. I can give a few "hints", 
but you may not be able to follow without more details.

First of all, I am starting to realize that I may be using Kafka-Connect for 
something that it was not completely intended for. I am starting to believe 
that it is intended to "expose data in some source-system via Kafka, but the 
data stays in the source-system". What I am using it for can in some cases be 
described as "transfer data from some temporary storage (source-system) to 
Kafka, and when it has been safely transferred it is deleted from the 
source-system", and in other cases as just "advanced Kafka-to-Kafka ETL". Even 
though Kafka-Connect (source-connectors) is intended for the first thing, it 
does a very nice job for me on the second thing :-) Maybe the intended use for 
Kafka-Connect could change to include both :-) :-) :-)

Briefly, about two of the source-connectors I have:

1. TCP streaming indirectly to Kafka

From a 3rd party we receive data through TCP, on an unknown number of 
concurrent TCP-streams. We have only one chance to get this data, so we had 
better make sure we "receive and store it". We could write the TCP-streams 
directly to Kafka, but for several good reasons we write each stream to local 
files instead. Each stream gets chopped into several files of a particular 
length, starting on the next file when the previous file has reached that 
length. Now, the job is to get each of those streams onto Kafka. Kafka-Connect 
does that job nicely, but I cannot afford to delete the original file before 
all the data in it has been written to the target Kafka and the 
connect-offsets (used to keep track of where in the files I got to) have been 
flushed, so that I will not be asked to replay from a non-existent file. Lots 
of details missing, but maybe you get the picture.
As a note, there may be significantly more TCP-streams than I have partitions 
in my target Kafka-topic, so the streams are "multiplexed" into the partitions, 
with several streams sharing the same partition. Each stream has its own unique 
key (used as Kafka-message-key), so that one stream can be distinguished from 
the others, and so that all messages from the same stream go to the same 
partition (using the default key-hash-modulo partition-selector).
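
As a rough illustration of the multiplexing described above, here is a minimal 
sketch of key-hash-modulo partition selection. It is a simplified stand-in, not 
Kafka's code: Kafka's default partitioner hashes the key bytes with murmur2, 
while this sketch assumes {{Arrays.hashCode}} instead, so the partition numbers 
it produces will differ from a real producer's. The class and method names are 
hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified stand-in for key-hash-modulo partition selection.
// Assumption: Arrays.hashCode replaces Kafka's murmur2 hash, so results
// will NOT match the partitions chosen by a real Kafka producer.
final class StreamPartitionSketch {
    private StreamPartitionSketch() {}

    // Maps a stream key to a partition in [0, numPartitions).
    static int partitionFor(String streamKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        byte[] keyBytes = streamKey.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }
}
```

The same key always maps to the same partition, which is what keeps one stream 
in order; with more streams than partitions, some streams necessarily share a 
partition.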

2. Parsing packets from streams multiplexed into the partitions of a Kafka topic

So we have multiplexed streams in the partitions of a Kafka topic. From here I 
use Kafka-Connect as an advanced ETL (including the translate) on 
Kafka-to-Kafka. No, Kafka-Streams will not do the job. But something based on 
Kafka-Connect will :-) For this Kafka-Connect-ETL it is not enough to use only 
the normal Kafka offset on the incoming Kafka topic, because it has to keep 
track of where each multiplexed sub-stream of each partition got to. It does so 
relative to the normal committed offset on the incoming Kafka topic. In order 
for this to work properly, things have to happen in this order:
* Outgoing packets must have been sent and acknowledged by the outgoing Kafka 
(commitRecord)
* Sub-stream offsets (connect-offsets) relative to the normal Kafka offset have 
to be safely stored (commit)
* Normal Kafka offset can be moved

Lots of details missing, but you will see that I have to know when the 
connect-offsets have been written and flushed, in order to know when I can 
acknowledge the normal Kafka-offset, which they are relative to.
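
The three-step ordering above could be sketched roughly as follows. This is only 
an illustration under stated assumptions, not the actual connector: the class 
{{UpstreamOffsetGate}} and its methods are hypothetical, the task is assumed to 
call {{onRecordAcked}} from {{commitRecord}} and {{onOffsetsFlushed}} from 
{{commit}}, and every record acknowledged before a flush notification is 
assumed to have had its connect-offset included in that flush.

```java
import java.util.TreeMap;

// Hypothetical helper: decides how far the incoming Kafka offset may move.
final class UpstreamOffsetGate {
    // Incoming offset -> number of outgoing records not yet acknowledged.
    private final TreeMap<Long, Integer> unacked = new TreeMap<>();
    private long releasableOffset = -1L; // -1 means nothing may be moved yet

    // Call when poll() returns recordCount records derived from upstreamOffset.
    synchronized void onPolled(long upstreamOffset, int recordCount) {
        if (recordCount < 0) {
            throw new IllegalArgumentException("recordCount must not be negative");
        }
        unacked.merge(upstreamOffset, recordCount, Integer::sum);
    }

    // Step 1: call from commitRecord for each acknowledged outgoing record.
    synchronized void onRecordAcked(long upstreamOffset) {
        Integer remaining = unacked.get(upstreamOffset);
        if (remaining == null || remaining == 0) {
            throw new IllegalStateException("no outstanding record for " + upstreamOffset);
        }
        unacked.put(upstreamOffset, remaining - 1);
    }

    // Step 2: call from commit, once connect-offsets are assumed flushed.
    // Step 3: the returned value is the highest incoming offset that may be moved.
    synchronized long onOffsetsFlushed() {
        // Release only a contiguous prefix of fully acknowledged offsets.
        while (!unacked.isEmpty() && unacked.firstEntry().getValue() == 0) {
            releasableOffset = unacked.pollFirstEntry().getKey();
        }
        return releasableOffset;
    }
}
```

The gate never releases an incoming offset while an earlier one still has 
unacknowledged outgoing records, so a restart replays at most from the oldest 
incomplete position.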

I consider "multiplexing more data-streams into fewer Kafka-partitions" a 
"problem" so general that I hope to get the time to open-source the system. I 
believe others will be able to benefit.

bq. Does the source not allow you to replay from any point? Are you using this 
mechanism to notify the source system that it can garbage collect events that 
your connector is reading?

Depends...

--------------

I really hope we can keep {{commit}} around, at least unchanged (as if 
KAFKA-5716 had never been reported). I would still like to see the small 
"improvement", though:
{code}
public void commit(something) throws InterruptedException {
    commit();
}
public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}
It should be easy to implement. The only concern is backwards compatibility. 
Depending on how strict that requirement is, it may or may not be a case of 
"just go do it".
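
To illustrate the backwards-compatibility point, here is a minimal 
self-contained sketch. It does not use the real Kafka Connect classes: 
{{SketchSourceTask}} and {{LegacyTask}} are hypothetical, and the 
{{Map<String, Long>}} parameter is only an assumed placeholder for the 
unspecified "something" above.

```java
import java.util.Map;

// Hypothetical stand-in for SourceTask; not the real Kafka Connect class.
abstract class SketchSourceTask {
    // New hook. The parameter type is an assumption made for illustration.
    public void commit(Map<String, Long> flushedOffsets) throws InterruptedException {
        commit(); // default behavior: delegate to the existing hook
    }

    // Existing hook, kept unchanged.
    public void commit() throws InterruptedException {
        // This space intentionally left blank.
    }
}

// A task written against the old API: it only overrides the no-arg commit().
final class LegacyTask extends SketchSourceTask {
    int commitCalls = 0;

    @Override
    public void commit() {
        commitCalls++;
    }
}
```

If the framework only ever calls the new overload, a task like {{LegacyTask}} 
still has its no-arg {{commit}} invoked through the default delegation.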


> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> As I read this, when commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored 
> - both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords 
> and commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking 
> what has to be flushed in the offsetWriter and waiting for 
> outstandingMessages to be empty. This means that the offsets committed will 
> be consistent with what has been sent out, but not necessarily with what has 
> been polled. At least I do not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread 
> kicks in and does its committing, while preventing the task-thread from 
> adding the polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will 
> end up calling task.commit (via WorkerSourceTask.commitSourceTask) without 
> the records just polled from task.poll having been sent or the corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two ways to fix it
> * Either change the java-doc of SourceTask.commit to something along these 
> lines (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation where my code-inspection is not correct, and why it 
> works even though I cannot see it. I will not expect such an explanation, 
> though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
