vamossagar12 commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255252249


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         verifyTopicCreation();
     }
 
+    @Test
+    public void testSendRecordsRetriableException() {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        expectTaskGetTopic();
+
+        when(transformationChain.apply(eq(record1))).thenReturn(null);
+        when(transformationChain.apply(eq(record2))).thenReturn(null);
+        when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // The producer throws a RetriableException the first time we try to send the third record
+        assertFalse(workerTask.sendRecords());
+
+        // The next attempt to send the third record should succeed
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure that the first two records that were filtered out by the transformation chain
+        // aren't re-processed when we retry the call to sendRecords()
+        verify(transformationChain, times(4)).apply(any(SourceRecord.class));

Review Comment:
   Yeah, I agree. An explicit check that `record1` and `record2` are each 
   transformed once and `record3` twice would be good.
   Regarding the duplicate re-transformations, I guess they could be avoided 
   by storing a mapping between each `SourceRecord` and its `ProducerRecord`, 
   i.e. between the pre-transformed and the transformed record. We could clear 
   the map once the entire batch of records has been processed. In theory, that 
   could have been another way of achieving what we are trying to achieve in 
   this PR: if the SMT returns null for a record, we note that in the mapping 
   and skip the record on subsequent retries. But the solution in this PR is 
   cleaner.
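
   To make the mapping idea concrete, here is a rough, standalone sketch. It is 
   not Kafka Connect code: `TransformCache`, `sendBatch`, and the `send` 
   predicate are hypothetical names, the `transform` function merely stands in 
   for the transformation chain, and a `false` return from `send` stands in for 
   a retriable producer failure.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch (not the PR's approach): remember each record's transformation
// result so that retrying a batch never re-runs the transformation.
class TransformCache<S, T> {
    private final Function<S, T> transform;                  // stand-in for the SMT chain; may return null
    private final Map<S, Optional<T>> transformed = new HashMap<>();
    private final Set<S> done = new HashSet<>();             // records already sent or filtered out

    TransformCache(Function<S, T> transform) {
        this.transform = transform;
    }

    // Returns true once the whole batch is handled; false if a send failed and the caller should retry.
    boolean sendBatch(List<S> batch, Predicate<T> send) {
        for (S record : batch) {
            if (done.contains(record)) {
                continue;                                    // handled on an earlier attempt
            }
            // An empty Optional records "the transformation returned null", so it is cached too.
            Optional<T> result = transformed.computeIfAbsent(
                    record, r -> Optional.ofNullable(transform.apply(r)));
            if (result.isPresent() && !send.test(result.get())) {
                return false;                                // keep the cached state for the retry
            }
            done.add(record);
        }
        // The entire batch has been processed, so the mapping can be cleared.
        transformed.clear();
        done.clear();
        return true;
    }
}
```

   With this approach each record would be transformed exactly once, even when 
   the send is retried, at the cost of the extra bookkeeping that the PR avoids.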



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
