[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495549#comment-16495549 ]

ASF GitHub Bot commented on KAFKA-6738:
---------------------------------------

ewencp closed pull request #5065: KAFKA-6738: Implement error handling for source and sink tasks
URL: https://github.com/apache/kafka/pull/5065
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:


diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 5bbe148f644..d1e97b2db63 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -64,15 +64,20 @@
      * @param time     the time; may not be null
      */
     public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+        this(workerId, time, 
config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG),
+                
config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                
config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG),
+                
config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
 MetricsReporter.class));
+    }
+
+    public ConnectMetrics(String workerId, Time time, int numSamples, long 
sampleWindowMs, String metricsRecordingLevel,
+                          List<MetricsReporter> reporters) {
         this.workerId = workerId;
         this.time = time;
 
-        MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                                                      
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  
TimeUnit.MILLISECONDS).recordLevel(
-                        
Sensor.RecordingLevel.forName(config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG)));
-        List<MetricsReporter> reporters = 
config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                        
MetricsReporter.class);
+        MetricConfig metricConfig = new MetricConfig().samples(numSamples)
+                .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
+                        Sensor.RecordingLevel.forName(metricsRecordingLevel));
         reporters.add(new JmxReporter(JMX_PREFIX));
         this.metrics = new Metrics(metricConfig, reporters, time);
         LOG.debug("Registering Connect metrics with JMX for worker '{}'", 
workerId);
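
The refactored constructor above also makes ConnectMetrics constructible without a WorkerConfig, which the new error-handling code relies on for lightweight test-only instances. A minimal sketch of that usage, with illustrative values rather than defaults:

    ConnectMetrics connectMetrics = new ConnectMetrics(
            "worker-1",                                // workerId
            new SystemTime(),                          // time source
            2,                                         // numSamples
            30000L,                                    // sampleWindowMs
            Sensor.RecordingLevel.INFO.toString(),     // metricsRecordingLevel
            new ArrayList<MetricsReporter>());         // reporters; a JmxReporter is appended internally
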
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index d78576ec3ee..04699ea9ecb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -34,6 +34,7 @@
     public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics";
     public static final String WORKER_GROUP_NAME = "connect-worker-metrics";
     public static final String WORKER_REBALANCE_GROUP_NAME = 
"connect-worker-rebalance-metrics";
+    public static final String TASK_ERROR_HANDLING_GROUP_NAME = 
"task-error-metrics";
 
     private final List<MetricNameTemplate> allTemplates = new ArrayList<>();
     public final MetricNameTemplate connectorStatus;
@@ -93,6 +94,14 @@
     public final MetricNameTemplate rebalanceTimeMax;
     public final MetricNameTemplate rebalanceTimeAvg;
     public final MetricNameTemplate rebalanceTimeSinceLast;
+    public final MetricNameTemplate recordProcessingFailures;
+    public final MetricNameTemplate recordProcessingErrors;
+    public final MetricNameTemplate recordsSkipped;
+    public final MetricNameTemplate retries;
+    public final MetricNameTemplate errorsLogged;
+    public final MetricNameTemplate dlqProduceRequests;
+    public final MetricNameTemplate dlqProduceFailures;
+    public final MetricNameTemplate lastErrorTimestamp;
 
     public ConnectMetricsRegistry() {
         this(new LinkedHashSet<String>());
@@ -294,6 +303,28 @@ public ConnectMetricsRegistry(Set<String> tags) {
                                           "The average time in milliseconds 
spent by this worker to rebalance.", rebalanceTags);
         rebalanceTimeSinceLast = 
createTemplate("time-since-last-rebalance-ms", WORKER_REBALANCE_GROUP_NAME,
                                                 "The time in milliseconds 
since this worker completed the most recent rebalance.", rebalanceTags);
+
+        /***** Task Error Handling Metrics *****/
+        Set<String> taskErrorHandlingTags = new LinkedHashSet<>(tags);
+        taskErrorHandlingTags.add(CONNECTOR_TAG_NAME);
+        taskErrorHandlingTags.add(TASK_TAG_NAME);
+
+        recordProcessingFailures = createTemplate("total-record-failures", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of record processing failures in this task.", 
taskErrorHandlingTags);
+        recordProcessingErrors = createTemplate("total-record-errors", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of record processing errors in this task. ", 
taskErrorHandlingTags);
+        recordsSkipped = createTemplate("total-records-skipped", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of records skipped due to errors.", 
taskErrorHandlingTags);
+        retries = createTemplate("total-retries", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of operations retried.", taskErrorHandlingTags);
+        errorsLogged = createTemplate("total-errors-logged", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of errors that were logged.", 
taskErrorHandlingTags);
+        dlqProduceRequests = 
createTemplate("deadletterqueue-produce-requests", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of attempted writes to the dead letter queue.", 
taskErrorHandlingTags);
+        dlqProduceFailures = 
createTemplate("deadletterqueue-produce-failures", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The number of failed writes to the dead letter queue.", 
taskErrorHandlingTags);
+        lastErrorTimestamp = createTemplate("last-error-timestamp", 
TASK_ERROR_HANDLING_GROUP_NAME,
+                "The epoch timestamp when this task last encountered an 
error.", taskErrorHandlingTags);
     }
 
     private MetricNameTemplate createTemplate(String name, String group, 
String doc, Set<String> tags) {
@@ -337,4 +368,8 @@ public String workerGroupName() {
     public String workerRebalanceGroupName() {
         return WORKER_REBALANCE_GROUP_NAME;
     }
+
+    public String taskErrorHandlingGroupName() {
+        return TASK_ERROR_HANDLING_GROUP_NAME;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index e1d8b1f2e85..3680905be2a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -17,25 +17,32 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.transforms.Transformation;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
 public class TransformationChain<R extends ConnectRecord<R>> {
 
     private final List<Transformation<R>> transformations;
+    private final RetryWithToleranceOperator retryWithToleranceOperator;
 
-    public TransformationChain(List<Transformation<R>> transformations) {
+    public TransformationChain(List<Transformation<R>> transformations, 
RetryWithToleranceOperator retryWithToleranceOperator) {
         this.transformations = transformations;
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
     }
 
     public R apply(R record) {
         if (transformations.isEmpty()) return record;
 
-        for (Transformation<R> transformation : transformations) {
-            record = transformation.apply(record);
+        for (final Transformation<R> transformation : transformations) {
+            final R current = record;
+
+            // execute the operation
+            record = retryWithToleranceOperator.execute(() -> 
transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
+
             if (record == null) break;
         }
 
@@ -61,8 +68,4 @@ public int hashCode() {
         return Objects.hash(transformations);
     }
 
-    public static <R extends ConnectRecord<R>> TransformationChain<R> noOp() {
-        return new 
TransformationChain<R>(Collections.<Transformation<R>>emptyList());
-    }
-
 }
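
With this change every transformation is wrapped by RetryWithToleranceOperator.execute(...), so a failing SMT is retried and, if tolerated, the record is skipped instead of failing the task. The same wrapping pattern is reused for the converters in the worker tasks below; a condensed sketch (connectorConfig, errorHandlingMetrics, reporters, topic and rawValue are illustrative names taken from the surrounding code):

    RetryWithToleranceOperator operator = new RetryWithToleranceOperator();
    operator.configure(connectorConfig.originalsWithPrefix("errors."));  // errors.* settings from the connector config
    operator.metrics(errorHandlingMetrics);
    operator.reporters(reporters);

    // Wrap one pipeline operation; after a tolerated failure the result is null
    // and failed() returns true, so the caller drops the record.
    SchemaAndValue value = operator.execute(
            () -> valueConverter.toConnectData(topic, rawValue),
            Stage.VALUE_CONVERTER, valueConverter.getClass());
    if (operator.failed()) {
        // the error was already handed to the configured ErrorReporters (log and/or dead letter queue)
    }
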
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1c6465855ff..c58eddfb2f7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -30,6 +30,11 @@
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.LogReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -448,26 +453,71 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
                                        Converter valueConverter,
                                        HeaderConverter headerConverter,
                                        ClassLoader loader) {
+        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator();
+        
retryWithToleranceOperator.configure(connConfig.originalsWithPrefix("errors."));
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
-            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(connConfig.<SourceRecord>transformations());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
connConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(connConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
             OffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, 
offsetReader, offsetWriter, config, metrics, loader, time);
+                    headerConverter, transformationChain, producer, 
offsetReader, offsetWriter, config, metrics, loader,
+                    time, retryWithToleranceOperator);
         } else if (task instanceof SinkTask) {
-            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connConfig.<SinkRecord>transformations());
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
connConfig, errorHandlingMetrics));
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, metrics, keyConverter,
-                    valueConverter, headerConverter, transformationChain, 
loader, time);
+                    valueConverter, headerConverter, transformationChain, 
loader, time,
+                    retryWithToleranceOperator);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or 
SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either 
SourceTask or SinkTask");
         }
     }
 
+    ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
+        return new ErrorHandlingMetrics(id, metrics);
+    }
+
+    private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, 
ConnectorConfig connConfig,
+                                                    ErrorHandlingMetrics 
errorHandlingMetrics) {
+        ArrayList<ErrorReporter> reporters = new ArrayList<>();
+        LogReporter logReporter = new LogReporter(id);
+        
logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+        logReporter.metrics(errorHandlingMetrics);
+        reporters.add(logReporter);
+
+        // check if topic for dead letter queue exists
+        String topic = connConfig.getString(DeadLetterQueueReporter.PREFIX + 
"." + DeadLetterQueueReporter.DLQ_TOPIC_NAME);
+        if (topic != null && !topic.isEmpty()) {
+            DeadLetterQueueReporter reporter = 
DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+            
reporter.configure(connConfig.originalsWithPrefix(DeadLetterQueueReporter.PREFIX
 + "."));
+            reporters.add(reporter);
+        }
+
+        return reporters;
+    }
+
+    private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, 
ConnectorConfig connConfig,
+                                                      ErrorHandlingMetrics 
errorHandlingMetrics) {
+        List<ErrorReporter> reporters = new ArrayList<>();
+        LogReporter logReporter = new LogReporter(id);
+        
logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+        logReporter.metrics(errorHandlingMetrics);
+        reporters.add(logReporter);
+
+        return reporters;
+    }
+
     private void stopTask(ConnectorTaskId taskId) {
         WorkerTask task = tasks.get(taskId);
         if (task == null) {
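
The reporters built here are driven by connector-level properties under the "errors." prefix. Based on the constants in LogReporter and DeadLetterQueueReporter below, a sink connector would enable them roughly as follows (a sketch; the retry and tolerance keys live in RetryWithToleranceOperator and are not listed here):

    Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("errors.log.enable", "true");                    // LogReporter: errors.log + enable
    connectorProps.put("errors.log.include.messages", "true");         // LogReporter: errors.log + include.messages
    connectorProps.put("errors.deadletterqueue.topic.name", "my-dlq"); // DeadLetterQueueReporter: errors.deadletterqueue + topic.name
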
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index e629798e9e5..3296007b364 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,6 +40,8 @@
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -100,8 +102,9 @@ public WorkerSinkTask(ConnectorTaskId id,
                           HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
                           ClassLoader loader,
-                          Time time) {
-        super(id, statusListener, initialState, loader, connectMetrics);
+                          Time time,
+                          RetryWithToleranceOperator 
retryWithToleranceOperator) {
+        super(id, statusListener, initialState, loader, connectMetrics, 
retryWithToleranceOperator);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -477,42 +480,52 @@ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
             log.trace("{} Consuming and converting message in topic '{}' 
partition {} at offset {} and timestamp {}",
                     this, msg.topic(), msg.partition(), msg.offset(), 
msg.timestamp());
-            SchemaAndValue keyAndSchema = toConnectData(keyConverter, "key", 
msg, msg.key());
-            SchemaAndValue valueAndSchema = toConnectData(valueConverter, 
"value", msg, msg.value());
-            Headers headers = convertHeadersFor(msg);
-            Long timestamp = 
ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
-            SinkRecord origRecord = new SinkRecord(msg.topic(), 
msg.partition(),
-                    keyAndSchema.schema(), keyAndSchema.value(),
-                    valueAndSchema.schema(), valueAndSchema.value(),
-                    msg.offset(),
-                    timestamp,
-                    msg.timestampType(),
-                    headers);
-            log.trace("{} Applying transformations to record in topic '{}' 
partition {} at offset {} and timestamp {} with key {} and value {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), 
timestamp, keyAndSchema.value(), valueAndSchema.value());
-            SinkRecord transRecord = transformationChain.apply(origRecord);
+
+            retryWithToleranceOperator.consumerRecord(msg);
+
+            SinkRecord transRecord = convertAndTransformRecord(msg);
+
             origOffsets.put(
-                    new TopicPartition(origRecord.topic(), 
origRecord.kafkaPartition()),
-                    new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
+                    new TopicPartition(msg.topic(), msg.partition()),
+                    new OffsetAndMetadata(msg.offset() + 1)
             );
             if (transRecord != null) {
                 messageBatch.add(transRecord);
             } else {
-                log.trace("{} Transformations returned null, so dropping 
record in topic '{}' partition {} at offset {} and timestamp {} with key {} and 
value {}",
-                        this, msg.topic(), msg.partition(), msg.offset(), 
timestamp, keyAndSchema.value(), valueAndSchema.value());
+                log.trace(
+                        "{} Converters and transformations returned null, 
possibly because of too many retries, so " +
+                                "dropping record in topic '{}' partition {} at 
offset {}",
+                        this, msg.topic(), msg.partition(), msg.offset()
+                );
             }
         }
         sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
     }
 
-    private SchemaAndValue toConnectData(Converter converter, String 
converterName, ConsumerRecord<byte[], byte[]> msg, byte[] data) {
-        try {
-            return converter.toConnectData(msg.topic(), data);
-        } catch (Throwable e) {
-            String str = String.format("Error converting message %s in topic 
'%s' partition %d at offset %d and timestamp %d",
-                    converterName, msg.topic(), msg.partition(), msg.offset(), 
msg.timestamp());
-            throw new ConnectException(str, e);
+    private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], 
byte[]> msg) {
+        SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(() -> 
keyConverter.toConnectData(msg.topic(), msg.key()),
+                Stage.KEY_CONVERTER, keyConverter.getClass());
+
+        SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(() 
-> valueConverter.toConnectData(msg.topic(), msg.value()),
+                Stage.VALUE_CONVERTER, valueConverter.getClass());
+
+        Headers headers = retryWithToleranceOperator.execute(() -> 
convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverter.getClass());
+
+        if (retryWithToleranceOperator.failed()) {
+            return null;
         }
+
+        Long timestamp = 
ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
+        SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
+                keyAndSchema.schema(), keyAndSchema.value(),
+                valueAndSchema.schema(), valueAndSchema.value(),
+                msg.offset(),
+                timestamp,
+                msg.timestampType(),
+                headers);
+        log.trace("{} Applying transformations to record in topic '{}' 
partition {} at offset {} and timestamp {} with key {} and value {}",
+                this, msg.topic(), msg.partition(), msg.offset(), timestamp, 
keyAndSchema.value(), valueAndSchema.value());
+        return transformationChain.apply(origRecord);
     }
 
     private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index f17475dacfc..e7b92a4d403 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -34,6 +34,8 @@
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -103,8 +105,10 @@ public WorkerSourceTask(ConnectorTaskId id,
                             WorkerConfig workerConfig,
                             ConnectMetrics connectMetrics,
                             ClassLoader loader,
-                            Time time) {
-        super(id, statusListener, initialState, loader, connectMetrics);
+                            Time time,
+                            RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+        super(id, statusListener, initialState, loader, connectMetrics, 
retryWithToleranceOperator);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -209,7 +213,7 @@ public void execute() {
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for 
additional records", this);
                     long start = time.milliseconds();
-                    toSend = task.poll();
+                    toSend = poll();
                     if (toSend != null) {
                         recordPollReturned(toSend.size(), time.milliseconds() 
- start);
                     }
@@ -231,6 +235,44 @@ public void execute() {
         }
     }
 
+    protected List<SourceRecord> poll() throws InterruptedException {
+        try {
+            return task.poll();
+        } catch (RetriableException e) {
+            log.warn("{} failed to poll records from SourceTask. Will retry 
operation.", this, e);
+            // Do nothing. Let the framework poll whenever it's ready.
+            return null;
+        }
+    }
+
+    /**
+     * Convert the source record into a producer record.
+     *
+     * @param record the transformed record
+     * @return the producer record which can sent over to Kafka. A null is 
returned if the input is null or
+     * if an error was encountered during any of the converter stages.
+     */
+    private ProducerRecord<byte[], byte[]> 
convertTransformedRecord(SourceRecord record) {
+        if (record == null) {
+            return null;
+        }
+
+        RecordHeaders headers = retryWithToleranceOperator.execute(() -> 
convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());
+
+        byte[] key = retryWithToleranceOperator.execute(() -> 
keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()),
+                Stage.KEY_CONVERTER, keyConverter.getClass());
+
+        byte[] value = retryWithToleranceOperator.execute(() -> 
valueConverter.fromConnectData(record.topic(), record.valueSchema(), 
record.value()),
+                Stage.VALUE_CONVERTER, valueConverter.getClass());
+
+        if (retryWithToleranceOperator.failed()) {
+            return null;
+        }
+
+        return new ProducerRecord<>(record.topic(), record.kafkaPartition(),
+                ConnectUtils.checkAndConvertTimestamp(record.timestamp()), 
key, value, headers);
+    }
+
     /**
      * Try to send a batch of records. If a send fails and is retriable, this 
saves the remainder of the batch so it can
      * be retried after backing off. If a send fails and is not retriable, 
this will throw a ConnectException.
@@ -241,19 +283,16 @@ private boolean sendRecords() {
         recordBatch(toSend.size());
         final SourceRecordWriteCounter counter = new 
SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
         for (final SourceRecord preTransformRecord : toSend) {
-            final SourceRecord record = 
transformationChain.apply(preTransformRecord);
 
-            if (record == null) {
+            retryWithToleranceOperator.sourceRecord(preTransformRecord);
+            final SourceRecord record = 
transformationChain.apply(preTransformRecord);
+            final ProducerRecord<byte[], byte[]> producerRecord = 
convertTransformedRecord(record);
+            if (producerRecord == null || retryWithToleranceOperator.failed()) 
{
                 counter.skipRecord();
                 commitTaskRecord(preTransformRecord);
                 continue;
             }
 
-            RecordHeaders headers = convertHeaderFor(record);
-            byte[] key = keyConverter.fromConnectData(record.topic(), 
record.keySchema(), record.key());
-            byte[] value = valueConverter.fromConnectData(record.topic(), 
record.valueSchema(), record.value());
-            final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(record.topic(), record.kafkaPartition(),
-                    ConnectUtils.checkAndConvertTimestamp(record.timestamp()), 
key, value, headers);
             log.trace("{} Appending record with key {}, value {}", this, 
record.key(), record.value());
             // We need this queued first since the callback could happen 
immediately (even synchronously in some cases).
             // Because of this we need to be careful about handling retries -- 
we always save the previously attempted
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index d563f9bdede..6ea1ddd96d1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.connect.runtime.AbstractStatus.State;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -60,11 +61,14 @@
     private volatile boolean stopping;   // indicates whether the Worker has 
asked the task to stop
     private volatile boolean cancelled;  // indicates whether the Worker has 
cancelled the task (e.g. because of slow shutdown)
 
+    protected final RetryWithToleranceOperator retryWithToleranceOperator;
+
     public WorkerTask(ConnectorTaskId id,
                       TaskStatus.Listener statusListener,
                       TargetState initialState,
                       ClassLoader loader,
-                      ConnectMetrics connectMetrics) {
+                      ConnectMetrics connectMetrics,
+                      RetryWithToleranceOperator retryWithToleranceOperator) {
         this.id = id;
         this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, 
statusListener);
         this.statusListener = taskMetricsGroup;
@@ -73,6 +77,7 @@ public WorkerTask(ConnectorTaskId id,
         this.stopping = false;
         this.cancelled = false;
         this.taskMetricsGroup.recordState(this.targetState);
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
     }
 
     public ConnectorTaskId id() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
new file mode 100644
index 00000000000..454a619ec76
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.Collections.singleton;
+
+/**
+ * Write the original consumed record into a dead letter queue. The dead 
letter queue is a Kafka topic located
+ * on the same cluster used by the worker to maintain internal topics. Each 
connector is typically configured
+ * with its own Kafka topic dead letter queue. By default, the topic name is 
not set, and if the
+ * connector config doesn't specify one, this feature is disabled.
+ */
+public class DeadLetterQueueReporter implements ErrorReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(DeadLetterQueueReporter.class);
+
+    private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
+    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+
+    public static final String PREFIX = "errors.deadletterqueue";
+
+    public static final String DLQ_TOPIC_NAME = "topic.name";
+    public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic 
where these messages are written to.";
+    public static final String DLQ_TOPIC_DEFAULT = "";
+
+    private DeadLetterQueueReporterConfig config;
+    private KafkaProducer<byte[], byte[]> kafkaProducer;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, 
ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC);
+
+    public static DeadLetterQueueReporter createAndSetup(WorkerConfig 
workerConfig,
+                                                         ConnectorConfig 
connConfig, Map<String, Object> producerProps) {
+        String topic = connConfig.getString(PREFIX + "." + DLQ_TOPIC_NAME);
+
+        try (AdminClient admin = AdminClient.create(workerConfig.originals())) 
{
+            if (!admin.listTopics().names().get().contains(topic)) {
+                log.error("Topic {} doesn't exist. Will attempt to create 
topic.", topic);
+                NewTopic schemaTopicRequest = new NewTopic(topic, 
DLQ_NUM_DESIRED_PARTITIONS, DLQ_MAX_DESIRED_REPLICATION_FACTOR);
+                admin.createTopics(singleton(schemaTopicRequest)).all().get();
+            }
+        } catch (InterruptedException e) {
+            throw new ConnectException("Could not initialize dead letter queue 
with topic=" + topic, e);
+        } catch (ExecutionException e) {
+            if (!(e.getCause() instanceof TopicExistsException)) {
+                throw new ConnectException("Could not initialize dead letter 
queue with topic=" + topic, e);
+            }
+        }
+
+        KafkaProducer<byte[], byte[]> dlqProducer = new 
KafkaProducer<>(producerProps);
+        return new DeadLetterQueueReporter(dlqProducer);
+    }
+
+    /**
+     * Initialize the dead letter queue reporter with a {@link KafkaProducer}.
+     *
+     * @param kafkaProducer a Kafka Producer to produce the original consumed 
records.
+     */
+    // Visible for testing
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer) {
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new DeadLetterQueueReporterConfig(configs);
+    }
+
+    @Override
+    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+        this.errorHandlingMetrics = errorHandlingMetrics;
+    }
+
+    /**
+     * Write the raw records into a Kafka topic.
+     *
+     * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+     */
+    public void report(ProcessingContext context) {
+        if (config.topic().isEmpty()) {
+            return;
+        }
+
+        errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
+
+        ConsumerRecord<byte[], byte[]> originalMessage = 
context.consumerRecord();
+        if (originalMessage == null) {
+            errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            return;
+        }
+
+        ProducerRecord<byte[], byte[]> producerRecord;
+        if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
+            producerRecord = new ProducerRecord<>(config.topic(), null,
+                    originalMessage.key(), originalMessage.value(), 
originalMessage.headers());
+        } else {
+            producerRecord = new ProducerRecord<>(config.topic(), null, 
originalMessage.timestamp(),
+                    originalMessage.key(), originalMessage.value(), 
originalMessage.headers());
+        }
+
+        this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. 
topic=" + config.topic(), exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        });
+    }
+
+    static class DeadLetterQueueReporterConfig extends AbstractConfig {
+        public DeadLetterQueueReporterConfig(Map<?, ?> originals) {
+            super(CONFIG_DEF, originals, true);
+        }
+
+        /**
+         * @return name of the dead letter queue topic.
+         */
+        public String topic() {
+            return getString(DLQ_TOPIC_NAME);
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
new file mode 100644
index 00000000000..c589012d02c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+
+/**
+ * Contains various sensors used for monitoring errors.
+ */
+public class ErrorHandlingMetrics {
+
+    private final Time time = new SystemTime();
+
+    private final ConnectMetrics.MetricGroup metricGroup;
+
+    // metrics
+    private final Sensor recordProcessingFailures;
+    private final Sensor recordProcessingErrors;
+    private final Sensor recordsSkipped;
+    private final Sensor retries;
+    private final Sensor errorsLogged;
+    private final Sensor dlqProduceRequests;
+    private final Sensor dlqProduceFailures;
+    private long lastErrorTime = 0;
+
+    // for testing only
+    public ErrorHandlingMetrics() {
+        this(new ConnectorTaskId("noop-connector", -1),
+                new ConnectMetrics("noop-worker", new SystemTime(), 2, 3000, 
Sensor.RecordingLevel.INFO.toString(),
+                        new ArrayList<>()));
+    }
+
+    public ErrorHandlingMetrics(ConnectorTaskId id, ConnectMetrics 
connectMetrics) {
+
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = 
connectMetrics.group(registry.taskErrorHandlingGroupName(),
+                registry.connectorTagName(), id.connector(), 
registry.taskTagName(), Integer.toString(id.task()));
+
+        // prevent collisions by removing any previously created metrics in 
this group.
+        metricGroup.close();
+
+        recordProcessingFailures = metricGroup.sensor("total-record-failures");
+        
recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures),
 new Total());
+
+        recordProcessingErrors = metricGroup.sensor("total-record-errors");
+        
recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors),
 new Total());
+
+        recordsSkipped = metricGroup.sensor("total-records-skipped");
+        recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped), 
new Total());
+
+        retries = metricGroup.sensor("total-retries");
+        retries.add(metricGroup.metricName(registry.retries), new Total());
+
+        errorsLogged = metricGroup.sensor("total-errors-logged");
+        errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new 
Total());
+
+        dlqProduceRequests = 
metricGroup.sensor("deadletterqueue-produce-requests");
+        
dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new 
Total());
+
+        dlqProduceFailures = 
metricGroup.sensor("deadletterqueue-produce-failures");
+        
dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new 
Total());
+
+        metricGroup.addValueMetric(registry.lastErrorTimestamp, now -> 
lastErrorTime);
+    }
+
+    /**
+     * Increment the number of failed operations (retriable and non-retriable).
+     */
+    public void recordFailure() {
+        recordProcessingFailures.record();
+    }
+
+    /**
+     * Increment the number of operations which could not be successfully 
executed.
+     */
+    public void recordError() {
+        recordProcessingErrors.record();
+    }
+
+    /**
+     * Increment the number of records skipped.
+     */
+    public void recordSkipped() {
+        recordsSkipped.record();
+    }
+
+    /**
+     * The number of retries made while executing operations.
+     */
+    public void recordRetry() {
+        retries.record();
+    }
+
+    /**
+     * The number of errors logged by the {@link LogReporter}.
+     */
+    public void recordErrorLogged() {
+        errorsLogged.record();
+    }
+
+    /**
+     * The number of produce requests to the {@link DeadLetterQueueReporter}.
+     */
+    public void recordDeadLetterQueueProduceRequest() {
+        dlqProduceRequests.record();
+    }
+
+    /**
+     * The number of produce requests to the {@link DeadLetterQueueReporter} 
which failed to be successfully produced into Kafka.
+     */
+    public void recordDeadLetterQueueProduceFailed() {
+        dlqProduceFailures.record();
+    }
+
+    /**
+     * Record the time of error.
+     */
+    public void recordErrorTimestamp() {
+        this.lastErrorTime = time.milliseconds();
+    }
+
+    /**
+     * @return the metric group for this class.
+     */
+    public ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+}
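
These sensors register under the task-error-metrics group with connector and task tags. A short sketch of how the operator and reporters drive them, mapping each call to the metric it backs (framework-internal usage, shown only for orientation):

    ErrorHandlingMetrics errorMetrics = new ErrorHandlingMetrics(new ConnectorTaskId("my-connector", 0), connectMetrics);
    errorMetrics.recordRetry();           // total-retries
    errorMetrics.recordFailure();         // total-record-failures (every failed attempt)
    errorMetrics.recordError();           // total-record-errors (operation gave up after retries)
    errorMetrics.recordSkipped();         // total-records-skipped (record dropped under the error tolerance)
    errorMetrics.recordErrorTimestamp();  // last-error-timestamp
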
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
new file mode 100644
index 00000000000..e71b6bc8ba9
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ * Report an error using the information contained in the {@link 
ProcessingContext}.
+ */
+public interface ErrorReporter extends Configurable {
+
+    /**
+     * Report an error.
+     *
+     * @param context the processing context (cannot be null).
+     */
+    void report(ProcessingContext context);
+
+    /**
+     * Provides the container for error handling metrics to implementations. 
This method will be called once the error
+     * reporter object is instantiated.
+     *
+     * @param errorHandlingMetrics metrics for error handling (cannot be null).
+     */
+    void metrics(ErrorHandlingMetrics errorHandlingMetrics);
+
+}
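
Since ProcessingContext is package-private, reporter implementations live in org.apache.kafka.connect.runtime.errors. A hypothetical minimal reporter, essentially a stripped-down variant of the LogReporter below:

    package org.apache.kafka.connect.runtime.errors;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class SimpleLogReporter implements ErrorReporter {
        private static final Logger log = LoggerFactory.getLogger(SimpleLogReporter.class);

        @Override
        public void configure(Map<String, ?> configs) {
            // nothing to configure in this sketch
        }

        @Override
        public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
            // metrics are not recorded in this sketch
        }

        @Override
        public void report(ProcessingContext context) {
            if (context.failed()) {
                log.error("Error while executing stage {}: {}", context.stage(), context.error().toString());
            }
        }
    }
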
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
new file mode 100644
index 00000000000..1d2c08fd18b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Writes errors and their context to application logs.
+ */
+public class LogReporter implements ErrorReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LogReporter.class);
+
+    public static final String PREFIX = "errors.log";
+
+    public static final String LOG_ENABLE = "enable";
+    public static final String LOG_ENABLE_DOC = "If true, log to application 
logs the errors and the information describing where they occurred.";
+    public static final boolean LOG_ENABLE_DEFAULT = false;
+
+    public static final String LOG_INCLUDE_MESSAGES = "include.messages";
+    public static final String LOG_INCLUDE_MESSAGES_DOC = "If true, include in 
the application log the Connect key, value, and other details of records that 
resulted in errors and failures.";
+    public static final boolean LOG_INCLUDE_MESSAGES_DEFAULT = false;
+
+    private final ConnectorTaskId id;
+
+    private LogReporterConfig config;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    public LogReporter(ConnectorTaskId id) {
+        this.id = id;
+    }
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                .define(LOG_ENABLE, ConfigDef.Type.BOOLEAN, 
LOG_ENABLE_DEFAULT, ConfigDef.Importance.MEDIUM, LOG_ENABLE_DOC)
+                .define(LOG_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, 
LOG_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.MEDIUM, 
LOG_INCLUDE_MESSAGES_DOC);
+
+    /**
+     * Log error context.
+     *
+     * @param context the processing context.
+     */
+    @Override
+    public void report(ProcessingContext context) {
+        if (!config.isEnabled()) {
+            return;
+        }
+
+        if (!context.failed()) {
+            return;
+        }
+
+        log.error(message(context), context.error());
+        errorHandlingMetrics.recordErrorLogged();
+    }
+
+    @Override
+    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+        this.errorHandlingMetrics = errorHandlingMetrics;
+    }
+
+    // Visible for testing
+    String message(ProcessingContext context) {
+        return String.format("Error encountered in task %s. %s", 
String.valueOf(id), context.toString(config.canLogMessages()));
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        config = new LogReporterConfig(configs);
+    }
+
+    private static class LogReporterConfig extends AbstractConfig {
+        public LogReporterConfig(Map<?, ?> originals) {
+            super(CONFIG_DEF, originals, true);
+        }
+
+        /**
+         * @return true, if logging of error context is desired; false 
otherwise.
+         */
+        public boolean isEnabled() {
+            return getBoolean(LOG_ENABLE);
+        }
+
+        /**
+         * @return if false, the connect record which caused the exception is 
not logged.
+         */
+        public boolean canLogMessages() {
+            return getBoolean(LOG_INCLUDE_MESSAGES);
+        }
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Operation.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Operation.java
new file mode 100644
index 00000000000..3e0f792c096
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Operation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A recoverable operation evaluated in the connector pipeline.
+ *
+ * @param <V> return type of the result of the operation.
+ */
+public interface Operation<V> extends Callable<V> {
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
new file mode 100644
index 00000000000..f826d74fe70
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * Contains all the metadata related to the currently evaluating operation. 
Only one instance of this class is meant
+ * to exist per task in a JVM.
+ */
+class ProcessingContext {
+
+    private Collection<ErrorReporter> reporters = Collections.emptyList();
+
+    private ConsumerRecord<byte[], byte[]> consumedMessage;
+    private SourceRecord sourceRecord;
+
+    /**
+     * The following fields need to be reset every time a new record is seen.
+     */
+
+    private Stage position;
+    private Class<?> klass;
+    private int attempt;
+    private Throwable error;
+
+    /**
+     * Reset the internal fields before executing operations on a new record.
+     */
+    private void reset() {
+        attempt = 0;
+        position = null;
+        klass = null;
+        error = null;
+    }
+
+    /**
+     * Set the record consumed from Kafka in a sink connector.
+     *
+     * @param consumedMessage the record
+     */
+    public void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) 
{
+        this.consumedMessage = consumedMessage;
+        reset();
+    }
+
+    /**
+     * @return the record consumed from Kafka. could be null
+     */
+    public ConsumerRecord<byte[], byte[]> consumerRecord() {
+        return consumedMessage;
+    }
+
+    /**
+     * @return the source record being processed.
+     */
+    public SourceRecord sourceRecord() {
+        return sourceRecord;
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline.
+     *
+     * @param record the source record
+     */
+    public void sourceRecord(SourceRecord record) {
+        this.sourceRecord = record;
+        reset();
+    }
+
+    /**
+     * Set the stage in the connector pipeline which is currently executing.
+     *
+     * @param position the stage
+     */
+    public void position(Stage position) {
+        this.position = position;
+    }
+
+    /**
+     * @return the stage in the connector pipeline which is currently 
executing.
+     */
+    public Stage stage() {
+        return position;
+    }
+
+    /**
+     * @return the class which is going to execute the current operation.
+     */
+    public Class<?> executingClass() {
+        return klass;
+    }
+
+    /**
+     * @param klass set the class which is currently executing.
+     */
+    public void executingClass(Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * A helper method to set both the stage and the class.
+     *
+     * @param stage the stage
+     * @param klass the class which will execute the operation in this stage.
+     */
+    public void currentContext(Stage stage, Class<?> klass) {
+        position(stage);
+        executingClass(klass);
+    }
+
+    /**
+     * Report errors. Should be called only if an error was encountered while 
executing the operation.
+     */
+    public void report() {
+        for (ErrorReporter reporter: reporters) {
+            reporter.report(this);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return toString(false);
+    }
+
+    public String toString(boolean includeMessage) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Executing stage '");
+        builder.append(stage().name());
+        builder.append("' with class '");
+        builder.append(executingClass() == null ? "null" : 
executingClass().getName());
+        builder.append('\'');
+        if (includeMessage && sourceRecord() != null) {
+            builder.append(", where source record is = ");
+            builder.append(sourceRecord());
+        } else if (includeMessage && consumerRecord() != null) {
+            ConsumerRecord<byte[], byte[]> msg = consumerRecord();
+            builder.append(", where consumed record is ");
+            builder.append("{topic='").append(msg.topic()).append('\'');
+            builder.append(", partition=").append(msg.partition());
+            builder.append(", offset=").append(msg.offset());
+            if (msg.timestampType() == TimestampType.CREATE_TIME || 
msg.timestampType() == TimestampType.LOG_APPEND_TIME) {
+                builder.append(", timestamp=").append(msg.timestamp());
+                builder.append(", timestampType=").append(msg.timestampType());
+            }
+            builder.append("}");
+        }
+        builder.append('.');
+        return builder.toString();
+    }
+
+    /**
+     * @param attempt the number of attempts made to execute the current operation.
+     */
+    public void attempt(int attempt) {
+        this.attempt = attempt;
+    }
+
+    /**
+     * @return the number of attempts made to execute the current operation.
+     */
+    public int attempt() {
+        return attempt;
+    }
+
+    /**
+     * @return the error (if any) which was encountered while processing the current stage.
+     */
+    public Throwable error() {
+        return error;
+    }
+
+    /**
+     * The error (if any) which was encountered while processing the current stage.
+     *
+     * @param error the error
+     */
+    public void error(Throwable error) {
+        this.error = error;
+    }
+
+    /**
+     * @return true, if the last operation encountered an error; false otherwise
+     */
+    public boolean failed() {
+        return error() != null;
+    }
+
+    /**
+     * Set the error reporters for this connector.
+     *
+     * @param reporters the error reporters (should not be null).
+     */
+    public void reporters(Collection<ErrorReporter> reporters) {
+        Objects.requireNonNull(reporters);
+        this.reporters = reporters;
+    }
+
+}
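For context, here is a minimal sketch of how the ProcessingContext above is meant to be driven (illustrative only, not part of the patch; the taskId, record and converter variables, and the LogReporter configuration, are assumed to exist in the caller):

    // Illustrative sketch -- not part of the patch.
    ProcessingContext context = new ProcessingContext();
    context.reporters(Collections.singletonList(new LogReporter(taskId)));
    context.consumerRecord(record);                                   // the sink record being processed
    context.currentContext(Stage.VALUE_CONVERTER, converter.getClass());
    try {
        converter.toConnectData(record.topic(), record.value());
    } catch (Exception e) {
        context.error(e);                                             // remember the failure
    }
    if (context.failed()) {
        context.report();                                             // fan the failure out to every reporter
    }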
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
new file mode 100644
index 00000000000..941abf3d3d2
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * Attempt to recover a failed operation with retries and tolerance limits.
+ * <p>
+ *
+ * A retry is attempted if the operation throws a {@link RetriableException}. Retries are accompanied by exponential backoffs, starting with
+ * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link RetryWithToleranceOperatorConfig#retryDelayMax()}.
+ * Including the first attempt and future retries, the total time taken to evaluate the operation should be within
+ * {@link RetryWithToleranceOperatorConfig#retryTimeout()} millis.
+ * <p>
+ *
+ * This executor will tolerate failures, as specified by {@link RetryWithToleranceOperatorConfig#toleranceLimit()}.
+ * For transformations and converters, all exceptions are tolerated. For other operations, only {@link RetriableException} is tolerated.
+ * <p>
+ *
+ * There are three outcomes to executing an operation. It might succeed, in which case the result is returned to the caller.
+ * If it fails, this class does one of two things: (1) if the failure occurred due to a tolerable exception, then
+ * set the appropriate error reason in the {@link ProcessingContext} and return null, or (2) if the exception is not tolerated,
+ * then it is wrapped into a ConnectException and rethrown to the caller.
+ * <p>
+ */
+public class RetryWithToleranceOperator {
+
+    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
+
+    public static final String RETRY_TIMEOUT = "retry.timeout";
+    public static final String RETRY_TIMEOUT_DOC = "The total duration in milliseconds a failed operation will be retried for.";
+    public static final long RETRY_TIMEOUT_DEFAULT = 0;
+
+    public static final String RETRY_DELAY_MAX_MS = "retry.delay.max.ms";
+    public static final String RETRY_DELAY_MAX_MS_DOC = "The maximum duration between two consecutive retries (in milliseconds).";
+    public static final long RETRY_DELAY_MAX_MS_DEFAULT = 60000;
+
+    public static final long RETRIES_DELAY_MIN_MS = 300;
+
+    public static final String TOLERANCE_LIMIT = "allowed.max";
+    public static final String TOLERANCE_LIMIT_DOC = "Fail the task if we exceed specified number of errors overall.";
+    public static final String TOLERANCE_LIMIT_DEFAULT = "none";
+
+    // for testing only
+    public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator();
+    static {
+        NOOP_OPERATOR.configure(Collections.emptyMap());
+        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics());
+    }
+
+    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>();
+    static {
+        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
+        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
+    }
+
+    private long totalFailures = 0;
+    private final Time time;
+    private RetryWithToleranceOperatorConfig config;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    protected ProcessingContext context = new ProcessingContext();
+
+    public RetryWithToleranceOperator() {
+        this(new SystemTime());
+    }
+
+    // Visible for testing
+    public RetryWithToleranceOperator(Time time) {
+        this.time = time;
+    }
+
+    static ConfigDef getConfigDef() {
+        return new ConfigDef()
+                .define(RETRY_TIMEOUT, ConfigDef.Type.LONG, RETRY_TIMEOUT_DEFAULT, ConfigDef.Importance.HIGH, RETRY_TIMEOUT_DOC)
+                .define(RETRY_DELAY_MAX_MS, ConfigDef.Type.LONG, RETRY_DELAY_MAX_MS_DEFAULT, atLeast(1), ConfigDef.Importance.MEDIUM, RETRY_DELAY_MAX_MS_DOC)
+                .define(TOLERANCE_LIMIT, ConfigDef.Type.STRING, TOLERANCE_LIMIT_DEFAULT, in("none", "all"), ConfigDef.Importance.HIGH, TOLERANCE_LIMIT_DOC);
+    }
+
+    /**
+     * Execute the recoverable operation. If the operation is already in a failed state, then simply return
+     * with the existing failure.
+     *
+     * @param operation the recoverable operation
+     * @param <V> return type of the result of the operation.
+     * @return result of the operation
+     */
+    public <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+        context.currentContext(stage, executingClass);
+
+        if (context.failed()) {
+            log.debug("ProcessingContext is already in failed state. Ignoring 
requested operation.");
+            return null;
+        }
+
+        try {
+            Class<? extends Exception> ex = TOLERABLE_EXCEPTIONS.getOrDefault(context.stage(), RetriableException.class);
+            return execAndHandleError(operation, ex);
+        } finally {
+            if (context.failed()) {
+                errorHandlingMetrics.recordError();
+                context.report();
+            }
+        }
+    }
+
+    /**
+     * Attempt to execute an operation. Retry if a {@link RetriableException} is raised. Re-throw everything else.
+     *
+     * @param operation the operation to be executed.
+     * @param <V> the return type of the result of the operation.
+     * @return the result of the operation.
+     * @throws Exception rethrow if a non-retriable Exception is thrown by the operation
+     */
+    protected <V> V execAndRetry(Operation<V> operation) throws Exception {
+        int attempt = 0;
+        long startTime = time.milliseconds();
+        long deadline = startTime + config.retryTimeout();
+        do {
+            try {
+                attempt++;
+                return operation.call();
+            } catch (RetriableException e) {
+                log.trace("Caught a retriable exception while executing {} 
operation with {}", context.stage(), context.executingClass());
+                errorHandlingMetrics.recordFailure();
+                if (checkRetry(startTime)) {
+                    backoff(attempt, deadline);
+                    if (Thread.currentThread().isInterrupted()) {
+                        log.trace("Thread was interrupted. Marking operation 
as failed.");
+                        context.error(e);
+                        return null;
+                    }
+                    errorHandlingMetrics.recordRetry();
+                } else {
+                    log.trace("Can't retry. start={}, attempt={}, 
deadline={}", startTime, attempt, deadline);
+                    context.error(e);
+                    return null;
+                }
+            } finally {
+                context.attempt(attempt);
+            }
+        } while (true);
+    }
+
+    /**
+     * Execute a given operation multiple times (if needed), and tolerate certain exceptions.
+     *
+     * @param operation the operation to be executed.
+     * @param tolerated the class of exceptions which can be tolerated.
+     * @param <V> The return type of the result of the operation.
+     * @return the result of the operation
+     */
+    // Visible for testing
+    protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> tolerated) {
+        try {
+            V result = execAndRetry(operation);
+            if (context.failed()) {
+                markAsFailed();
+                errorHandlingMetrics.recordSkipped();
+            }
+            return result;
+        } catch (Exception e) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            context.error(e);
+
+            if (!tolerated.isAssignableFrom(e.getClass())) {
+                throw new ConnectException("Unhandled exception in error handler", e);
+            }
+
+            if (!withinToleranceLimits()) {
+                throw new ConnectException("Tolerance exceeded in error handler", e);
+            }
+
+            errorHandlingMetrics.recordSkipped();
+            return null;
+        }
+    }
+
+    // Visible for testing
+    void markAsFailed() {
+        errorHandlingMetrics.recordErrorTimestamp();
+        totalFailures++;
+    }
+
+    // Visible for testing
+    boolean withinToleranceLimits() {
+        switch (config.toleranceLimit()) {
+            case NONE:
+                if (totalFailures > 0) return false;
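+                // intentional fall-through: NONE is within limits only while no failures have been recorded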
+            case ALL:
+                return true;
+            default:
+                throw new ConfigException("Unknown tolerance type: {}", config.toleranceLimit());
+        }
+    }
+
+    // Visible for testing
+    boolean checkRetry(long startTime) {
+        return (time.milliseconds() - startTime) < config.retryTimeout();
+    }
+
+    // Visible for testing
+    void backoff(int attempt, long deadline) {
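+        // Exponential backoff: 300ms, 600ms, 1200ms, ... doubling per retry. Once the computed delay
+        // exceeds retry.delay.max.ms, a random delay below that cap is used instead, and the sleep is
+        // always trimmed so that it never runs past the overall retry deadline.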
+        int numRetry = attempt - 1;
+        long delay = RETRIES_DELAY_MIN_MS << numRetry;
+        if (delay > config.retryDelayMax()) {
+            delay = ThreadLocalRandom.current().nextLong(config.retryDelayMax());
+        }
+        if (delay + time.milliseconds() > deadline) {
+            delay = deadline - time.milliseconds();
+        }
+        log.debug("Sleeping for {} millis", delay);
+        time.sleep(delay);
+    }
+
+    public void configure(Map<String, ?> configs) {
+        config = new RetryWithToleranceOperatorConfig(configs);
+    }
+
+    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+        this.errorHandlingMetrics = errorHandlingMetrics;
+    }
+
+    static class RetryWithToleranceOperatorConfig extends AbstractConfig {
+        public RetryWithToleranceOperatorConfig(Map<?, ?> originals) {
+            super(getConfigDef(), originals, true);
+        }
+
+        /**
+         * @return the total time an operation can take to succeed (including the first attempt and retries).
+         */
+        public long retryTimeout() {
+            return getLong(RETRY_TIMEOUT);
+        }
+
+        /**
+         * @return the maximum delay between two subsequent retries in milliseconds.
+         */
+        public long retryDelayMax() {
+            return getLong(RETRY_DELAY_MAX_MS);
+        }
+
+        /**
+         * @return the level of error tolerance (none or all).
+         */
+        public ToleranceType toleranceLimit() {
+            return ToleranceType.fromString(getString(TOLERANCE_LIMIT));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "RetryWithToleranceOperator{" +
+                "config=" + config +
+                '}';
+    }
+
+    /**
+     * Set the error reporters for this connector.
+     *
+     * @param reporters the error reporters (should not be null).
+     */
+    public void reporters(List<ErrorReporter> reporters) {
+        this.context.reporters(reporters);
+    }
+
+    /**
+     * Set the source record being processed in the connect pipeline.
+     *
+     * @param preTransformRecord the source record
+     */
+    public void sourceRecord(SourceRecord preTransformRecord) {
+        this.context.sourceRecord(preTransformRecord);
+    }
+
+    /**
+     * Set the record consumed from Kafka in a sink connector.
+     *
+     * @param consumedMessage the record
+     */
+    public void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
+        this.context.consumerRecord(consumedMessage);
+    }
+
+    /**
+     * @return true, if the last operation encountered an error; false otherwise
+     */
+    public boolean failed() {
+        return this.context.failed();
+    }
+}
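As a rough usage sketch (illustrative only, not part of the patch): this assumes the Operation interface in this package is a single-method functional interface whose call() is invoked above, and that errorHandlingMetrics, converter, record and taskId already exist in the caller.

    // Illustrative sketch -- not part of the patch.
    RetryWithToleranceOperator operator = new RetryWithToleranceOperator();
    Map<String, String> props = new HashMap<>();
    props.put(RetryWithToleranceOperator.RETRY_TIMEOUT, "60000");      // keep retrying an operation for up to a minute
    props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");  // cap the backoff between retries at 5 seconds
    props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");      // tolerate (skip) records that still fail
    operator.configure(props);
    operator.metrics(errorHandlingMetrics);
    operator.reporters(Collections.singletonList(new LogReporter(taskId)));

    // Each pipeline step is wrapped in execute(); a tolerated failure yields null instead of killing the task.
    SchemaAndValue value = operator.execute(
            () -> converter.toConnectData(record.topic(), record.value()),
            Stage.VALUE_CONVERTER, converter.getClass());
    if (operator.failed()) {
        // the record was skipped; the configured reporters (log, dead letter queue, ...) have been notified
    }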
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java
new file mode 100644
index 00000000000..b9aa1f21bc6
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/Stage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+/**
+ * A logical stage in a Connect pipeline.
+ */
+public enum Stage {
+
+    /**
+     * When calling the poll() method on a SourceConnector
+     */
+    TASK_POLL,
+
+    /**
+     * When calling the put() method on a SinkConnector
+     */
+    TASK_PUT,
+
+    /**
+     * When running any transformation operation on a record
+     */
+    TRANSFORMATION,
+
+    /**
+     * When using the key converter to serialize/deserialize keys in ConnectRecords
+     */
+    KEY_CONVERTER,
+
+    /**
+     * When using the value converter to serialize/deserialize values in ConnectRecords
+     */
+    VALUE_CONVERTER,
+
+    /**
+     * When using the header converter to serialize/deserialize headers in ConnectRecords
+     */
+    HEADER_CONVERTER,
+
+    /**
+     * When producing to Kafka topic
+     */
+    KAFKA_PRODUCE,
+
+    /**
+     * When consuming from a Kafka topic
+     */
+    KAFKA_CONSUME
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
new file mode 100644
index 00000000000..79956ac7963
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import java.util.Locale;
+
+/**
+ * The different levels of error tolerance.
+ */
+public enum ToleranceType {
+
+    /**
+     * Tolerate no errors.
+     */
+    NONE,
+
+    /**
+     * Tolerate all errors.
+     */
+    ALL;
+
+    public static ToleranceType fromString(String typeStr) {
+        return "ALL".equals(typeStr.toUpperCase(Locale.ROOT)) ? 
ToleranceType.ALL : ToleranceType.NONE;
+    }
+}
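A quick illustration of the mapping above (illustrative only, not part of the patch): the lookup is case-insensitive and anything other than "all" falls back to NONE rather than being rejected.

    // Illustrative only -- not part of the patch.
    ToleranceType a = ToleranceType.fromString("All");     // ALL (case-insensitive)
    ToleranceType b = ToleranceType.fromString("none");    // NONE
    ToleranceType c = ToleranceType.fromString("bogus");   // NONE -- unrecognized values are not rejected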
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
new file mode 100644
index 00000000000..5a8bcc5a6cc
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.LogReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
+@PowerMockIgnore("javax.management.*")
+public class ErrorHandlingTaskTest {
+
+    private static final String TOPIC = "test";
+    private static final int PARTITION1 = 12;
+    private static final int PARTITION2 = 13;
+    private static final long FIRST_OFFSET = 45;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+    }
+
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private static final Map<String, String> OPERATION_EXECUTOR_PROPS = new HashMap<>();
+
+    static {
+        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        // wait up to 1 minute for an operation
+        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_TIMEOUT, "60000");
+        // wait up to 5 seconds between subsequent retries
+        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");
+    }
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private TargetState initialState = TargetState.STARTED;
+    private Time time;
+    private MockConnectMetrics metrics;
+    @SuppressWarnings("unused")
+    @Mock
+    private SinkTask sinkTask;
+    @SuppressWarnings("unused")
+    @Mock
+    private SourceTask sourceTask;
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private WorkerConfig workerConfig;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @SuppressWarnings("unused")
+    @Mock
+    private HeaderConverter headerConverter;
+    private WorkerSinkTask workerSinkTask;
+    private WorkerSourceTask workerSourceTask;
+    @SuppressWarnings("unused")
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
+    @SuppressWarnings("unused")
+    @Mock
+    private KafkaProducer<byte[], byte[]> producer;
+
+    @Mock
+    OffsetStorageReader offsetReader;
+    @Mock
+    OffsetStorageWriter offsetWriter;
+
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+    @SuppressWarnings("unused")
+    @Mock
+    private TaskStatus.Listener statusListener;
+
+    private ErrorHandlingMetrics errorHandlingMetrics;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, 0, 0);
+        metrics = new MockConnectMetrics();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", 
"/tmp/connect.offsets");
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        workerConfig = new StandaloneConfig(workerProps);
+        errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) {
+            metrics.stop();
+        }
+    }
+
+    @Test
+    public void testErrorHandlingInSinkTasks() throws Exception {
+        LogReporter reporter = new LogReporter(taskId);
+        Map<String, Object> reportProps = new HashMap<>();
+        reportProps.put(LogReporter.LOG_ENABLE, "true");
+        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
+        reporter.configure(reportProps);
+        reporter.metrics(errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+        createSinkTask(initialState, retryWithToleranceOperator);
+
+        expectInitializeTask();
+
+        // valid json
+        ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC, PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
+        // bad json
+        ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+
+        sinkTask.put(EasyMock.anyObject());
+        EasyMock.expectLastCall().times(2);
+
+        PowerMock.replayAll();
+
+        workerSinkTask.initialize(TASK_CONFIG);
+        workerSinkTask.initializeAndStart();
+        workerSinkTask.iteration();
+
+        workerSinkTask.iteration();
+
+        // two records were consumed from Kafka
+        assertSinkMetricValue("sink-record-read-total", 2.0);
+        // only one was written to the task
+        assertSinkMetricValue("sink-record-send-total", 1.0);
+        // one record completely failed (converter issues)
+        assertErrorHandlingMetricValue("total-record-errors", 1.0);
+        // 2 failures in the transformation, and 1 in the converter
+        assertErrorHandlingMetricValue("total-record-failures", 3.0);
+        // one record completely failed (converter issues), and thus was skipped
+        assertErrorHandlingMetricValue("total-records-skipped", 1.0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testErrorHandlingInSourceTasks() throws Exception {
+        LogReporter reporter = new LogReporter(taskId);
+        Map<String, Object> reportProps = new HashMap<>();
+        reportProps.put(LogReporter.LOG_ENABLE, "true");
+        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
+        reporter.configure(reportProps);
+        reporter.metrics(errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        // valid json
+        Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
+        Struct struct1 = new Struct(valSchema).put("val", 1234);
+        SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
+        Struct struct2 = new Struct(valSchema).put("val", 6789);
+        SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
+
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+
+        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+
+        offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().times(2);
+        sourceTask.initialize(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        sourceTask.start(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.execute();
+
+        // two records were polled from the source task
+        assertSourceMetricValue("source-record-poll-total", 2.0);
+        // no writes were recorded (the mocked producer never completes its sends)
+        assertSourceMetricValue("source-record-write-total", 0.0);
+        // no record failed terminally
+        assertErrorHandlingMetricValue("total-record-errors", 0.0);
+        // each record failed twice in the transformation before succeeding on retry
+        assertErrorHandlingMetricValue("total-record-failures", 4.0);
+        // consequently, no records were skipped
+        assertErrorHandlingMetricValue("total-records-skipped", 0.0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
+        LogReporter reporter = new LogReporter(taskId);
+        Map<String, Object> reportProps = new HashMap<>();
+        reportProps.put(LogReporter.LOG_ENABLE, "true");
+        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
+        reporter.configure(reportProps);
+        reporter.metrics(errorHandlingMetrics);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+        createSourceTask(initialState, retryWithToleranceOperator, badConverter());
+
+        // valid json
+        Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
+        Struct struct1 = new Struct(valSchema).put("val", 1234);
+        SourceRecord record1 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct1);
+        Struct struct2 = new Struct(valSchema).put("val", 6789);
+        SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2);
+
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
+        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+
+        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+
+        offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.expectLastCall().times(2);
+        sourceTask.initialize(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        sourceTask.start(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
+        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2);
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.execute();
+
+        // two records were polled from the source task
+        assertSourceMetricValue("source-record-poll-total", 2.0);
+        // no writes were recorded (the mocked producer never completes its sends)
+        assertSourceMetricValue("source-record-write-total", 0.0);
+        // no record failed terminally
+        assertErrorHandlingMetricValue("total-record-errors", 0.0);
+        // each record failed twice in the transformation and twice more in the faulty converter
+        assertErrorHandlingMetricValue("total-record-failures", 8.0);
+        // consequently, no records were skipped
+        assertErrorHandlingMetricValue("total-records-skipped", 0.0);
+
+        PowerMock.verifyAll();
+    }
+
+    private void assertSinkMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sinkTaskGroup = workerSinkTask.sinkTaskMetricsGroup().metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void assertSourceMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sinkTaskGroup = workerSourceTask.sourceTaskMetricsGroup().metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void assertErrorHandlingMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sinkTaskGroup = errorHandlingMetrics.metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+    private void expectInitializeTask() throws Exception {
+        PowerMock.expectPrivate(workerSinkTask, "createConsumer").andReturn(consumer);
+        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall();
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
+    private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+        JsonConverter converter = new JsonConverter();
+        Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
+        oo.put("converter.type", "value");
+        oo.put("schemas.enable", "false");
+        converter.configure(oo);
+
+        TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
+
+        workerSinkTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer"},
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, converter, converter,
+                headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator);
+    }
+
+    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+        JsonConverter converter = new JsonConverter();
+        Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
+        oo.put("converter.type", "value");
+        oo.put("schemas.enable", "false");
+        converter.configure(oo);
+
+        createSourceTask(initialState, retryWithToleranceOperator, converter);
+    }
+
+    private Converter badConverter() {
+        FaultyConverter converter = new FaultyConverter();
+        Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
+        oo.put("converter.type", "value");
+        oo.put("schemas.enable", "false");
+        converter.configure(oo);
+        return converter;
+    }
+
+    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
+        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);
+
+        workerSourceTask = PowerMock.createPartialMock(
+                WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
+                taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
+                producer, offsetReader, offsetWriter, workerConfig, metrics, pluginLoader, time, retryWithToleranceOperator);
+    }
+
+    private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
+        return new ConsumerRecords<>(Collections.singletonMap(
+                new TopicPartition(record.topic(), record.partition()), singletonList(record)));
+    }
+
+    private abstract static class TestSinkTask extends SinkTask {
+    }
+
+    static class FaultyConverter extends JsonConverter {
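+        // Fails the first two of every three conversions with a RetriableException, so each record is
+        // recorded as two failures before the third attempt succeeds (matching the metric assertions above).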
+        private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
+        private int invocations = 0;
+
+        public byte[] fromConnectData(String topic, Schema schema, Object value) {
+            if (value == null) {
+                return super.fromConnectData(topic, schema, null);
+            }
+            invocations++;
+            if (invocations % 3 == 0) {
+                log.debug("Succeeding record: {} where invocations={}", value, invocations);
+                return super.fromConnectData(topic, schema, value);
+            } else {
+                log.debug("Failing record: {} at invocations={}", value, invocations);
+                throw new RetriableException("Bad invocations " + invocations + " for mod 3");
+            }
+        }
+    }
+
+    static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
+
+        private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
+
+        private static final String MOD_CONFIG = "mod";
+        private static final int MOD_CONFIG_DEFAULT = 3;
+
+        public static final ConfigDef CONFIG_DEF = new ConfigDef()
+                .define(MOD_CONFIG, ConfigDef.Type.INT, MOD_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, "Pass records without failure only if timestamp % mod == 0");
+
+        private int mod = MOD_CONFIG_DEFAULT;
+
+        private int invocations = 0;
+
+        @Override
+        public R apply(R record) {
+            invocations++;
+            if (invocations % mod == 0) {
+                log.debug("Succeeding record: {} where invocations={}", 
record, invocations);
+                return record;
+            } else {
+                log.debug("Failing record: {} at invocations={}", record, 
invocations);
+                throw new RetriableException("Bad invocations " + invocations 
+ " for mod " + mod);
+            }
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+
+        @Override
+        public void close() {
+            log.info("Shutting down transform");
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+            mod = Math.max(config.getInt(MOD_CONFIG), 2);
+            log.info("Configuring {}. Setting mod to {}", this.getClass(), 
mod);
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 9568e787cc5..ff8507c0945 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -27,20 +27,21 @@
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.common.utils.MockTime;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
@@ -161,7 +162,10 @@ public void setUp() {
     private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics,
+                keyConverter, valueConverter, headerConverter,
+                transformationChain, pluginLoader, time,
+                RetryWithToleranceOperator.NOOP_OPERATOR);
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b09f8477f45..61d8778d11f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -137,7 +138,9 @@ public void setup() {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
                 taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
-                valueConverter, headerConverter, TransformationChain.noOp(), pluginLoader, time);
+                valueConverter, headerConverter,
+                new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR),
+                pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR);
 
         recordsReturned = 0;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 25c2cb10352..bed51219bbe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -146,7 +147,8 @@ private void createWorkerTask() {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
-                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
+                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                RetryWithToleranceOperator.NOOP_OPERATOR);
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 78f2836f42b..de0ba8a9ddf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
@@ -56,10 +57,12 @@
     private ConnectMetrics metrics;
     @Mock private TaskStatus.Listener statusListener;
     @Mock private ClassLoader loader;
+    RetryWithToleranceOperator retryWithToleranceOperator;
 
     @Before
     public void setup() {
         metrics = new MockConnectMetrics();
+        retryWithToleranceOperator = new RetryWithToleranceOperator();
     }
 
     @After
@@ -77,9 +80,10 @@ public void standardStartup() {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        RetryWithToleranceOperator.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -123,9 +127,10 @@ public void stopBeforeStarting() {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        RetryWithToleranceOperator.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -162,9 +167,10 @@ public void cancelBeforeStopping() throws Exception {
                         TaskStatus.Listener.class,
                         TargetState.class,
                         ClassLoader.class,
-                        ConnectMetrics.class
+                        ConnectMetrics.class,
+                        RetryWithToleranceOperator.class
                 )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics)
+                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics, retryWithToleranceOperator)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f062436f0e0..d29eef5ed69 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -33,6 +34,7 @@
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -47,7 +49,6 @@
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -485,14 +486,15 @@ public void testAddRemoveTask() throws Exception {
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
-                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
                 anyObject(ConnectMetrics.class),
                 anyObject(ClassLoader.class),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(RetryWithToleranceOperator.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -622,14 +624,15 @@ public void testCleanupTasksOnStop() throws Exception {
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
-                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(RetryWithToleranceOperator.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -713,14 +716,15 @@ public void testConverterOverrides() throws Exception {
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
                 EasyMock.capture(headerConverter),
-                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                anyObject(Time.class))
+                anyObject(Time.class),
+                anyObject(RetryWithToleranceOperator.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
new file mode 100644
index 00000000000..f6a0507909c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class ErrorReporterTest {
+
+    private static final String TOPIC = "test-topic";
+    private static final String DLQ_TOPIC = "test-topic-errors";
+    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+
+    @Mock
+    KafkaProducer<byte[], byte[]> producer;
+
+    @Mock
+    Future<RecordMetadata> metadata;
+
+    private HashMap<String, Object> config;
+    private ErrorHandlingMetrics errorHandlingMetrics;
+    private MockConnectMetrics metrics;
+
+    @Before
+    public void setup() {
+        config = new HashMap<>();
+        metrics = new MockConnectMetrics();
+        errorHandlingMetrics = new ErrorHandlingMetrics(new ConnectorTaskId("connector-", 1), metrics);
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) {
+            metrics.stop();
+        }
+    }
+
+    @Test
+    public void testDLQConfigWithEmptyTopicName() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
+        deadLetterQueueReporter.configure(config);
+        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException());
+        replay(producer);
+
+        // since topic name is empty, this method should be a NOOP.
+        // if it attempts to log to the DLQ via the producer, the send mock will throw a RuntimeException.
+        deadLetterQueueReporter.report(context);
+    }
+
+    @Test
+    public void testDLQConfigWithValidTopicName() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
+        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
+        replay(producer);
+
+        deadLetterQueueReporter.report(context);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReportDLQTwice() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
+        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        deadLetterQueueReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata).times(2);
+        replay(producer);
+
+        deadLetterQueueReporter.report(context);
+        deadLetterQueueReporter.report(context);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLogOnDisabledLogReporter() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config);
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+        context.error(new RuntimeException());
+
+        // the log reporter is disabled by default, so reporting this errored context should not log anything or update the metric.
+        logReporter.report(context);
+        assertErrorHandlingMetricValue("total-errors-logged", 0.0);
+    }
+
+    @Test
+    public void testLogOnEnabledLogReporter() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+        context.error(new RuntimeException());
+
+        // the log reporter is enabled, so reporting this errored context should log it once and update the metric.
+        logReporter.report(context);
+        assertErrorHandlingMetricValue("total-errors-logged", 1.0);
+    }
+
+    @Test
+    public void testLogMessageWithNoRecords() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        String msg = logReporter.message(context);
+        assertEquals("Error encountered in task job-0. Executing stage 
'KEY_CONVERTER' with class " +
+                "'org.apache.kafka.connect.json.JsonConverter'.", msg);
+    }
+
+    @Test
+    public void testLogMessageWithSinkRecords() {
+        LogReporter logReporter = new LogReporter(TASK_ID);
+        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        logReporter.configure(config(LogReporter.LOG_INCLUDE_MESSAGES, "true"));
+        logReporter.metrics(errorHandlingMetrics);
+
+        ProcessingContext context = processingContext();
+
+        String msg = logReporter.message(context);
+        assertEquals("Error encountered in task job-0. Executing stage 
'KEY_CONVERTER' with class " +
+                "'org.apache.kafka.connect.json.JsonConverter', where consumed 
record is {topic='test-topic', " +
+                "partition=5, offset=100}.", msg);
+    }
+
+    private ProcessingContext processingContext() {
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));
+        context.currentContext(Stage.KEY_CONVERTER, JsonConverter.class);
+        return context;
+    }
+
+    private Map<String, Object> config(String key, Object val) {
+        config.put(key, val);
+        return config;
+    }
+
+    private void assertErrorHandlingMetricValue(String name, double expected) {
+        ConnectMetrics.MetricGroup sinkTaskGroup = errorHandlingMetrics.metricGroup();
+        double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, name);
+        assertEquals(expected, measured, 0.001d);
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
new file mode 100644
index 00000000000..751510d5fa6
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.RetryWithToleranceOperatorConfig;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ProcessingContext.class})
+@PowerMockIgnore("javax.management.*")
+public class RetryWithToleranceOperatorTest {
+
+    @SuppressWarnings("unused")
+    @Mock
+    private Operation<String> mockOperation;
+
+    @Mock
+    ErrorHandlingMetrics errorHandlingMetrics;
+
+    @Test
+    public void testHandleExceptionInTransformations() {
+        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInHeaderConverter() {
+        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInValueConverter() {
+        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInKeyConverter() {
+        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception());
+    }
+
+    @Test
+    public void testHandleExceptionInTaskPut() {
+        testHandleExceptionInStage(Stage.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test"));
+    }
+
+    @Test
+    public void testHandleExceptionInTaskPoll() {
+        testHandleExceptionInStage(Stage.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test"));
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInTaskPut() {
+        testHandleExceptionInStage(Stage.TASK_PUT, new Exception());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInTaskPoll() {
+        testHandleExceptionInStage(Stage.TASK_POLL, new Exception());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInKafkaConsume() {
+        testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testThrowExceptionInKafkaProduce() {
+        testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception());
+    }
+
+    private void testHandleExceptionInStage(Stage type, Exception ex) {
+        RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor();
+        retryWithToleranceOperator.execute(new ExceptionThrower(ex), type, ExceptionThrower.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        PowerMock.verifyAll();
+    }
+
+    private RetryWithToleranceOperator setupExecutor() {
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "0");
+        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        retryWithToleranceOperator.configure(props);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        return retryWithToleranceOperator;
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorOnce() throws Exception {
+        execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorThrice() throws Exception {
+        execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+    }
+
+    @Test
+    public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
+        execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable Test"));
+    }
+
+    @Test
+    public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
+        execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable Test"));
+    }
+
+    public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
+        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        retryWithToleranceOperator.configure(props);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+        EasyMock.expect(mockOperation.call()).andReturn("Success");
+
+        replay(mockOperation);
+
+        String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
+        assertFalse(retryWithToleranceOperator.failed());
+        assertEquals("Success", result);
+        assertEquals(expectedWait, time.hiResClockMs());
+
+        PowerMock.verifyAll();
+    }
+
+    public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
+        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
+        retryWithToleranceOperator.configure(props);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+        EasyMock.expect(mockOperation.call()).andReturn("Success");
+
+        replay(mockOperation);
+
+        String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        assertNull(result);
+        assertEquals(expectedWait, time.hiResClockMs());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCheckRetryLimit() {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "500");
+        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "100");
+        retryWithToleranceOperator.configure(props);
+
+        time.setCurrentTimeMs(100);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(200);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(400);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(499);
+        assertTrue(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(501);
+        assertFalse(retryWithToleranceOperator.checkRetry(0));
+
+        time.setCurrentTimeMs(600);
+        assertFalse(retryWithToleranceOperator.checkRetry(0));
+    }
+
+    @Test
+    public void testBackoffLimit() {
+        MockTime time = new MockTime(0, 0, 0);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
+
+        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "5");
+        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");
+        retryWithToleranceOperator.configure(props);
+
+        long prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(1, 5000);
+        assertEquals(300, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(2, 5000);
+        assertEquals(600, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(3, 5000);
+        assertEquals(1200, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(4, 5000);
+        assertEquals(2400, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(5, 5000);
+        assertEquals(500, time.hiResClockMs() - prevTs);
+
+        prevTs = time.hiResClockMs();
+        retryWithToleranceOperator.backoff(6, 5000);
+        assertEquals(0, time.hiResClockMs() - prevTs);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testToleranceLimit() {
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "none"));
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.markAsFailed();
+        assertFalse("should not tolerate any errors", 
retryWithToleranceOperator.withinToleranceLimits());
+
+        retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all"));
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.markAsFailed();
+        retryWithToleranceOperator.markAsFailed();
+        assertTrue("should tolerate all errors", 
retryWithToleranceOperator.withinToleranceLimits());
+
+        retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "none"));
+        assertTrue("no tolerance is within limits if no failures", retryWithToleranceOperator.withinToleranceLimits());
+    }
+
+    @Test
+    public void testDefaultConfigs() {
+        RetryWithToleranceOperatorConfig configuration;
+        configuration = new RetryWithToleranceOperatorConfig(new HashMap<>());
+        assertEquals(configuration.retryTimeout(), 0);
+        assertEquals(configuration.retryDelayMax(), 60000);
+        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConfigs() {
+        RetryWithToleranceOperatorConfig configuration;
+        configuration = new RetryWithToleranceOperatorConfig(config("retry.timeout", "100"));
+        assertEquals(configuration.retryTimeout(), 100);
+
+        configuration = new RetryWithToleranceOperatorConfig(config("retry.delay.max.ms", "100"));
+        assertEquals(configuration.retryDelayMax(), 100);
+
+        configuration = new RetryWithToleranceOperatorConfig(config("allowed.max", "none"));
+        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+
+        PowerMock.verifyAll();
+    }
+
+    Map<String, Object> config(String key, Object val) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(key, val);
+        return configs;
+    }
+
+    private static class ExceptionThrower implements Operation<Object> {
+        private Exception e;
+
+        public ExceptionThrower(Exception e) {
+            this.e = e;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            throw e;
+        }
+    }
+}
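
For readers who don't want to trace the whole diff: the tests in RetryWithToleranceOperatorTest above show how the new operator is configured and driven. Below is a minimal sketch assembled from those tests; the key constants, method names, and values are taken from this PR and may still change before release, and `operation` / `errorHandlingMetrics` are placeholders for objects the caller already has.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;

    // Sketch only, assembled from the tests in this PR; not production code.
    Map<String, Object> props = new HashMap<>();
    props.put(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");       // total retry budget in ms ("0" disables retries)
    props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");  // upper bound on the backoff between attempts
    props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");      // "none" = fail the task on the first error, "all" = keep going

    RetryWithToleranceOperator operator = new RetryWithToleranceOperator();
    operator.configure(props);
    operator.metrics(errorHandlingMetrics);  // ErrorHandlingMetrics for the owning task

    // execAndHandleError retries RetriableExceptions; the expectations in testBackoffLimit
    // imply an exponential backoff schedule of 300, 600, 1200, 2400 ms, clipped to whatever
    // remains of the retry deadline.
    String result = operator.execAndHandleError(operation, Exception.class);
    if (operator.failed()) {
        // retries were exhausted or the error was non-retriable; whether the task
        // keeps running is decided by the tolerance setting above.
    }

Note that the tests always hand the operator an ErrorHandlingMetrics instance before use, which is how the per-task error metrics (such as total-errors-logged asserted in ErrorReporterTest) get populated.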


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect handling of bad data
> ----------------------------------
>
>                 Key: KAFKA-6738
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6738
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Randall Hauch
>            Assignee: Arjun Satish
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> Kafka Connect connectors and tasks fail when they run into an unexpected 
> situation or error, but the framework should provide more general "bad data 
> handling" options, including (perhaps among others):
> # fail fast, which is what we do today (assuming connector actually fails and 
> doesn't eat errors)
> # retry (possibly with configs to limit)
> # drop data and move on
> # dead letter queue
> This needs to be addressed in a way that handles errors from:
> # The connector itself (e.g. connectivity issues to the other system)
> # Converters/serializers (bad data, unexpected format, etc)
> # SMTs
> # Ideally the framework as well, though we obviously want to fix known bugs 
> anyway
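
To make the handling options listed above concrete: they surface as per-connector error-handling properties. The sketch below is illustrative only; the property names follow the errors.* naming the released feature uses, which is defined by the implementation and its KIP rather than by this issue text, so treat them as assumptions.

    // Hypothetical connector properties illustrating the handling options listed above.
    Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("errors.retry.timeout", "30000");               // option 2: retry failed operations for up to 30s
    connectorProps.put("errors.retry.delay.max.ms", "5000");
    connectorProps.put("errors.tolerance", "all");                     // option 3: drop bad records and move on
    connectorProps.put("errors.deadletterqueue.topic.name", "my-dlq"); // option 4: route failed records to a DLQ topic (sink connectors); topic name is made up here
    connectorProps.put("errors.log.enable", "true");                   // log each failure so it can be inspected later
    // option 1 (fail fast) remains the default behavior: no retries and errors.tolerance=none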



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
