C0urante commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1254656021
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
verifyTopicCreation();
}
+ @Test
+ public void testSendRecordsRetriableException() {
+ createWorkerTask();
+
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+ expectTaskGetTopic();
+
+ when(transformationChain.apply(eq(record1))).thenReturn(null);
+ when(transformationChain.apply(eq(record2))).thenReturn(null);
+ when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+ TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+ when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+ when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+ workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+ // The producer throws a RetriableException the first time we try to send the third record
+ assertFalse(workerTask.sendRecords());
+
+ // The next attempt to send the third record should succeed
+ assertTrue(workerTask.sendRecords());
+
+ // Ensure that the first two records that were filtered out by the transformation chain
+ // aren't re-processed when we retry the call to sendRecords()
+ verify(transformationChain, times(4)).apply(any(SourceRecord.class));
Review Comment:
Do you think it might be worth it to be more explicit in the verifications
here? I.e., verify that `record1` and `record2` were transformed only once, but
`record3` was transformed twice?
Also, it's a little strange that we re-transform a record on every
retry. If it's not too much work, we may want to refine that logic so that
each record is transformed at most once.
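To make the "at most once" idea concrete, here is a rough, standalone sketch in plain Java. It is not the actual `AbstractWorkerSourceTask` logic: the class `TransformOnceSketch`, its `pending` and `position` fields, and the use of `Function`/`Predicate` as stand-ins for the transformation chain and the producer are all hypothetical names chosen for illustration. The idea is simply to hold on to the already-transformed record when a send fails, so a retry resumes from the send step instead of transforming again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: caches the transformed record across retries so that
// each input record passes through the transform at most once.
class TransformOnceSketch<R> {
    private final List<R> toSend;
    private final Function<R, R> transform; // stand-in for the transformation chain; null result = filtered out
    private final Predicate<R> trySend;     // stand-in for the producer; false = retriable failure
    private int position = 0;               // index of the next record that has not been transformed yet
    private R pending = null;               // transformed record still waiting for a successful send

    TransformOnceSketch(List<R> toSend, Function<R, R> transform, Predicate<R> trySend) {
        this.toSend = toSend;
        this.transform = transform;
        this.trySend = trySend;
    }

    // Returns true once every record has been sent or filtered out,
    // false if a send failed and the caller should retry later.
    boolean sendRecords() {
        while (pending != null || position < toSend.size()) {
            if (pending == null) {
                pending = transform.apply(toSend.get(position++));
                if (pending == null) {
                    continue; // record was filtered out; nothing to send
                }
            }
            if (!trySend.test(pending)) {
                return false; // keep 'pending' so the retry skips the transform
            }
            pending = null;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> transformCounts = new HashMap<>();
        List<String> sent = new ArrayList<>();
        int[] sendAttempts = {0};

        TransformOnceSketch<String> task = new TransformOnceSketch<String>(
            Arrays.asList("record1", "record2", "record3"),
            // Filter out the first two records, pass the third through unchanged
            r -> { transformCounts.merge(r, 1, Integer::sum); return r.equals("record3") ? r : null; },
            // Fail the first send attempt, succeed afterwards
            r -> { if (sendAttempts[0]++ == 0) return false; sent.add(r); return true; });

        System.out.println("first attempt:  " + task.sendRecords());
        System.out.println("second attempt: " + task.sendRecords());
        System.out.println("transform counts: " + transformCounts);
        System.out.println("sent: " + sent);
    }
}
```

With logic along these lines, the test could assert one `apply` call per record instead of a total of four. Whether caching the transformed record is acceptable in the real task (for example, with respect to how `toSend` is trimmed after a failure) would need to be checked against the existing code.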