The producer in the `WorkerSourceTask` automatically resends records, but if 
the producer ultimately fails to send them, the 
[WorkerSourceTask enqueues the unsent records in `toSend`](https://github.com/apache/kafka/blob/08e8facdc9fce3a9195f5f646b49f55ffa043c73/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L343-L348) 
and sends them again.
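The retry path can be made concrete with a minimal, simplified Java model of the loop. It is loosely modeled on the behavior described here and is not the actual `WorkerSourceTask` code. `SimplifiedSourceLoop` and `pollTask()` are hypothetical names, as is the `Predicate` standing in for the producer. A real task may also back off between attempts, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the source task loop; not the real WorkerSourceTask.
class SimplifiedSourceLoop {
    private final List<List<String>> batches;  // what successive poll() calls would return
    private final Predicate<String> producer;  // stand-in for the producer: false means the send failed
    private List<String> toSend = null;        // records polled but not yet sent
    private int nextBatch = 0;
    final List<String> written = new ArrayList<>();
    int pollCalls = 0;

    SimplifiedSourceLoop(List<List<String>> batches, Predicate<String> producer) {
        this.batches = batches;
        this.producer = producer;
    }

    // Hypothetical stand-in for the connector's poll(); returns null when there is nothing new.
    private List<String> pollTask() {
        pollCalls++;
        return nextBatch < batches.size() ? batches.get(nextBatch++) : null;
    }

    // Sends records in order; on the first failure, keeps the unsent remainder in toSend.
    private boolean sendRecords() {
        for (int i = 0; i < toSend.size(); i++) {
            if (!producer.test(toSend.get(i))) {
                toSend = new ArrayList<>(toSend.subList(i, toSend.size()));
                return false;
            }
            written.add(toSend.get(i));
        }
        toSend = null;
        return true;
    }

    // One loop iteration: poll only when nothing is pending, otherwise retry toSend.
    void iterate() {
        if (toSend == null) {
            toSend = pollTask();
            if (toSend == null) {
                return;
            }
        }
        sendRecords(); // a failure leaves toSend non-null, so the next iteration skips polling
    }
}
```

Suppose the producer rejects `r2` once. The second iteration then retries `r2` without calling `pollTask()` again, and only the third iteration polls for the next batch.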

It is true that this happens after each call to `poll()`, but if the send fails, 
`toSend` remains non-null, so on the next iteration the loop does not call 
`poll()` and instead retries whatever is in `toSend`. This continues as long as 
`toSend` is non-null. However, in the current PR, even while such a retry is 
pending, the loop may still ask for the source partitions and offsets and 
synchronously commit them using the `offsetWriter`.

So it is possible that a record with offset `o1` for source partition `p1` 
fails to send and is retried, but the connector then sets a later offset `o2` 
for the same partition and offset `o2` is committed. If the connector were to 
fail at exactly that point (which is possible), offset `o2` would have been 
committed without the `o1` record ever being written.
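The failure window can be replayed with a small Java model. It is hypothetical and is not Kafka Connect code. `OffsetLossScenario`, `sendFails()`, `commitDirectly()`, and `lostIfCrashedNow()` are illustrative names. `commitDirectly()` stands in for the PR's synchronous `offsetWriter` commit. The model also assumes that a committed offset means "resume after this offset" when the connector restarts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the failure window; not Kafka Connect code.
class OffsetLossScenario {
    private static final class Pending {
        final String partition;
        final long offset;

        Pending(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final List<Pending> toSend = new ArrayList<>();             // records awaiting retry (in memory only)
    private final Map<String, Long> committedOffsets = new HashMap<>(); // what has been durably committed

    // The producer failed to send this record, so it sits in toSend to be retried.
    void sendFails(String partition, long offset) {
        toSend.add(new Pending(partition, offset));
    }

    // Stand-in for the PR's synchronous offsetWriter commit, which bypasses toSend.
    void commitDirectly(String partition, long offset) {
        committedOffsets.put(partition, offset);
    }

    // Offsets that would never be written if the task died now: still pending,
    // yet already covered by the committed offset (assumed to mean "resume after").
    List<Long> lostIfCrashedNow(String partition) {
        List<Long> lost = new ArrayList<>();
        Long committed = committedOffsets.get(partition);
        if (committed == null) {
            return lost;
        }
        for (Pending p : toSend) {
            if (p.partition.equals(partition) && p.offset <= committed) {
                lost.add(p.offset);
            }
        }
        return lost;
    }
}
```

Calling `sendFails("p1", 1L)` and then `commitDirectly("p1", 2L)` leaves `o1` pending while `o2` is already committed. As a result, `lostIfCrashedNow("p1")` reports offset 1.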

I understand that in your particular use case, you would probably only set the 
offsets for a partition when no records had been written for it recently, but 
that doesn't change the fact that the `WorkerSourceTask` might be attempting to 
resend the previous records for quite some time.

What if your new block of code were executed only when `sendRecords()` 
succeeded? I think there are a couple of issues with that as well. First, the 
offset writer is called synchronously, whereas other offset commits are 
performed by a separate commit thread that commits offsets for multiple tasks. 
There would then be multiple threads committing offsets, with potential race 
conditions and concurrency issues.
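The race in the first point can be illustrated by replaying one possible interleaving, step by step, on a single thread. `SharedOffsetStore` and `replayInterleaving()` are hypothetical stand-ins for the offset storage and the two committers. Each individual write is atomic here, yet the final offset still depends on the order in which the commit thread and the task thread happen to write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offset store shared by two committers; each write is atomic.
class SharedOffsetStore {
    private final Map<String, Long> stored = new HashMap<>();

    synchronized void write(Map<String, Long> offsets) {
        stored.putAll(offsets); // last writer wins, regardless of how old the offsets are
    }

    synchronized Long read(String partition) {
        return stored.get(partition);
    }

    // Replays one possible interleaving of the commit thread and the task thread.
    static SharedOffsetStore replayInterleaving() {
        SharedOffsetStore store = new SharedOffsetStore();
        // 1. The periodic commit thread snapshots the offsets it has seen so far.
        Map<String, Long> commitThreadSnapshot = Map.of("p1", 5L);
        // 2. Meanwhile the task thread synchronously commits a newer offset.
        store.write(Map.of("p1", 7L));
        // 3. The commit thread then flushes its now-stale snapshot.
        store.write(commitThreadSnapshot);
        return store;
    }
}
```

In this ordering, the stale snapshot from the commit thread overwrites the newer offset, so `p1` moves backward from 7 to 5. A different ordering gives a different result, and that nondeterminism is what a second committer introduces.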

Second, it is still a complicated API: will developers truly understand when 
and how to use `getSourcePartitionAndOffset()`? Can I call it to read the last 
offset committed for a particular source partition? The worker never sets the 
offsets there.

The `WorkerSourceTask` has a single, ordered pipeline for all records, each of 
which carries its own offset. I still believe the best, most reliable, and most 
deterministic way to solve this is to use that same pipeline.
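One way to picture committing offsets through that same pipeline is a tracker that advances a partition's committable offset only across the acknowledged prefix of records, in submission order. This is a hypothetical sketch: `OrderedOffsetTracker`, `submit()`, and `ack()` are not Connect APIs, and the sketch does not describe how the worker currently tracks offsets. It also assumes calls are serialized. Producer callbacks run on a different thread, so a real version would need synchronization.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker; assumes calls are serialized (it is not thread-safe).
class OrderedOffsetTracker {
    static final class PendingRecord {
        private final String partition;
        private final long offset;
        private boolean acked = false;

        private PendingRecord(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Deque<PendingRecord> inFlight = new ArrayDeque<>(); // submission order
    private final Map<String, Long> committable = new HashMap<>();

    // A record (with its source partition and offset) enters the single ordered pipeline.
    PendingRecord submit(String partition, long offset) {
        PendingRecord r = new PendingRecord(partition, offset);
        inFlight.addLast(r);
        return r;
    }

    // The producer acknowledged a record; acknowledgements may arrive out of order.
    void ack(PendingRecord r) {
        r.acked = true;
        // Advance only over the acknowledged prefix, so no offset gets ahead of an unwritten record.
        while (!inFlight.isEmpty() && inFlight.peekFirst().acked) {
            PendingRecord head = inFlight.pollFirst();
            committable.put(head.partition, head.offset);
        }
    }

    // The offset that could safely be committed for a partition, or null if none yet.
    Long committableOffset(String partition) {
        return committable.get(partition);
    }
}
```

If `o2` is acknowledged before `o1`, nothing becomes committable for `p1`. Once `o1` is acknowledged, both records are drained and `o2` becomes committable. An offset therefore never gets ahead of a record that has not been written.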

[ Full content available at: https://github.com/apache/kafka/pull/5553 ]
This message was relayed via gitbox.apache.org.