C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1068558694
##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
+
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, exactly-once delivery may still not be possible if a connector is not designed to take advantage of the capabilities of the Kafka Connect framework, or cannot do so.</p>

Review Comment:
   Thanks Matthias, I didn't realize this term could be so controversial. I've filed https://github.com/apache/kafka/pull/13106 as a follow-up to fix the wording used here and elsewhere in Connect.
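
As an illustrative aside, the worker and connector properties that the new section alludes to could look like the following sketch. The property names (exactly.once.source.support on the worker, exactly.once.support and transaction.boundary on the connector) come from KIP-618; the particular values shown are just one plausible combination, not an excerpt from this PR:

    # Worker configuration (sketch): enable exactly-once support for
    # source connectors on every worker in the Connect cluster.
    exactly.once.source.support=enabled

    # Connector configuration (sketch): refuse to run the connector
    # unless exactly-once delivery can be guaranteed for it...
    exactly.once.support=required
    # ...and let the connector define its own transaction boundaries.
    transaction.boundary=connector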
##########
docs/connect.html:
##########
@@ -593,6 +701,109 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which users can enable by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>

Review Comment:
   When a connector is configured to define its own transaction boundaries, we never force a transaction commit unless the connector itself requests one. So yes, in this case, we would abort the transaction during shutdown and then start reading from the last successfully-committed offset (which should have been published in the last successfully-committed transaction) once a new instance is brought up.
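
The code example for the "exactly every tenth record" case is not included in the hunk quoted above. As a sketch of what it might look like, mirroring the structure of the first example and using the per-record commitTransaction(SourceRecord) overload of TransactionContext (which makes the given record the last record in the committed transaction), one possibility is:

    private int recordsSent;

    @Override
    public void start(Map<String, String> props) {
        this.recordsSent = 0;
    }

    @Override
    public List<SourceRecord> poll() {
        List<SourceRecord> records = fetchRecords();
        for (SourceRecord record : records) {
            // Ask the framework to commit the transaction that ends with this
            // record, so a transaction is committed for exactly every tenth record.
            if (++this.recordsSent % 10 == 0) {
                this.context.transactionContext().commitTransaction(record);
            }
        }
        return records;
    }

Compared to the first example, the per-record overload pins the transaction boundary to a specific record, rather than committing at the end of whichever batch happens to push the counter to ten.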
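
On the requirement in the "Supporting exactly-once" subsection that a connector provide meaningful source offsets for each record it emits: as a minimal sketch, assuming a hypothetical file-reading task (the "filename" and "position" keys and the surrounding variables are illustrative, not from this PR), each emitted record would carry a source partition and source offset like so:

    // The source partition identifies which input stream a record came from,
    // and the source offset records the exact position to resume from; the
    // framework can then commit these offsets in the same transaction as the
    // records themselves.
    Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
    Map<String, Object> sourceOffset = Collections.singletonMap("position", positionInFile);
    SourceRecord record = new SourceRecord(
            sourcePartition, sourceOffset,
            topic, Schema.STRING_SCHEMA, line);

Offsets of this kind are what let a restarted task resume from the last successfully-committed transaction without dropping or duplicating messages, which is the behavior described in the review comment above.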