vamossagar12 commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255253840
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -651,6 +652,40 @@ public void
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
verifyTopicCreation();
}
+ @Test
+ public void testSendRecordsRetriableException() {
+ createWorkerTask();
+
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+ expectTaskGetTopic();
+
+ when(transformationChain.apply(eq(record1))).thenReturn(null);
+ when(transformationChain.apply(eq(record2))).thenReturn(null);
+ when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0,
null, Collections.emptyList(), Collections.emptyList());
+ TopicDescription topicDesc = new TopicDescription(TOPIC, false,
Collections.singletonList(topicPartitionInfo));
+
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC,
topicDesc));
+
+ when(producer.send(any(), any())).thenThrow(new
RetriableException("Retriable exception")).thenReturn(null);
+
+ workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+ // The producer throws a RetriableException the first time we try to
send the third record
Review Comment:
nit: For better readability, just mention here that the first 2 records have
been filtered out and only record3 would be sent. Would it be easier to follow
for anyone who's new to the codebase. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]