This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b35c29401ac KAFKA-18073: Prevent dropped records from failed retriable
exceptions (#18146)
b35c29401ac is described below
commit b35c29401acb12c7a2b8a450c5164cddf82361c6
Author: Thomas Thornton <[email protected]>
AuthorDate: Thu Jan 9 13:13:11 2025 -0500
KAFKA-18073: Prevent dropped records from failed retriable exceptions
(#18146)
Reviewers: Greg Harris <[email protected]>
---
.../runtime/errors/RetryWithToleranceOperator.java | 3 +-
.../runtime/AbstractWorkerSourceTaskTest.java | 144 ++++++++++++++++++++-
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 2 +-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 122 ++++++++++++++++-
.../runtime/WorkerSinkTaskThreadedTest.java | 2 +-
.../connect/runtime/WorkerSourceTaskTest.java | 4 +-
.../kafka/connect/runtime/WorkerTestUtils.java | 39 ++++++
.../errors/RetryWithToleranceOperatorTest.java | 55 +++++---
8 files changed, 340 insertions(+), 31 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 28886b3557c..2b9ba9fc5b7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -214,8 +214,7 @@ public class RetryWithToleranceOperator<T> implements
AutoCloseable {
errorHandlingMetrics.recordRetry();
} else {
log.trace("Can't retry. start={}, attempt={},
deadline={}", startTime, attempt, deadline);
- context.error(e);
- return null;
+ throw e;
}
if (stopping) {
log.trace("Shutdown has been scheduled. Marking operation
as failed.");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 0cb4db70647..f33e9bc514b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -80,6 +80,7 @@ import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@@ -264,7 +265,7 @@ public class AbstractWorkerSourceTaskTest {
assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
-
+
verifyTaskGetTopic();
verifyTopicCreation();
}
@@ -362,8 +363,8 @@ public class AbstractWorkerSourceTaskTest {
StringConverter stringConverter = new StringConverter();
SampleConverterWithHeaders testConverter = new
SampleConverterWithHeaders();
- createWorkerTask(stringConverter, testConverter, stringConverter,
RetryWithToleranceOperatorTest.noopOperator(),
- Collections::emptyList);
+ createWorkerTask(stringConverter, testConverter, stringConverter,
RetryWithToleranceOperatorTest.noneOperator(),
+ Collections::emptyList, transformationChain);
expectSendRecord(null);
expectApplyTransformationChain();
@@ -706,6 +707,118 @@ public class AbstractWorkerSourceTaskTest {
verify(transformationChain, times(2)).apply(any(), eq(record3));
}
+ @Test
+ public void testSendRecordsFailedTransformationErrorToleranceNone() {
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ RetryWithToleranceOperator<RetriableException>
retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+ TransformationChain<RetriableException, SourceRecord>
transformationChainRetriableException =
+
WorkerTestUtils.getTransformationChain(retryWithToleranceOperator, List.of(new
RetriableException("Test"), record1));
+ createWorkerTask(transformationChainRetriableException,
retryWithToleranceOperator);
+
+ expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0,
null, Collections.emptyList(), Collections.emptyList());
+ TopicDescription topicDesc = new TopicDescription(TOPIC, false,
Collections.singletonList(topicPartitionInfo));
+
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC,
topicDesc));
+
+ workerTask.toSend = Arrays.asList(record1);
+
+ // The transformation errored out so the error should be re-raised by
sendRecords with error tolerance None
+ Exception exception = assertThrows(ConnectException.class,
workerTask::sendRecords);
+ assertTrue(exception.getMessage().contains("Tolerance exceeded"));
+
+ // Ensure the transformation was called
+ verify(transformationChainRetriableException, times(1)).apply(any(),
eq(record1));
+
+ // The second transform call will succeed, batch should succeed at
sending the one record (none were skipped)
+ assertTrue(workerTask.sendRecords());
+ verifySendRecord(1);
+ }
+
+ @Test
+ public void testSendRecordsFailedTransformationErrorToleranceAll() {
+ RetryWithToleranceOperator<RetriableException>
retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+ TransformationChain<RetriableException, SourceRecord>
transformationChainRetriableException = WorkerTestUtils.getTransformationChain(
+ retryWithToleranceOperator,
+ List.of(new RetriableException("Test")));
+
+ createWorkerTask(transformationChainRetriableException,
retryWithToleranceOperator);
+
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0,
null, Collections.emptyList(), Collections.emptyList());
+ TopicDescription topicDesc = new TopicDescription(TOPIC, false,
Collections.singletonList(topicPartitionInfo));
+
+ workerTask.toSend = Arrays.asList(record1);
+
+ // The transformation errored out so the error should be ignored & the
record skipped with error tolerance all
+ assertTrue(workerTask.sendRecords());
+
+ // Ensure the transformation was called
+ verify(transformationChainRetriableException, times(1)).apply(any(),
eq(record1));
+ }
+
+ @Test
+ public void testSendRecordsConversionExceptionErrorToleranceNone() {
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ RetryWithToleranceOperator<RetriableException>
retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+ List<Object> results = Stream.of(record1, record2, record3)
+ .collect(Collectors.toList());
+ TransformationChain<RetriableException, SourceRecord> chain =
WorkerTestUtils.getTransformationChain(
+ retryWithToleranceOperator,
+ results);
+ createWorkerTask(chain, retryWithToleranceOperator);
+
+ // When we try to convert the key/value of each record, throw an
exception
+ throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
+
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0,
null, Collections.emptyList(), Collections.emptyList());
+ TopicDescription topicDesc = new TopicDescription(TOPIC, false,
Collections.singletonList(topicPartitionInfo));
+
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC,
topicDesc));
+
+ workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+ // Send records should fail when errors.tolerance is none and the
conversion call fails
+ Exception exception = assertThrows(ConnectException.class,
workerTask::sendRecords);
+ assertTrue(exception.getMessage().contains("Tolerance exceeded"));
+ assertThrows(ConnectException.class, workerTask::sendRecords);
+ assertThrows(ConnectException.class, workerTask::sendRecords);
+
+ // Set the conversion call to succeed, batch should succeed at sending
all three records (none were skipped)
+ expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+ assertTrue(workerTask.sendRecords());
+ verifySendRecord(3);
+ }
+
+ @Test
+ public void testSendRecordsConversionExceptionErrorToleranceAll() {
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3,
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+ RetryWithToleranceOperator<RetriableException>
retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+ List<Object> results = Stream.of(record1, record2, record3)
+ .collect(Collectors.toList());
+ TransformationChain<RetriableException, SourceRecord> chain =
WorkerTestUtils.getTransformationChain(
+ retryWithToleranceOperator,
+ results);
+ createWorkerTask(chain, retryWithToleranceOperator);
+
+ // When we try to convert the key/value of each record, throw an
exception
+ throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
+
+ workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+ // With errors.tolerance to all, the faiiled conversion should simply
skip the record, and record successful batch
+ assertTrue(workerTask.sendRecords());
+ }
+
private void expectSendRecord(Headers headers) {
if (headers != null)
expectConvertHeadersAndKeyValue(headers, TOPIC);
@@ -806,6 +919,20 @@ public class AbstractWorkerSourceTaskTest {
assertEquals(valueConverter.fromConnectData(topic, headers,
RECORD_SCHEMA, RECORD), SERIALIZED_RECORD);
}
+ private void throwExceptionWhenConvertKey(Headers headers, String topic) {
+ if (headers.iterator().hasNext()) {
+ when(headerConverter.fromConnectHeader(anyString(), anyString(),
eq(Schema.STRING_SCHEMA),
+ anyString()))
+ .thenAnswer((Answer<byte[]>) invocation -> {
+ String headerValue = invocation.getArgument(3,
String.class);
+ return headerValue.getBytes(StandardCharsets.UTF_8);
+ });
+ }
+
+ when(keyConverter.fromConnectData(eq(topic), any(Headers.class),
eq(KEY_SCHEMA), eq(KEY)))
+ .thenThrow(new RetriableException("Failed to convert key"));
+ }
+
private void expectApplyTransformationChain() {
when(transformationChain.apply(any(), any(SourceRecord.class)))
.thenAnswer(AdditionalAnswers.returnsSecondArg());
@@ -817,12 +944,19 @@ public class AbstractWorkerSourceTaskTest {
return new RecordHeaders();
}
+ private void createWorkerTask(TransformationChain transformationChain,
RetryWithToleranceOperator toleranceOperator) {
+ createWorkerTask(keyConverter, valueConverter, headerConverter,
toleranceOperator, Collections::emptyList,
+ transformationChain);
+ }
+
private void createWorkerTask() {
- createWorkerTask(keyConverter, valueConverter, headerConverter,
RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
+ createWorkerTask(
+ keyConverter, valueConverter, headerConverter,
RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList,
transformationChain);
}
private void createWorkerTask(Converter keyConverter, Converter
valueConverter, HeaderConverter headerConverter,
- RetryWithToleranceOperator<SourceRecord>
retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>>
errorReportersSupplier) {
+ RetryWithToleranceOperator<SourceRecord>
retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>>
errorReportersSupplier,
+ TransformationChain transformationChain) {
workerTask = new AbstractWorkerSourceTask(
taskId, sourceTask, statusListener, TargetState.STARTED,
keyConverter, valueConverter, headerConverter, transformationChain,
sourceTaskContext, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter,
offsetStore,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index be3dc2401ad..4ee0f61572c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -278,7 +278,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
private void createWorkerTask(TargetState initialState, Converter
keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask,
statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin,
TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter,
offsetStore,
- config, clusterConfigState, metrics, errorHandlingMetrics,
plugins.delegatingLoader(), time,
RetryWithToleranceOperatorTest.noopOperator(), statusBackingStore,
+ config, clusterConfigState, metrics, errorHandlingMetrics,
plugins.delegatingLoader(), time,
RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
sourceConfig, Runnable::run, preProducerCheck,
postProducerCheck, Collections::emptyList);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index b3e0189f987..4e91183fd31 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
+import static
org.apache.kafka.connect.runtime.WorkerTestUtils.getTransformationChain;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -192,13 +193,18 @@ public class WorkerSinkTaskTest {
createTask(initialState, keyConverter, valueConverter,
headerConverter);
}
+ private void createTask(TargetState initialState, TransformationChain
transformationChain, RetryWithToleranceOperator toleranceOperator) {
+ createTask(initialState, keyConverter, valueConverter,
headerConverter, toleranceOperator, Collections::emptyList,
transformationChain);
+ }
+
private void createTask(TargetState initialState, Converter keyConverter,
Converter valueConverter, HeaderConverter headerConverter) {
- createTask(initialState, keyConverter, valueConverter,
headerConverter, RetryWithToleranceOperatorTest.noopOperator(),
Collections::emptyList);
+ createTask(initialState, keyConverter, valueConverter,
headerConverter, RetryWithToleranceOperatorTest.noneOperator(),
Collections::emptyList, transformationChain);
}
private void createTask(TargetState initialState, Converter keyConverter,
Converter valueConverter, HeaderConverter headerConverter,
RetryWithToleranceOperator<ConsumerRecord<byte[],
byte[]>> retryWithToleranceOperator,
- Supplier<List<ErrorReporter<ConsumerRecord<byte[],
byte[]>>>> errorReportersSupplier) {
+ Supplier<List<ErrorReporter<ConsumerRecord<byte[],
byte[]>>>> errorReportersSupplier,
+ TransformationChain transformationChain) {
workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
@@ -854,6 +860,103 @@ public class WorkerSinkTaskTest {
verify(sinkTask).close(any(Collection.class));
}
+ @Test
+ public void testRaisesFailedRetriableExceptionFromConvert() {
+ createTask(initialState);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer(expectConsumerPoll(1))
+ .thenAnswer(invocation -> {
+ // stop the task during its second iteration
+ workerTask.stop();
+ return new ConsumerRecords<>(Map.of(), Map.of());
+ });
+ throwExceptionOnConversion(null, new RecordHeaders());
+
+ workerTask.iteration();
+
+ assertThrows(ConnectException.class, workerTask::execute);
+ }
+
+ @Test
+ public void testSkipsFailedRetriableExceptionFromConvert() {
+ createTask(initialState, keyConverter, valueConverter, headerConverter,
+ RetryWithToleranceOperatorTest.allOperator(),
Collections::emptyList, transformationChain);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer(expectConsumerPoll(1))
+ .thenAnswer(invocation -> {
+ // stop the task during its second iteration
+ workerTask.stop();
+ return new ConsumerRecords<>(Map.of(), Map.of());
+ });
+ throwExceptionOnConversion(null, new RecordHeaders());
+
+ workerTask.iteration();
+ workerTask.execute();
+
+ verify(sinkTask, times(3)).put(Collections.emptyList());
+ }
+
+ @Test
+ public void testRaisesFailedRetriableExceptionFromTransform() {
+ RetryWithToleranceOperator<RetriableException>
retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+ TransformationChain<RetriableException, SinkRecord>
transformationChainRetriableException = getTransformationChain(
+ retryWithToleranceOperator, List.of(new
RetriableException("Test")));
+ createTask(initialState, transformationChainRetriableException,
retryWithToleranceOperator);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer(expectConsumerPoll(1))
+ .thenAnswer(invocation -> {
+ // stop the task during its second iteration
+ workerTask.stop();
+ return new ConsumerRecords<>(Map.of(), Map.of());
+ });
+ expectConversion(null, new RecordHeaders());
+
+ workerTask.iteration();
+
+ assertThrows(ConnectException.class, workerTask::execute);
+ }
+
+ @Test
+ public void testSkipsFailedRetriableExceptionFromTransform() {
+ RetryWithToleranceOperator<RetriableException>
retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+ TransformationChain<RetriableException, SinkRecord>
transformationChainRetriableException = getTransformationChain(
+ retryWithToleranceOperator, List.of(new
RetriableException("Test")));
+ createTask(initialState, transformationChainRetriableException,
retryWithToleranceOperator);
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ verifyInitializeTask();
+
+ expectPollInitialAssignment()
+ .thenAnswer(expectConsumerPoll(1))
+ .thenAnswer(invocation -> {
+ // stop the task during its second iteration
+ workerTask.stop();
+ return new ConsumerRecords<>(Map.of(), Map.of());
+ });
+ expectConversion(null, new RecordHeaders());
+
+ workerTask.iteration();
+ workerTask.execute();
+
+ verify(sinkTask, times(3)).put(Collections.emptyList());
+ }
+
@Test
public void testRequestCommit() {
createTask(initialState);
@@ -1758,7 +1861,7 @@ public class WorkerSinkTaskTest {
taskId, sinkTask, statusListener, TargetState.PAUSED,
workerConfig, ClusterConfigState.EMPTY, metrics,
keyConverter, valueConverter, errorHandlingMetrics,
headerConverter,
transformationChain, mockConsumer, pluginLoader, time,
- RetryWithToleranceOperatorTest.noopOperator(), null,
statusBackingStore, Collections::emptyList);
+ RetryWithToleranceOperatorTest.noneOperator(), null,
statusBackingStore, Collections::emptyList);
mockConsumer.updateBeginningOffsets(
new HashMap<>() {{
put(TOPIC_PARTITION, 0L);
@@ -1852,6 +1955,19 @@ public class WorkerSinkTaskTest {
expectTransformation(topicPrefix);
}
+ private void expectConversion(final String topicPrefix, final Headers
headers) {
+ when(keyConverter.toConnectData(TOPIC, headers,
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+ when(valueConverter.toConnectData(TOPIC, headers,
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+
+ for (Header header : headers) {
+ when(headerConverter.toConnectHeader(TOPIC, header.key(),
header.value())).thenReturn(new SchemaAndValue(VALUE_SCHEMA, new
String(header.value())));
+ }
+ }
+
+ private void throwExceptionOnConversion(final String topicPrefix, final
Headers headers) {
+ when(keyConverter.toConnectData(TOPIC, headers,
RAW_KEY)).thenThrow(new RetriableException("Failed to convert"));
+ }
+
@SuppressWarnings("unchecked")
private void expectTransformation(final String topicPrefix) {
when(transformationChain.apply(any(ProcessingContext.class),
any(SinkRecord.class))).thenAnswer((Answer<SinkRecord>)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 67978760e7b..2ed01a747a7 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -177,7 +177,7 @@ public class WorkerSinkTaskThreadedTest {
workerTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, keyConverter,
valueConverter, errorHandlingMetrics, headerConverter,
transformationChain,
- consumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore,
+ consumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
Collections::emptyList);
recordsReturned = 0;
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 3ddbf164494..a04b3bc7caa 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -231,7 +231,7 @@ public class WorkerSourceTaskTest {
}
private void createWorkerTask() {
- createWorkerTask(TargetState.STARTED,
RetryWithToleranceOperatorTest.noopOperator());
+ createWorkerTask(TargetState.STARTED,
RetryWithToleranceOperatorTest.noneOperator());
}
private void createWorkerTaskWithErrorToleration() {
@@ -239,7 +239,7 @@ public class WorkerSourceTaskTest {
}
private void createWorkerTask(TargetState initialState) {
- createWorkerTask(initialState,
RetryWithToleranceOperatorTest.noopOperator());
+ createWorkerTask(initialState,
RetryWithToleranceOperatorTest.noneOperator());
}
private void createWorkerTask(TargetState initialState,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 06b0e3fb55c..06c3a42b64f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.mockito.Mockito;
+import org.mockito.stubbing.OngoingStubbing;
+
import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.HashMap;
@@ -31,6 +38,9 @@ import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class WorkerTestUtils {
@@ -155,4 +165,33 @@ public class WorkerTestUtils {
assertEquals(expectedDelay, assignment.delay(),
"Wrong rebalance delay in " + assignment);
}
+
+ public static <T, R extends ConnectRecord<R>> TransformationChain<T, R>
getTransformationChain(
+ RetryWithToleranceOperator<T> toleranceOperator,
+ List<Object> results) {
+ Transformation<R> transformation = mock(Transformation.class);
+ OngoingStubbing<R> stub = when(transformation.apply(any()));
+ for (Object result: results) {
+ if (result instanceof Exception) {
+ stub = stub.thenThrow((Exception) result);
+ } else {
+ stub = stub.thenReturn((R) result);
+ }
+ }
+ return buildTransformationChain(transformation, toleranceOperator);
+ }
+
+ public static <T, R extends ConnectRecord<R>> TransformationChain<T, R>
buildTransformationChain(
+ Transformation<R> transformation,
+ RetryWithToleranceOperator<T> toleranceOperator) {
+ Predicate<R> predicate = mock(Predicate.class);
+ when(predicate.test(any())).thenReturn(true);
+ TransformationStage<R> stage = new TransformationStage(
+ predicate,
+ false,
+ transformation);
+ TransformationChain<T, R> realTransformationChainRetriableException =
new TransformationChain(List.of(stage), toleranceOperator);
+ TransformationChain<T, R> transformationChainRetriableException =
Mockito.spy(realTransformationChainRetriableException);
+ return transformationChainRetriableException;
+ }
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index dfa4aa353fe..23c4bc25553 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -97,10 +97,10 @@ public class RetryWithToleranceOperatorTest {
put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
TestConverter.class.getName());
}};
- public static <T> RetryWithToleranceOperator<T> noopOperator() {
+ public static <T> RetryWithToleranceOperator<T> noneOperator() {
return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new
ErrorHandlingMetrics(
- new ConnectorTaskId("noop-connector", -1),
- new ConnectMetrics("noop-worker", new
TestableWorkerConfig(PROPERTIES),
+ new ConnectorTaskId("errors-none-tolerate-connector", -1),
+ new ConnectMetrics("errors-none-tolerate-worker", new
TestableWorkerConfig(PROPERTIES),
Time.SYSTEM, "test-cluster")));
}
@@ -147,56 +147,77 @@ public class RetryWithToleranceOperatorTest {
@Test
public void testHandleExceptionInTransformations() {
- testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
+ testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception(), ALL);
}
+ @Test
+ public void testHandleRetriableExceptionInTransformationsToleranceNone() {
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.TRANSFORMATION, new
RetriableException("Test"), NONE));
+ }
+
+
@Test
public void testHandleExceptionInHeaderConverter() {
- testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception());
+ testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception(),
ALL);
+ }
+
+ @Test
+ public void testHandleRetriableExceptionInHeaderConverterToleranceNone() {
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.HEADER_CONVERTER, new
RetriableException("Test"), NONE));
}
@Test
public void testHandleExceptionInValueConverter() {
- testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception());
+ testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception(),
ALL);
+ }
+
+ @Test
+ public void testHandleRetriableExceptionInValueConverterToleranceNone() {
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.VALUE_CONVERTER, new
RetriableException("Test"), NONE));
}
@Test
public void testHandleExceptionInKeyConverter() {
- testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception());
+ testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception(), ALL);
+ }
+
+ @Test
+ public void testHandleRetriableExceptionInKeyConverterToleranceNone() {
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.KEY_CONVERTER, new RetriableException("Test"),
NONE));
}
@Test
public void testHandleExceptionInTaskPut() {
- testHandleExceptionInStage(Stage.TASK_PUT, new
org.apache.kafka.connect.errors.RetriableException("Test"));
+ testHandleExceptionInStage(Stage.TASK_PUT, new
org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
}
@Test
public void testHandleExceptionInTaskPoll() {
- testHandleExceptionInStage(Stage.TASK_POLL, new
org.apache.kafka.connect.errors.RetriableException("Test"));
+ testHandleExceptionInStage(Stage.TASK_POLL, new
org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
}
@Test
public void testThrowExceptionInTaskPut() {
- assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.TASK_PUT, new Exception()));
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.TASK_PUT, new Exception(), ALL));
}
@Test
public void testThrowExceptionInTaskPoll() {
- assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.TASK_POLL, new Exception()));
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.TASK_POLL, new Exception(), ALL));
}
@Test
public void testThrowExceptionInKafkaConsume() {
- assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception()));
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception(), ALL));
}
@Test
public void testThrowExceptionInKafkaProduce() {
- assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception()));
+ assertThrows(ConnectException.class, () ->
testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception(), ALL));
}
- private void testHandleExceptionInStage(Stage type, Exception ex) {
- RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>>
retryWithToleranceOperator = setupExecutor();
+ private void testHandleExceptionInStage(Stage type, Exception ex,
ToleranceType toleranceType) {
+ RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>>
retryWithToleranceOperator = setupExecutor(toleranceType);
ProcessingContext<ConsumerRecord<byte[], byte[]>> context = new
ProcessingContext<>(consumerRecord);
Operation<?> exceptionThrower = () -> {
throw ex;
@@ -205,8 +226,8 @@ public class RetryWithToleranceOperatorTest {
assertTrue(context.failed());
}
- private <T> RetryWithToleranceOperator<T> setupExecutor() {
- return genericOperator(0, ALL, errorHandlingMetrics);
+ private <T> RetryWithToleranceOperator<T> setupExecutor(ToleranceType
toleranceType) {
+ return genericOperator(0, toleranceType, errorHandlingMetrics);
}
@Test