yashmayya commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255540020
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         verifyTopicCreation();
     }
+    @Test
+    public void testSendRecordsRetriableException() {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        expectTaskGetTopic();
+
+        when(transformationChain.apply(eq(record1))).thenReturn(null);
+        when(transformationChain.apply(eq(record2))).thenReturn(null);
+        when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // The producer throws a RetriableException the first time we try to send the third record
+        assertFalse(workerTask.sendRecords());
+
+        // The next attempt to send the third record should succeed
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure that the first two records that were filtered out by the transformation chain
+        // aren't re-processed when we retry the call to sendRecords()
+        verify(transformationChain, times(4)).apply(any(SourceRecord.class));
Review Comment:
Transformations / SMTs are applied to `SourceRecord` instances and also return `SourceRecord`s; the transformed records are then converted to `ProducerRecord`s by the converters. That does bring up a related point, though: on each retry we're not only re-transforming the record, we're also re-converting it. I agree that this seems a little strange.
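
A minimal Java sketch of the current behavior may make the cost concrete. All names here (`ToSendRetrySketch`, `RetriableSendException`, the string records and the `"bytes(...)"` conversion stand-in) are hypothetical, and this is a simplification rather than the actual `AbstractWorkerSourceTask` code: only the unprocessed pre-transform records survive a retriable failure, so the next `sendRecords` call transforms and converts them again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a retriable producer failure.
class RetriableSendException extends RuntimeException {
    RetriableSendException(String message) { super(message); }
}

// Sketch of the current flow: only pre-transform records are kept between
// retries, so each retry re-transforms and re-converts the remaining records.
class ToSendRetrySketch {
    private final UnaryOperator<String> transformationChain; // returns null to filter a record out
    private final Consumer<String> producer;                 // may throw RetriableSendException
    List<String> toSend;
    int transformCalls = 0;
    int convertCalls = 0;

    ToSendRetrySketch(UnaryOperator<String> transformationChain, Consumer<String> producer) {
        this.transformationChain = transformationChain;
        this.producer = producer;
    }

    boolean sendRecords() {
        int processed = 0;
        for (String preTransformRecord : toSend) {
            transformCalls++;
            String record = transformationChain.apply(preTransformRecord);
            if (record == null) {
                processed++; // filtered out: counted as processed so it is not retried
                continue;
            }
            convertCalls++;
            String producerRecord = "bytes(" + record + ")"; // stand-in for key/value conversion
            try {
                producer.accept(producerRecord);
            } catch (RetriableSendException e) {
                // Keep only the unprocessed pre-transform records for the next attempt
                toSend = toSend.subList(processed, toSend.size());
                return false;
            }
            processed++;
        }
        toSend = null; // whole batch dispatched
        return true;
    }
}
```

Fed the test's scenario (the chain filters out the first two records and the producer fails once), the chain is applied four times, matching `times(4)` in the test, and the third record is converted twice.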

We currently rely on the task's `toSend` field to hold the source records that still need to be retried between successive calls to `sendRecords` (which are separated by a backoff period). One option would be to decouple the transformation + conversion of a batch of records from the loop where we dispatch them, i.e. maintain a new field that maps each pre-transform `SourceRecord` to its transformed + converted `ProducerRecord`. This mapping would be populated once for each polled batch, and its entries would then be dispatched in `sendRecords` (following the same retry logic we currently use). I feel like this refactor could be done as a follow-up / separate item, since it will also require a fair bit of test refactoring that would add noise to this PR, which (IMO) fixes a more important issue. WDYT?
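
The proposed refactor can be sketched the same way, again with hypothetical names and string records standing in for `SourceRecord` / `ProducerRecord` (an illustration of the idea under those assumptions, not a patch against Kafka): `prepareBatch` transforms and converts each polled batch once and queues (pre-transform, converted) pairs, and `sendRecords` only dispatches, leaving a failed entry at the head of the queue for the next attempt.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a retriable producer failure.
class RetriableSendException extends RuntimeException {
    RetriableSendException(String message) { super(message); }
}

// Sketch of the proposed refactor: transform + convert each polled batch once,
// then let sendRecords() retry only the dispatch step.
class PreConvertedBatchSketch {
    private final UnaryOperator<String> transformationChain; // returns null to filter a record out
    private final Consumer<String> producer;                 // may throw RetriableSendException
    // (pre-transform record, converted record) pairs awaiting dispatch, in poll order.
    // An ordered queue of pairs rather than a Map keyed by the record, so that
    // duplicate records in a batch are not collapsed into one entry.
    final Deque<Map.Entry<String, String>> pending = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();
    int transformCalls = 0;
    int convertCalls = 0;

    PreConvertedBatchSketch(UnaryOperator<String> transformationChain, Consumer<String> producer) {
        this.transformationChain = transformationChain;
        this.producer = producer;
    }

    // Called once per polled batch: the only place records are transformed and converted.
    void prepareBatch(List<String> batch) {
        for (String preTransformRecord : batch) {
            transformCalls++;
            String record = transformationChain.apply(preTransformRecord);
            if (record == null) {
                continue; // filtered out: never enters the pending queue
            }
            convertCalls++;
            pending.addLast(new SimpleImmutableEntry<>(preTransformRecord, "bytes(" + record + ")"));
        }
    }

    // Dispatch only: a retry resumes at the failed entry without re-transforming it.
    boolean sendRecords() {
        while (!pending.isEmpty()) {
            Map.Entry<String, String> next = pending.peekFirst();
            try {
                producer.accept(next.getValue());
            } catch (RetriableSendException e) {
                return false; // entry stays at the head of the queue for the next attempt
            }
            sent.add(next.getValue());
            pending.pollFirst();
        }
        return true;
    }
}
```

For the same scenario as the test, the chain runs three times and conversion runs once, since the retry only repeats the producer call.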