This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b35c29401ac KAFKA-18073: Prevent dropped records from failed retriable 
exceptions (#18146)
b35c29401ac is described below

commit b35c29401acb12c7a2b8a450c5164cddf82361c6
Author: Thomas Thornton <[email protected]>
AuthorDate: Thu Jan 9 13:13:11 2025 -0500

    KAFKA-18073: Prevent dropped records from failed retriable exceptions 
(#18146)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../runtime/errors/RetryWithToleranceOperator.java |   3 +-
 .../runtime/AbstractWorkerSourceTaskTest.java      | 144 ++++++++++++++++++++-
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |   2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 122 ++++++++++++++++-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   2 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |   4 +-
 .../kafka/connect/runtime/WorkerTestUtils.java     |  39 ++++++
 .../errors/RetryWithToleranceOperatorTest.java     |  55 +++++---
 8 files changed, 340 insertions(+), 31 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 28886b3557c..2b9ba9fc5b7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -214,8 +214,7 @@ public class RetryWithToleranceOperator<T> implements 
AutoCloseable {
                     errorHandlingMetrics.recordRetry();
                 } else {
                     log.trace("Can't retry. start={}, attempt={}, 
deadline={}", startTime, attempt, deadline);
-                    context.error(e);
-                    return null;
+                    throw e;
                 }
                 if (stopping) {
                     log.trace("Shutdown has been scheduled. Marking operation 
as failed.");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 0cb4db70647..f33e9bc514b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -80,6 +80,7 @@ import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@@ -264,7 +265,7 @@ public class AbstractWorkerSourceTaskTest {
 
         assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
         assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
-        
+
         verifyTaskGetTopic();
         verifyTopicCreation();
     }
@@ -362,8 +363,8 @@ public class AbstractWorkerSourceTaskTest {
         StringConverter stringConverter = new StringConverter();
         SampleConverterWithHeaders testConverter = new 
SampleConverterWithHeaders();
 
-        createWorkerTask(stringConverter, testConverter, stringConverter, 
RetryWithToleranceOperatorTest.noopOperator(),
-                Collections::emptyList);
+        createWorkerTask(stringConverter, testConverter, stringConverter, 
RetryWithToleranceOperatorTest.noneOperator(),
+                Collections::emptyList, transformationChain);
 
         expectSendRecord(null);
         expectApplyTransformationChain();
@@ -706,6 +707,118 @@ public class AbstractWorkerSourceTaskTest {
         verify(transformationChain, times(2)).apply(any(), eq(record3));
     }
 
+    @Test
+    public void testSendRecordsFailedTransformationErrorToleranceNone() {
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        RetryWithToleranceOperator<RetriableException> 
retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+        TransformationChain<RetriableException, SourceRecord> 
transformationChainRetriableException =
+                
WorkerTestUtils.getTransformationChain(retryWithToleranceOperator, List.of(new 
RetriableException("Test"), record1));
+        createWorkerTask(transformationChainRetriableException, 
retryWithToleranceOperator);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
+        
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, 
topicDesc));
+
+        workerTask.toSend = Arrays.asList(record1);
+
+        // The transformation errored out so the error should be re-raised by 
sendRecords with error tolerance None
+        Exception exception = assertThrows(ConnectException.class, 
workerTask::sendRecords);
+        assertTrue(exception.getMessage().contains("Tolerance exceeded"));
+
+        // Ensure the transformation was called
+        verify(transformationChainRetriableException, times(1)).apply(any(), 
eq(record1));
+
+        // The second transform call will succeed, batch should succeed at 
sending the one record (none were skipped)
+        assertTrue(workerTask.sendRecords());
+        verifySendRecord(1);
+    }
+
+    @Test
+    public void testSendRecordsFailedTransformationErrorToleranceAll() {
+        RetryWithToleranceOperator<RetriableException> 
retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+        TransformationChain<RetriableException, SourceRecord> 
transformationChainRetriableException = WorkerTestUtils.getTransformationChain(
+                retryWithToleranceOperator,
+                List.of(new RetriableException("Test")));
+
+        createWorkerTask(transformationChainRetriableException, 
retryWithToleranceOperator);
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
+
+        workerTask.toSend = Arrays.asList(record1);
+
+        // The transformation errored out so the error should be ignored & the 
record skipped with error tolerance all
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure the transformation was called
+        verify(transformationChainRetriableException, times(1)).apply(any(), 
eq(record1));
+    }
+
+    @Test
+    public void testSendRecordsConversionExceptionErrorToleranceNone() {
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        RetryWithToleranceOperator<RetriableException> 
retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+        List<Object> results = Stream.of(record1, record2, record3)
+                .collect(Collectors.toList());
+        TransformationChain<RetriableException, SourceRecord> chain = 
WorkerTestUtils.getTransformationChain(
+                retryWithToleranceOperator,
+                results);
+        createWorkerTask(chain, retryWithToleranceOperator);
+
+        // When we try to convert the key/value of each record, throw an 
exception
+        throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
+        
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, 
topicDesc));
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // Send records should fail when errors.tolerance is none and the 
conversion call fails
+        Exception exception = assertThrows(ConnectException.class, 
workerTask::sendRecords);
+        assertTrue(exception.getMessage().contains("Tolerance exceeded"));
+        assertThrows(ConnectException.class, workerTask::sendRecords);
+        assertThrows(ConnectException.class, workerTask::sendRecords);
+
+        // Set the conversion call to succeed, batch should succeed at sending 
all three records (none were skipped)
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        assertTrue(workerTask.sendRecords());
+        verifySendRecord(3);
+    }
+
+    @Test
+    public void testSendRecordsConversionExceptionErrorToleranceAll() {
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        RetryWithToleranceOperator<RetriableException> 
retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+        List<Object> results = Stream.of(record1, record2, record3)
+                .collect(Collectors.toList());
+        TransformationChain<RetriableException, SourceRecord> chain = 
WorkerTestUtils.getTransformationChain(
+                retryWithToleranceOperator,
+                results);
+        createWorkerTask(chain, retryWithToleranceOperator);
+
+        // When we try to convert the key/value of each record, throw an 
exception
+        throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // With errors.tolerance to all, the failed conversion should simply 
skip the record, and record successful batch
+        assertTrue(workerTask.sendRecords());
+    }
+
     private void expectSendRecord(Headers headers) {
         if (headers != null)
             expectConvertHeadersAndKeyValue(headers, TOPIC);
@@ -806,6 +919,20 @@ public class AbstractWorkerSourceTaskTest {
         assertEquals(valueConverter.fromConnectData(topic, headers, 
RECORD_SCHEMA, RECORD), SERIALIZED_RECORD);
     }
 
+    private void throwExceptionWhenConvertKey(Headers headers, String topic) {
+        if (headers.iterator().hasNext()) {
+            when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+                    anyString()))
+                    .thenAnswer((Answer<byte[]>) invocation -> {
+                        String headerValue = invocation.getArgument(3, 
String.class);
+                        return headerValue.getBytes(StandardCharsets.UTF_8);
+                    });
+        }
+
+        when(keyConverter.fromConnectData(eq(topic), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+                .thenThrow(new RetriableException("Failed to convert key"));
+    }
+
     private void expectApplyTransformationChain() {
         when(transformationChain.apply(any(), any(SourceRecord.class)))
                 .thenAnswer(AdditionalAnswers.returnsSecondArg());
@@ -817,12 +944,19 @@ public class AbstractWorkerSourceTaskTest {
         return new RecordHeaders();
     }
 
+    private void createWorkerTask(TransformationChain transformationChain, 
RetryWithToleranceOperator toleranceOperator) {
+        createWorkerTask(keyConverter, valueConverter, headerConverter, 
toleranceOperator, Collections::emptyList,
+                transformationChain);
+    }
+
     private void createWorkerTask() {
-        createWorkerTask(keyConverter, valueConverter, headerConverter, 
RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
+        createWorkerTask(
+                keyConverter, valueConverter, headerConverter, 
RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList, 
transformationChain);
     }
 
     private void createWorkerTask(Converter keyConverter, Converter 
valueConverter, HeaderConverter headerConverter,
-                                  RetryWithToleranceOperator<SourceRecord> 
retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> 
errorReportersSupplier) {
+                                  RetryWithToleranceOperator<SourceRecord> 
retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> 
errorReportersSupplier,
+                                  TransformationChain transformationChain) {
         workerTask = new AbstractWorkerSourceTask(
                 taskId, sourceTask, statusListener, TargetState.STARTED, 
keyConverter, valueConverter, headerConverter, transformationChain,
                 sourceTaskContext, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, 
offsetStore,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index be3dc2401ad..4ee0f61572c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -278,7 +278,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
     private void createWorkerTask(TargetState initialState, Converter 
keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
         workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, 
statusListener, initialState, keyConverter, valueConverter, headerConverter,
                 transformationChain, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, 
offsetStore,
-                config, clusterConfigState, metrics, errorHandlingMetrics, 
plugins.delegatingLoader(), time, 
RetryWithToleranceOperatorTest.noopOperator(), statusBackingStore,
+                config, clusterConfigState, metrics, errorHandlingMetrics, 
plugins.delegatingLoader(), time, 
RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
                 sourceConfig, Runnable::run, preProducerCheck, 
postProducerCheck, Collections::emptyList);
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index b3e0189f987..4e91183fd31 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
+import static 
org.apache.kafka.connect.runtime.WorkerTestUtils.getTransformationChain;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -192,13 +193,18 @@ public class WorkerSinkTaskTest {
         createTask(initialState, keyConverter, valueConverter, 
headerConverter);
     }
 
+    private void createTask(TargetState initialState, TransformationChain 
transformationChain, RetryWithToleranceOperator toleranceOperator) {
+        createTask(initialState, keyConverter, valueConverter, 
headerConverter, toleranceOperator, Collections::emptyList, 
transformationChain);
+    }
+
     private void createTask(TargetState initialState, Converter keyConverter, 
Converter valueConverter, HeaderConverter headerConverter) {
-        createTask(initialState, keyConverter, valueConverter, 
headerConverter, RetryWithToleranceOperatorTest.noopOperator(), 
Collections::emptyList);
+        createTask(initialState, keyConverter, valueConverter, 
headerConverter, RetryWithToleranceOperatorTest.noneOperator(), 
Collections::emptyList, transformationChain);
     }
 
     private void createTask(TargetState initialState, Converter keyConverter, 
Converter valueConverter, HeaderConverter headerConverter,
                             RetryWithToleranceOperator<ConsumerRecord<byte[], 
byte[]>> retryWithToleranceOperator,
-                            Supplier<List<ErrorReporter<ConsumerRecord<byte[], 
byte[]>>>> errorReportersSupplier) {
+                            Supplier<List<ErrorReporter<ConsumerRecord<byte[], 
byte[]>>>> errorReportersSupplier,
+                            TransformationChain transformationChain) {
         workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
@@ -854,6 +860,103 @@ public class WorkerSinkTaskTest {
         verify(sinkTask).close(any(Collection.class));
     }
 
+    @Test
+    public void testRaisesFailedRetriableExceptionFromConvert() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        throwExceptionOnConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+
+        assertThrows(ConnectException.class, workerTask::execute);
+    }
+
+    @Test
+    public void testSkipsFailedRetriableExceptionFromConvert() {
+        createTask(initialState, keyConverter, valueConverter, headerConverter,
+                RetryWithToleranceOperatorTest.allOperator(), 
Collections::emptyList, transformationChain);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        throwExceptionOnConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+        workerTask.execute();
+
+        verify(sinkTask, times(3)).put(Collections.emptyList());
+    }
+
+    @Test
+    public void testRaisesFailedRetriableExceptionFromTransform() {
+        RetryWithToleranceOperator<RetriableException> 
retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+        TransformationChain<RetriableException, SinkRecord> 
transformationChainRetriableException = getTransformationChain(
+                retryWithToleranceOperator, List.of(new 
RetriableException("Test")));
+        createTask(initialState, transformationChainRetriableException, 
retryWithToleranceOperator);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        expectConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+
+        assertThrows(ConnectException.class, workerTask::execute);
+    }
+
+    @Test
+    public void testSkipsFailedRetriableExceptionFromTransform() {
+        RetryWithToleranceOperator<RetriableException> 
retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+        TransformationChain<RetriableException, SinkRecord> 
transformationChainRetriableException = getTransformationChain(
+                retryWithToleranceOperator, List.of(new 
RetriableException("Test")));
+        createTask(initialState, transformationChainRetriableException, 
retryWithToleranceOperator);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        expectConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+        workerTask.execute();
+
+        verify(sinkTask, times(3)).put(Collections.emptyList());
+    }
+
     @Test
     public void testRequestCommit() {
         createTask(initialState);
@@ -1758,7 +1861,7 @@ public class WorkerSinkTaskTest {
                 taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
                 transformationChain, mockConsumer, pluginLoader, time,
-                RetryWithToleranceOperatorTest.noopOperator(), null, 
statusBackingStore, Collections::emptyList);
+                RetryWithToleranceOperatorTest.noneOperator(), null, 
statusBackingStore, Collections::emptyList);
         mockConsumer.updateBeginningOffsets(
                 new HashMap<>() {{
                     put(TOPIC_PARTITION, 0L);
@@ -1852,6 +1955,19 @@ public class WorkerSinkTaskTest {
         expectTransformation(topicPrefix);
     }
 
+    private void expectConversion(final String topicPrefix, final Headers 
headers) {
+        when(keyConverter.toConnectData(TOPIC, headers, 
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, headers, 
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+
+        for (Header header : headers) {
+            when(headerConverter.toConnectHeader(TOPIC, header.key(), 
header.value())).thenReturn(new SchemaAndValue(VALUE_SCHEMA, new 
String(header.value())));
+        }
+    }
+
+    private void throwExceptionOnConversion(final String topicPrefix, final 
Headers headers) {
+        when(keyConverter.toConnectData(TOPIC, headers, 
RAW_KEY)).thenThrow(new RetriableException("Failed to convert"));
+    }
+
     @SuppressWarnings("unchecked")
     private void expectTransformation(final String topicPrefix) {
         when(transformationChain.apply(any(ProcessingContext.class), 
any(SinkRecord.class))).thenAnswer((Answer<SinkRecord>)
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 67978760e7b..2ed01a747a7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -177,7 +177,7 @@ public class WorkerSinkTaskThreadedTest {
         workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, errorHandlingMetrics, headerConverter, 
transformationChain,
-                consumer, pluginLoader, time, 
RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore,
+                consumer, pluginLoader, time, 
RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
                 Collections::emptyList);
         recordsReturned = 0;
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 3ddbf164494..a04b3bc7caa 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -231,7 +231,7 @@ public class WorkerSourceTaskTest {
     }
 
     private void createWorkerTask() {
-        createWorkerTask(TargetState.STARTED, 
RetryWithToleranceOperatorTest.noopOperator());
+        createWorkerTask(TargetState.STARTED, 
RetryWithToleranceOperatorTest.noneOperator());
     }
 
     private void createWorkerTaskWithErrorToleration() {
@@ -239,7 +239,7 @@ public class WorkerSourceTaskTest {
     }
 
     private void createWorkerTask(TargetState initialState) {
-        createWorkerTask(initialState, 
RetryWithToleranceOperatorTest.noopOperator());
+        createWorkerTask(initialState, 
RetryWithToleranceOperatorTest.noneOperator());
     }
 
     private void createWorkerTask(TargetState initialState, 
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 06b0e3fb55c..06c3a42b64f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.OngoingStubbing;
+
 import java.util.AbstractMap.SimpleEntry;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +38,9 @@ import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class WorkerTestUtils {
 
@@ -155,4 +165,33 @@ public class WorkerTestUtils {
         assertEquals(expectedDelay, assignment.delay(),
                 "Wrong rebalance delay in " + assignment);
     }
+
+    public static <T, R extends ConnectRecord<R>> TransformationChain<T, R> 
getTransformationChain(
+            RetryWithToleranceOperator<T> toleranceOperator,
+            List<Object> results) {
+        Transformation<R> transformation = mock(Transformation.class);
+        OngoingStubbing<R> stub = when(transformation.apply(any()));
+        for (Object result: results) {
+            if (result instanceof Exception) {
+                stub = stub.thenThrow((Exception) result);
+            } else {
+                stub = stub.thenReturn((R) result);
+            }
+        }
+        return buildTransformationChain(transformation, toleranceOperator);
+    }
+
+    public static <T, R extends ConnectRecord<R>> TransformationChain<T, R> 
buildTransformationChain(
+            Transformation<R> transformation,
+            RetryWithToleranceOperator<T> toleranceOperator) {
+        Predicate<R> predicate = mock(Predicate.class);
+        when(predicate.test(any())).thenReturn(true);
+        TransformationStage<R> stage = new TransformationStage(
+                predicate,
+                false,
+                transformation);
+        TransformationChain<T, R> realTransformationChainRetriableException = 
new TransformationChain(List.of(stage), toleranceOperator);
+        TransformationChain<T, R> transformationChainRetriableException = 
Mockito.spy(realTransformationChainRetriableException);
+        return transformationChainRetriableException;
+    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index dfa4aa353fe..23c4bc25553 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -97,10 +97,10 @@ public class RetryWithToleranceOperatorTest {
             put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
         }};
 
-    public static <T> RetryWithToleranceOperator<T> noopOperator() {
+    public static <T> RetryWithToleranceOperator<T> noneOperator() {
         return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new 
ErrorHandlingMetrics(
-                new ConnectorTaskId("noop-connector", -1),
-                new ConnectMetrics("noop-worker", new 
TestableWorkerConfig(PROPERTIES),
+                new ConnectorTaskId("errors-none-tolerate-connector", -1),
+                new ConnectMetrics("errors-none-tolerate-worker", new 
TestableWorkerConfig(PROPERTIES),
                         Time.SYSTEM, "test-cluster")));
     }
 
@@ -147,56 +147,77 @@ public class RetryWithToleranceOperatorTest {
 
     @Test
     public void testHandleExceptionInTransformations() {
-        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
+        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception(), ALL);
     }
 
+    @Test
+    public void testHandleRetriableExceptionInTransformationsToleranceNone() {
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.TRANSFORMATION, new 
RetriableException("Test"), NONE));
+    }
+
+
     @Test
     public void testHandleExceptionInHeaderConverter() {
-        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception());
+        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception(), 
ALL);
+    }
+
+    @Test
+    public void testHandleRetriableExceptionInHeaderConverterToleranceNone() {
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.HEADER_CONVERTER, new 
RetriableException("Test"), NONE));
     }
 
     @Test
     public void testHandleExceptionInValueConverter() {
-        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception());
+        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception(), 
ALL);
+    }
+
+    @Test
+    public void testHandleRetriableExceptionInValueConverterToleranceNone() {
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.VALUE_CONVERTER, new 
RetriableException("Test"), NONE));
     }
 
     @Test
     public void testHandleExceptionInKeyConverter() {
-        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception());
+        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception(), ALL);
+    }
+
+    @Test
+    public void testHandleRetriableExceptionInKeyConverterToleranceNone() {
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.KEY_CONVERTER, new RetriableException("Test"), 
NONE));
     }
 
     @Test
     public void testHandleExceptionInTaskPut() {
-        testHandleExceptionInStage(Stage.TASK_PUT, new 
org.apache.kafka.connect.errors.RetriableException("Test"));
+        testHandleExceptionInStage(Stage.TASK_PUT, new 
org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
     }
 
     @Test
     public void testHandleExceptionInTaskPoll() {
-        testHandleExceptionInStage(Stage.TASK_POLL, new 
org.apache.kafka.connect.errors.RetriableException("Test"));
+        testHandleExceptionInStage(Stage.TASK_POLL, new 
org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
     }
 
     @Test
     public void testThrowExceptionInTaskPut() {
-        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.TASK_PUT, new Exception()));
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.TASK_PUT, new Exception(), ALL));
     }
 
     @Test
     public void testThrowExceptionInTaskPoll() {
-        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.TASK_POLL, new Exception()));
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.TASK_POLL, new Exception(), ALL));
     }
 
     @Test
     public void testThrowExceptionInKafkaConsume() {
-        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception()));
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception(), ALL));
     }
 
     @Test
     public void testThrowExceptionInKafkaProduce() {
-        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception()));
+        assertThrows(ConnectException.class, () -> 
testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception(), ALL));
     }
 
-    private void testHandleExceptionInStage(Stage type, Exception ex) {
-        RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> 
retryWithToleranceOperator = setupExecutor();
+    private void testHandleExceptionInStage(Stage type, Exception ex, 
ToleranceType toleranceType) {
+        RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> 
retryWithToleranceOperator = setupExecutor(toleranceType);
         ProcessingContext<ConsumerRecord<byte[], byte[]>> context = new 
ProcessingContext<>(consumerRecord);
         Operation<?> exceptionThrower = () -> {
             throw ex;
@@ -205,8 +226,8 @@ public class RetryWithToleranceOperatorTest {
         assertTrue(context.failed());
     }
 
-    private <T> RetryWithToleranceOperator<T> setupExecutor() {
-        return genericOperator(0, ALL, errorHandlingMetrics);
+    private <T> RetryWithToleranceOperator<T> setupExecutor(ToleranceType 
toleranceType) {
+        return genericOperator(0, toleranceType, errorHandlingMetrics);
     }
 
     @Test


Reply via email to