C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1068558694


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" 
href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once 
support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, exactly-once delivery may still not be possible if a connector is not designed to take advantage of the capabilities of the Kafka Connect framework, or cannot do so.</p>
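+
+    <p>For illustration, a minimal sketch of the properties involved (assuming Kafka 3.3+; <code>exactly.once.source.support</code> is set on every worker, while <code>exactly.once.support</code> is set per connector):</p>
+
+<pre class="brush: text;">
+# Worker configuration (each node in the cluster)
+exactly.once.source.support=enabled
+
+# Source connector configuration
+exactly.once.support=required
+</pre>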

Review Comment:
   Thanks Matthias, I didn't realize this term could be so controversial. I've 
filed https://github.com/apache/kafka/pull/13106 as a follow-up to fix the 
wording used here and elsewhere in Connect.



##########
docs/connect.html:
##########
@@ -593,6 +701,109 @@ <h5><a id="connect_resuming" 
href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input 
streams. The <code>OffsetStorageReader</code> interface also allows you to 
issue bulk reads to efficiently load all offsets, then apply them by seeking 
each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" 
href="#connect_exactlyoncesourceconnectors>">Exactly-once source 
connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>,
 Kafka Connect supports exactly-once source connectors as of version 3.3.0. In 
order for a source connector to take advantage of this support, it must be able 
to provide meaningful source offsets for each record that it emits, and resume 
consumption from the external system at the exact position corresponding to any 
of those offsets without dropping or duplicating messages.</p>
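+
+    <p>For example, a task that reads from a file might use the file name as its source partition and the byte position as its source offset. A minimal hedged sketch (the keys <code>filename</code> and <code>position</code> are illustrative, not framework requirements):</p>
+
+<pre class="brush: java;">
+// Illustrative only: any serializable keys and values may be used, as long as
+// the task can resume from the exact position described by the offset.
+Map&lt;String, Object&gt; sourcePartition = Collections.singletonMap("filename", filename);
+Map&lt;String, Object&gt; sourceOffset = Collections.singletonMap("position", position);
+return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line);
+</pre>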
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework creates and commits a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries; users can enable this by setting the <code>transaction.boundary</code> property to <code>connector</code> in the configuration for the connector.</p>
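+
+    <p>For example (a hedged sketch; other required connector properties are omitted):</p>
+
+<pre class="brush: text;">
+# In the connector configuration
+transaction.boundary=connector
+</pre>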
+
+    <p>If enabled, the connector's tasks will have access to a 
<code>TransactionContext</code> from their <code>SourceTaskContext</code>, 
which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
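+
+<pre class="brush: java;">
+// A hedged sketch: commitTransaction(SourceRecord) requests a commit of the
+// current transaction once the given record has been processed.
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+    return records;
+}
+</pre>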

Review Comment:
   When a connector is configured to define its own transaction boundaries, we 
never force a transaction commit unless the connector itself requests one. So 
yes, in this case, we would abort the transaction during shutdown and then 
start reading from the last successfully-committed offset (which should have 
been published in the last successfully-committed transaction) once a new 
instance is brought up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
