[ 
https://issues.apache.org/jira/browse/KAFKA-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310326#comment-16310326
 ] 

ASF GitHub Bot commented on KAFKA-5185:
---------------------------------------

GeoSmith closed pull request #2989: KAFKA-5185: Adding the RecordMetadata that 
is returned by the producer to the commitRecord method for SourceTask
URL: https://github.com/apache/kafka/pull/2989
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java 
b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index f3e636bb87a..0a6aa4c4cab 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -18,6 +18,8 @@
 
 import org.apache.kafka.connect.connector.Task;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+
 import java.util.List;
 import java.util.Map;
 
@@ -90,7 +92,26 @@ public void commit() throws InterruptedException {
      * @param record {@link SourceRecord} that was successfully sent via the 
producer.
      * @throws InterruptedException
      */
+    @Deprecated
     public void commitRecord(SourceRecord record) throws InterruptedException {
         // This space intentionally left blank.
     }
+    
+    /**
+     * <p>
+     * Commit an individual {@link SourceRecord} when the callback from the 
producer client is received, or if a record is filtered by a transformation.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Kafka 
Connect will record offsets
+     * automatically. This hook is provided for systems that also need to 
store offsets internally
+     * in their own system.
+     * </p>
+     *
+     * @param record {@link SourceRecord} that was successfully sent via the 
producer.
+     * @param recordMetaData {@link RecordMetaData} record metadata returned 
from the producer after it has been sent successfully. If a transformation, 
this will return null
+     * @throws InterruptedException
+     */
+    public void commitRecord(SourceRecord record, RecordMetadata 
recordMetadata) throws InterruptedException {
+        // This space intentionally left blank.
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index ed15b85e515..e4d74cdbde7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -189,7 +189,7 @@ private boolean sendRecords() {
             final SourceRecord record = 
transformationChain.apply(preTransformRecord);
 
             if (record == null) {
-                commitTaskRecord(preTransformRecord);
+                commitTaskRecord(preTransformRecord, null);
                 continue;
             }
 
@@ -232,7 +232,7 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
                                     log.trace("Wrote record successfully: 
topic {} partition {} offset {}",
                                             recordMetadata.topic(), 
recordMetadata.partition(),
                                             recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord);
+                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
                                 }
                                 recordSent(producerRecord);
                             }
@@ -252,9 +252,10 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
-    private void commitTaskRecord(SourceRecord record) {
+    private void commitTaskRecord(SourceRecord record, RecordMetadata 
recordMetadata) {
         try {
-            task.commitRecord(record);
+            task.commitRecord(record, recordMetadata);
+            task.commitRecord(record); //marked as depreciated and left for 
backwards compatibility
         } catch (InterruptedException e) {
             log.error("Exception thrown", e);
         } catch (Throwable t) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index dfd8bac5aa0..ea5414784a1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +122,7 @@ public void start(Map<String, String> props) {
     }
 
     @Override
-    public void commitRecord(SourceRecord record) throws InterruptedException {
+    public void commitRecord(SourceRecord record, RecordMetadata 
recordMetadata) throws InterruptedException {
         Map<String, Object> data = new HashMap<>();
         data.put("name", name);
         data.put("task", id);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 31204f01b2e..22edcaa33b8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -718,7 +718,7 @@ public SourceRecord answer() {
     }
 
     private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) 
throws InterruptedException {
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), 
EasyMock.anyObject(RecordMetadata.class));
         IExpectationSetters<Void> expect = EasyMock.expectLastCall();
         if (!succeed) {
             expect = expect.andThrow(new RuntimeException("Error committing 
record in source task"));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Adding the RecordMetadata that is returned by the producer to the 
> commitRecord method for SourceTask
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5185
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5185
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: George Smith
>
> An improvement request I thought would be useful.
> Added the producer's record metadata object to the commitRecord method on the 
> SourceTask class so more data is provided from the producer and it allows 
> anyone overriding and hooking into the commitRecord method to receive more 
> information about where the record was produced to. 
> Left the old commitRecord method with just the SourceRecord for backwards 
> compatibility; even though this would technically be included in a new version 
> of Kafka, it would introduce a breaking change without it. 
> Opened up PR here: https://github.com/apache/kafka/pull/2989



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to