The producer in the `WorkerSourceTask` automatically resends records, but if the producer fails to send them, the [WorkerSourceTask enqueues the unsent records in `toSend`](https://github.com/apache/kafka/blob/08e8facdc9fce3a9195f5f646b49f55ffa043c73/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L343-L348) and sends them again.
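To make the resend behaviour concrete, here is a rough, self-contained model of that loop. All names (`ResendLoopSketch`, `FlakyProducer`, `pollCalls`) are hypothetical. It mirrors only the control flow around `toSend`, not the real `WorkerSourceTask` code. It also leaves out the backoff the real loop applies after a failed send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the WorkerSourceTask resend loop.
// None of these names are Kafka Connect APIs; they only mirror the control flow.
class ResendLoopSketch {

    /** Producer stand-in that rejects the first `failures` send attempts. */
    static final class FlakyProducer {
        private int failures;
        final List<String> written = new ArrayList<>();

        FlakyProducer(int failures) {
            this.failures = failures;
        }

        boolean send(String record) {
            if (failures > 0) {
                failures--;
                return false; // simulate a retriable send failure
            }
            written.add(record);
            return true;
        }
    }

    private final FlakyProducer producer;
    private final List<List<String>> batches; // what successive poll() calls return
    private List<String> toSend;              // null means nothing is pending
    int pollCalls = 0;

    ResendLoopSketch(FlakyProducer producer, List<List<String>> batches) {
        this.producer = producer;
        this.batches = batches;
    }

    private List<String> poll() {
        return pollCalls < batches.size() ? batches.get(pollCalls++) : null;
    }

    /** On failure, keeps the unsent tail in toSend and returns false. */
    private boolean sendRecords() {
        for (int i = 0; i < toSend.size(); i++) {
            if (!producer.send(toSend.get(i))) {
                toSend = new ArrayList<>(toSend.subList(i, toSend.size()));
                return false;
            }
        }
        toSend = null;
        return true;
    }

    /** Runs a fixed number of loop iterations. */
    void run(int iterations) {
        for (int n = 0; n < iterations; n++) {
            if (toSend == null) {
                toSend = poll(); // poll only when nothing is waiting to be resent
            }
            if (toSend == null) {
                continue;
            }
            sendRecords(); // the real loop also backs off after a failure
        }
    }
}
```

With a producer that fails once and two polled batches, the second iteration resends the first batch instead of calling `poll()`. The second batch is only polled after the first one has gone through.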
It is true that this happens after each call to `poll()`. But if the send fails, `toSend` stays non-null, so the next iteration of the loop does not call `poll()` and instead retries whatever is in `toSend`. This continues for as long as `toSend` is non-null.

However, with the current PR, even while a resend is pending, the loop may still ask for the source partitions and offsets and will synchronously commit them using the `offsetWriter`. So a record with offset `o1` for source partition `p1` might fail to send and be queued for a retry. Then the connector sets a later offset `o2` for the same partition, and `o2` is committed. If the connector were to fail at exactly that point (which is possible), offset `o2` would have been committed without the `o1` record ever being written. I understand that in your particular use case you would probably only set the offsets for a partition if no records had been written recently. But that doesn't change the fact that the `WorkerSourceTask` might be retrying previous records for quite some time.

What if your new block of code ran only when `sendRecords()` succeeded? I think that approach has a couple of issues as well:

- **Concurrent commits.** The offset writer would be called synchronously, whereas all other offset commits are handed to a separate commit thread that commits offsets for multiple tasks. There would then be multiple threads committing offsets, with potential race conditions and concurrency issues.
- **API complexity.** It is still a complicated API: will developers truly understand when and how to use `getSourcePartitionAndOffset()`? Can I call it to read the last offset committed for a particular source partition? No, because the worker never sets the offsets there.

The `WorkerSourceTask` has a single, ordered pipeline for all records, each of which carries its own offset. I still believe the most reliable and deterministic way to solve this is to use that same pipeline.
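Here is a rough sketch of the ordering concern. All names (`OffsetOrderingSketch`, `commitDirectly`, `enqueue`, `acknowledge`) are hypothetical. It models only *when* an offset becomes committed, not how `WorkerSourceTask` or the `offsetWriter` actually track and flush offsets. `commitDirectly` stands in for the side-channel commit in this PR. `enqueue` plus `acknowledge` stand in for carrying the offset through the same ordered pipeline as the records.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the ordering argument; not Kafka Connect code.
class OffsetOrderingSketch {

    /** A record (or offset-only marker) with its source partition and offset. */
    static final class Entry {
        final String partition;
        final long offset;

        Entry(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Ordered queue of entries not yet acknowledged, analogous to toSend.
    final Deque<Entry> pending = new ArrayDeque<>();
    // Offsets a restarted task would resume from.
    final Map<String, Long> committed = new HashMap<>();

    /** Side channel: commit immediately, regardless of what is still pending. */
    void commitDirectly(String partition, long offset) {
        committed.put(partition, offset);
    }

    /** Single pipeline: queue the offset behind anything already waiting to be sent. */
    void enqueue(String partition, long offset) {
        pending.addLast(new Entry(partition, offset));
    }

    /** Acknowledges up to `acked` entries from the head, in order; only those become committed. */
    void acknowledge(int acked) {
        for (int i = 0; i < acked && !pending.isEmpty(); i++) {
            Entry e = pending.removeFirst();
            committed.put(e.partition, e.offset);
        }
    }
}
```

With the side channel, `committed` can already hold `p1 -> 2` while the `o1` record is still pending, which is exactly the window described above. With the pipeline, `p1` only advances to `2` after the entry for `o1` has been acknowledged.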
[ Full content available at: https://github.com/apache/kafka/pull/5553 ]
