C0urante commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1258436765


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         verifyTopicCreation();
     }
 
+    @Test
+    public void testSendRecordsRetriableException() {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        expectTaskGetTopic();
+
+        when(transformationChain.apply(eq(record1))).thenReturn(null);
+        when(transformationChain.apply(eq(record2))).thenReturn(null);
+        when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // The producer throws a RetriableException the first time we try to send the third record
+        assertFalse(workerTask.sendRecords());
+
+        // The next attempt to send the third record should succeed
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure that the first two records that were filtered out by the transformation chain
+        // aren't re-processed when we retry the call to sendRecords()
+        verify(transformationChain, times(4)).apply(any(SourceRecord.class));

Review Comment:
   If I understand the proposal correctly, we're thinking of transforming and converting every record in a batch returned from `SourceTask::poll` before dispatching any of those records to the producer.
   
   I don't think we should adopt this approach, because it changes our behavior with poison pill records. If the connector emits a record in the middle of a batch that causes deterministic failures during transformation or conversion, we currently dispatch every record before the poison pill to the producer, and are able to commit offsets for those records when the task fails. With the proposed approach, we wouldn't be able to dispatch any of those records to the producer or commit offsets for them.
   
   Especially with [KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) and the offset alter REST API coming out soon, it'd be best for our users in cases like this to be able to know exactly which record the connector is choking on, and (optionally) to skip over just that single record.
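   To make the behavioral difference concrete, here's a minimal sketch (hypothetical classes, not Connect's real `AbstractWorkerSourceTask` machinery) of the current per-record transform-then-dispatch loop. The point is that when a poison pill fails deterministically mid-batch, every record before it has already reached the producer, so their offsets remain committable; a batch-first transform would lose that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the worker's send loop; names and classes here
// are illustrative assumptions, not Connect's actual API.
public class PoisonPillSketch {
    static final List<String> dispatched = new ArrayList<>();

    // Simulated transformation chain: fails deterministically on the poison record.
    static String transform(String record) {
        if (record.equals("poison")) {
            throw new RuntimeException("deterministic transformation failure");
        }
        return record;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("r1", "r2", "poison", "r4");

        // Current behavior: transform and dispatch one record at a time.
        // Records ahead of the poison pill reach the producer before the
        // task fails, so their offsets can still be committed.
        try {
            for (String record : batch) {
                String transformed = transform(record);
                dispatched.add(transformed); // stands in for producer.send(...)
            }
        } catch (RuntimeException e) {
            // Task fails here, but r1 and r2 were already dispatched.
        }
        System.out.println(dispatched); // [r1, r2]
    }
}
```

   Transforming the whole batch up front would hit the same exception before anything was dispatched, leaving `dispatched` empty and no offsets committable.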



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
