[ 
https://issues.apache.org/jira/browse/KAFKA-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502498#comment-16502498
 ] 

ASF GitHub Bot commented on KAFKA-6981:
---------------------------------------

ewencp closed pull request #5125:  KAFKA-6981: Move the error handling 
configuration properties into the ConnectorConfig and SinkConnectorConfig 
classes
URL: https://github.com/apache/kafka/pull/5125
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff of a fork-based pull request once it is merged, the full
diff is reproduced below for the sake of provenance:

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index c54c160d5ab..f98469e5b5a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -36,8 +37,8 @@
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
@@ -54,6 +55,7 @@
 public class ConnectorConfig extends AbstractConfig {
     protected static final String COMMON_GROUP = "Common";
     protected static final String TRANSFORMS_GROUP = "Transforms";
+    protected static final String ERROR_GROUP = "Error Handling";
 
     public static final String NAME_CONFIG = "name";
     private static final String NAME_DOC = "Globally unique name to use for 
this connector.";
@@ -106,6 +108,37 @@
     public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.toString();
     public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.toString();
 
+    public static final String ERRORS_RETRY_TIMEOUT_CONFIG = 
"errors.retry.timeout";
+    public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout 
for Errors";
+    public static final int ERRORS_RETRY_TIMEOUT_DEFAULT = 0;
+    public static final String ERRORS_RETRY_TIMEOUT_DOC = "The maximum 
duration in milliseconds that a failed operation " +
+            "will be reattempted. The default is 0, which means no retries 
will be attempted. Use -1 for infinite retries.";
+
+    public static final String ERRORS_RETRY_MAX_DELAY_CONFIG = 
"errors.retry.delay.max.ms";
+    public static final String ERRORS_RETRY_MAX_DELAY_DISPLAY = "Maximum Delay 
Between Retries for Errors";
+    public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000;
+    public static final String ERRORS_RETRY_MAX_DELAY_DOC = "The maximum 
duration in milliseconds between consecutive retry attempts. " +
+            "Jitter will be added to the delay once this limit is reached to 
prevent thundering herd issues.";
+
+    public static final String ERRORS_TOLERANCE_CONFIG = "errors.allowed.max";
+    public static final String ERRORS_TOLERANCE_DISPLAY = "Error Tolerance";
+    public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = 
ToleranceType.NONE;
+    public static final String ERRORS_TOLERANCE_DOC = "Behavior for tolerating 
errors during connector operation. 'none' is the default value " +
+            "and signals that any error will result in an immediate connector 
task failure; 'all' changes the behavior to skip over problematic records.";
+
+    public static final String ERRORS_LOG_ENABLE_CONFIG = "errors.log.enable";
+    public static final String ERRORS_LOG_ENABLE_DISPLAY = "Log Errors";
+    public static final boolean ERRORS_LOG_ENABLE_DEFAULT = false;
+    public static final String ERRORS_LOG_ENABLE_DOC = "If true, write each 
error and the details of the failed operation and problematic record " +
+            "to the Connect application log. This is 'false' by default, so 
that only errors that are not tolerated are reported.";
+
+    public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = 
"errors.log.include.messages";
+    public static final String ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY = "Log 
Error Details";
+    public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false;
+    public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to 
the include in the log the Connect record that resulted in " +
+            "a failure. This is 'false' by default, which will prevent record 
keys, values, and headers from being written to log files, " +
+            "although some information such as topic and partition number will 
still be logged.";
+
     private final EnrichedConnectorConfig enrichedConfig;
     private static class EnrichedConnectorConfig extends AbstractConfig {
         EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> 
props) {
@@ -120,6 +153,7 @@ public Object get(String key) {
 
     public static ConfigDef configDef() {
         int orderInGroup = 0;
+        int orderInErrorGroup = 0;
         return new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, 
nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 
++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
                 .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, 
CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, 
CONNECTOR_CLASS_DISPLAY)
@@ -138,7 +172,18 @@ public void ensureValid(String name, Object value) {
                 }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 
++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
                 .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, 
CONFIG_RELOAD_ACTION_RESTART,
                         in(CONFIG_RELOAD_ACTION_NONE, 
CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
-                        CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, 
++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY);
+                        CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, 
++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY)
+                .define(ERRORS_RETRY_TIMEOUT_CONFIG, Type.LONG, 
ERRORS_RETRY_TIMEOUT_DEFAULT, Importance.MEDIUM,
+                        ERRORS_RETRY_TIMEOUT_DOC, ERROR_GROUP, 
++orderInErrorGroup, Width.MEDIUM, ERRORS_RETRY_TIMEOUT_DISPLAY)
+                .define(ERRORS_RETRY_MAX_DELAY_CONFIG, Type.LONG, 
ERRORS_RETRY_MAX_DELAY_DEFAULT, Importance.MEDIUM,
+                        ERRORS_RETRY_MAX_DELAY_DOC, ERROR_GROUP, 
++orderInErrorGroup, Width.MEDIUM, ERRORS_RETRY_MAX_DELAY_DISPLAY)
+                .define(ERRORS_TOLERANCE_CONFIG, Type.STRING, 
ERRORS_TOLERANCE_DEFAULT.value(),
+                        in(ToleranceType.NONE.value(), 
ToleranceType.ALL.value()), Importance.MEDIUM,
+                        ERRORS_TOLERANCE_DOC, ERROR_GROUP, 
++orderInErrorGroup, Width.SHORT, ERRORS_TOLERANCE_DISPLAY)
+                .define(ERRORS_LOG_ENABLE_CONFIG, Type.BOOLEAN, 
ERRORS_LOG_ENABLE_DEFAULT, Importance.MEDIUM,
+                        ERRORS_LOG_ENABLE_DOC, ERROR_GROUP, 
++orderInErrorGroup, Width.SHORT, ERRORS_LOG_ENABLE_DISPLAY)
+                .define(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, Type.BOOLEAN, 
ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT, Importance.MEDIUM,
+                        ERRORS_LOG_INCLUDE_MESSAGES_DOC, ERROR_GROUP, 
++orderInErrorGroup, Width.SHORT, ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY);
     }
 
     public ConnectorConfig(Plugins plugins) {
@@ -162,6 +207,32 @@ public Object get(String key) {
         return enrichedConfig.get(key);
     }
 
+    public long errorRetryTimeout() {
+        return getLong(ERRORS_RETRY_TIMEOUT_CONFIG);
+    }
+
+    public long errorMaxDelayInMillis() {
+        return getLong(ERRORS_RETRY_MAX_DELAY_CONFIG);
+    }
+
+    public ToleranceType errorToleranceType() {
+        String tolerance = getString(ERRORS_TOLERANCE_CONFIG);
+        for (ToleranceType type: ToleranceType.values()) {
+            if (type.name().equalsIgnoreCase(tolerance)) {
+                return type;
+            }
+        }
+        return ERRORS_TOLERANCE_DEFAULT;
+    }
+
+    public boolean enableErrorLog() {
+        return getBoolean(ERRORS_LOG_ENABLE_CONFIG);
+    }
+
+    public boolean includeRecordDetailsInErrorLog() {
+        return getBoolean(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG);
+    }
+
     /**
      * Returns the initialized list of {@link Transformation} which are 
specified in {@link #TRANSFORMS_CONFIG}.
      */
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 887a4da2dea..9629f8f0e42 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -42,9 +43,19 @@
     public static final String TOPICS_REGEX_DEFAULT = "";
     private static final String TOPICS_REGEX_DISPLAY = "Topics regex";
 
+    public static final String DLQ_PREFIX = "errors.deadletterqueue.";
+
+    public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + 
"topic.name";
+    public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to 
be used as the dead letter queue (DLQ) for messages that " +
+        "result in an error when processed by this sink connector, or its 
transformations or converters. The topic name is blank by default, " +
+        "which means that no messages are to be recorded in the DLQ.";
+    public static final String DLQ_TOPIC_DEFAULT = "";
+    private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic 
Name";
+
     static ConfigDef config = ConnectorConfig.configDef()
         .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, 
ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, 
TOPICS_DISPLAY)
-        .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, 
TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, 
TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY);
+        .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, 
TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, 
TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
+        .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, 
DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY);
 
     public static ConfigDef configDef() {
         return config;
@@ -83,4 +94,7 @@ public static boolean hasTopicsRegexConfig(Map<String, 
String> props) {
         return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
     }
 
+    public String dlqTopicName() {
+        return getString(DLQ_TOPIC_NAME_CONFIG);
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 7a72a0e7b26..97e68faa4ca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -485,8 +485,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
                                        ClassLoader loader) {
         ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
 
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator();
-        
retryWithToleranceOperator.configure(connConfig.originalsWithPrefix("errors."));
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
+                connConfig.errorMaxDelayInMillis(), 
connConfig.errorToleranceType(), Time.SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         // Decide which type of worker task we need based on the type of task.
@@ -505,7 +505,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
                     time, retryWithToleranceOperator);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
connConfig, errorHandlingMetrics));
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics));
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
                     valueConverter, headerConverter, transformationChain, 
loader, time,
                     retryWithToleranceOperator);
@@ -519,19 +520,17 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId 
id) {
         return new ErrorHandlingMetrics(id, metrics);
     }
 
-    private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, 
ConnectorConfig connConfig,
-                                                    ErrorHandlingMetrics 
errorHandlingMetrics) {
+    private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, 
SinkConnectorConfig connConfig,
+                                                  ErrorHandlingMetrics 
errorHandlingMetrics) {
         ArrayList<ErrorReporter> reporters = new ArrayList<>();
-        LogReporter logReporter = new LogReporter(id);
-        
logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+        LogReporter logReporter = new LogReporter(id, connConfig);
         logReporter.metrics(errorHandlingMetrics);
         reporters.add(logReporter);
 
         // check if topic for dead letter queue exists
-        String topic = connConfig.getString(DeadLetterQueueReporter.PREFIX + 
"." + DeadLetterQueueReporter.DLQ_TOPIC_NAME);
+        String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
             DeadLetterQueueReporter reporter = 
DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
-            
reporter.configure(connConfig.originalsWithPrefix(DeadLetterQueueReporter.PREFIX
 + "."));
             reporters.add(reporter);
         }
 
@@ -541,8 +540,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId 
id) {
     private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, 
ConnectorConfig connConfig,
                                                       ErrorHandlingMetrics 
errorHandlingMetrics) {
         List<ErrorReporter> reporters = new ArrayList<>();
-        LogReporter logReporter = new LogReporter(id);
-        
logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + "."));
+        LogReporter logReporter = new LogReporter(id, connConfig);
         logReporter.metrics(errorHandlingMetrics);
         reporters.add(logReporter);
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 454a619ec76..9a8a9afb29b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -21,12 +21,10 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,22 +47,14 @@
     private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
     private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
 
-    public static final String PREFIX = "errors.deadletterqueue";
+    private final SinkConnectorConfig connConfig;
 
-    public static final String DLQ_TOPIC_NAME = "topic.name";
-    public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic 
where these messages are written to.";
-    public static final String DLQ_TOPIC_DEFAULT = "";
-
-    private DeadLetterQueueReporterConfig config;
     private KafkaProducer<byte[], byte[]> kafkaProducer;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, 
ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC);
-
     public static DeadLetterQueueReporter createAndSetup(WorkerConfig 
workerConfig,
-                                                         ConnectorConfig 
connConfig, Map<String, Object> producerProps) {
-        String topic = connConfig.getString(PREFIX + "." + DLQ_TOPIC_NAME);
+                                                         SinkConnectorConfig 
connConfig, Map<String, Object> producerProps) {
+        String topic = connConfig.dlqTopicName();
 
         try (AdminClient admin = AdminClient.create(workerConfig.originals())) 
{
             if (!admin.listTopics().names().get().contains(topic)) {
@@ -81,7 +71,7 @@ public static DeadLetterQueueReporter 
createAndSetup(WorkerConfig workerConfig,
         }
 
         KafkaProducer<byte[], byte[]> dlqProducer = new 
KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer);
+        return new DeadLetterQueueReporter(dlqProducer, connConfig);
     }
 
     /**
@@ -90,13 +80,9 @@ public static DeadLetterQueueReporter 
createAndSetup(WorkerConfig workerConfig,
      * @param kafkaProducer a Kafka Producer to produce the original consumed 
records.
      */
     // Visible for testing
-    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer) {
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, 
SinkConnectorConfig connConfig) {
         this.kafkaProducer = kafkaProducer;
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        config = new DeadLetterQueueReporterConfig(configs);
+        this.connConfig = connConfig;
     }
 
     @Override
@@ -110,7 +96,8 @@ public void metrics(ErrorHandlingMetrics 
errorHandlingMetrics) {
      * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        if (config.topic().isEmpty()) {
+        final String dlqTopicName = connConfig.dlqTopicName();
+        if (dlqTopicName.isEmpty()) {
             return;
         }
 
@@ -124,31 +111,18 @@ public void report(ProcessingContext context) {
 
         ProducerRecord<byte[], byte[]> producerRecord;
         if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
-            producerRecord = new ProducerRecord<>(config.topic(), null,
+            producerRecord = new ProducerRecord<>(dlqTopicName, null,
                     originalMessage.key(), originalMessage.value(), 
originalMessage.headers());
         } else {
-            producerRecord = new ProducerRecord<>(config.topic(), null, 
originalMessage.timestamp(),
+            producerRecord = new ProducerRecord<>(dlqTopicName, null, 
originalMessage.timestamp(),
                     originalMessage.key(), originalMessage.value(), 
originalMessage.headers());
         }
 
         this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
             if (exception != null) {
-                log.error("Could not produce message to dead letter queue. 
topic=" + config.topic(), exception);
+                log.error("Could not produce message to dead letter queue. 
topic=" + dlqTopicName, exception);
                 errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
             }
         });
     }
-
-    static class DeadLetterQueueReporterConfig extends AbstractConfig {
-        public DeadLetterQueueReporterConfig(Map<?, ?> originals) {
-            super(CONFIG_DEF, originals, true);
-        }
-
-        /**
-         * @return name of the dead letter queue topic.
-         */
-        public String topic() {
-            return getString(DLQ_TOPIC_NAME);
-        }
-    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index e71b6bc8ba9..f7df1b2d1a3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.connect.runtime.errors;
 
-import org.apache.kafka.common.Configurable;
-
 /**
  * Report an error using the information contained in the {@link 
ProcessingContext}.
  */
-public interface ErrorReporter extends Configurable {
+public interface ErrorReporter {
 
     /**
      * Report an error.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
index 1d2c08fd18b..e81bd547568 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -16,14 +16,11 @@
  */
 package org.apache.kafka.connect.runtime.errors;
 
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * Writes errors and their context to application logs.
  */
@@ -31,29 +28,16 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(LogReporter.class);
 
-    public static final String PREFIX = "errors.log";
-
-    public static final String LOG_ENABLE = "enable";
-    public static final String LOG_ENABLE_DOC = "If true, log to application 
logs the errors and the information describing where they occurred.";
-    public static final boolean LOG_ENABLE_DEFAULT = false;
-
-    public static final String LOG_INCLUDE_MESSAGES = "include.messages";
-    public static final String LOG_INCLUDE_MESSAGES_DOC = "If true, include in 
the application log the Connect key, value, and other details of records that 
resulted in errors and failures.";
-    public static final boolean LOG_INCLUDE_MESSAGES_DEFAULT = false;
-
     private final ConnectorTaskId id;
+    private final ConnectorConfig connConfig;
 
-    private LogReporterConfig config;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
-    public LogReporter(ConnectorTaskId id) {
+    public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig) {
         this.id = id;
+        this.connConfig = connConfig;
     }
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-                .define(LOG_ENABLE, ConfigDef.Type.BOOLEAN, 
LOG_ENABLE_DEFAULT, ConfigDef.Importance.MEDIUM, LOG_ENABLE_DOC)
-                .define(LOG_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, 
LOG_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.MEDIUM, 
LOG_INCLUDE_MESSAGES_DOC);
-
     /**
      * Log error context.
      *
@@ -61,7 +45,7 @@ public LogReporter(ConnectorTaskId id) {
      */
     @Override
     public void report(ProcessingContext context) {
-        if (!config.isEnabled()) {
+        if (!connConfig.enableErrorLog()) {
             return;
         }
 
@@ -80,32 +64,8 @@ public void metrics(ErrorHandlingMetrics 
errorHandlingMetrics) {
 
     // Visible for testing
     String message(ProcessingContext context) {
-        return String.format("Error encountered in task %s. %s", 
String.valueOf(id), context.toString(config.canLogMessages()));
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        config = new LogReporterConfig(configs);
-    }
-
-    private static class LogReporterConfig extends AbstractConfig {
-        public LogReporterConfig(Map<?, ?> originals) {
-            super(CONFIG_DEF, originals, true);
-        }
-
-        /**
-         * @return true, if logging of error context is desired; false 
otherwise.
-         */
-        public boolean isEnabled() {
-            return getBoolean(LOG_ENABLE);
-        }
-
-        /**
-         * @return if false, the connect record which caused the exception is 
not logged.
-         */
-        public boolean canLogMessages() {
-            return getBoolean(LOG_INCLUDE_MESSAGES);
-        }
+        return String.format("Error encountered in task %s. %s", 
String.valueOf(id),
+                context.toString(connConfig.includeRecordDetailsInErrorLog()));
     }
 
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 941abf3d3d2..eadf276adb3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -17,37 +17,31 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-
 /**
  * Attempt to recover a failed operation with retries and tolerance limits.
  * <p>
  *
  * A retry is attempted if the operation throws a {@link RetriableException}. 
Retries are accompanied by exponential backoffs, starting with
- * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link 
RetryWithToleranceOperatorConfig#retryDelayMax()}.
+ * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link 
ConnectorConfig#errorMaxDelayInMillis()}.
  * Including the first attempt and future retries, the total time taken to 
evaluate the operation should be within
- * {@link RetryWithToleranceOperatorConfig#retryDelayMax()} millis.
+ * {@link ConnectorConfig#errorMaxDelayInMillis()} millis.
  * <p>
  *
- * This executor will tolerate failures, as specified by {@link 
RetryWithToleranceOperatorConfig#toleranceLimit()}.
+ * This executor will tolerate failures, as specified by {@link 
ConnectorConfig#errorToleranceType()}.
  * For transformations and converters, all exceptions are tolerated. For 
others operations, only {@link RetriableException} are tolerated.
  * <p>
  *
@@ -61,27 +55,8 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(RetryWithToleranceOperator.class);
 
-    public static final String RETRY_TIMEOUT = "retry.timeout";
-    public static final String RETRY_TIMEOUT_DOC = "The total duration in 
milliseconds a failed operation will be retried for.";
-    public static final long RETRY_TIMEOUT_DEFAULT = 0;
-
-    public static final String RETRY_DELAY_MAX_MS = "retry.delay.max.ms";
-    public static final String RETRY_DELAY_MAX_MS_DOC = "The maximum duration 
between two consecutive retries (in milliseconds).";
-    public static final long RETRY_DELAY_MAX_MS_DEFAULT = 60000;
-
     public static final long RETRIES_DELAY_MIN_MS = 300;
 
-    public static final String TOLERANCE_LIMIT = "allowed.max";
-    public static final String TOLERANCE_LIMIT_DOC = "Fail the task if we 
exceed specified number of errors overall.";
-    public static final String TOLERANCE_LIMIT_DEFAULT = "none";
-
-    // for testing only
-    public static final RetryWithToleranceOperator NOOP_OPERATOR = new 
RetryWithToleranceOperator();
-    static {
-        NOOP_OPERATOR.configure(Collections.emptyMap());
-        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics());
-    }
-
     private static final Map<Stage, Class<? extends Exception>> 
TOLERABLE_EXCEPTIONS = new HashMap<>();
     static {
         TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
@@ -90,29 +65,24 @@
         TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
     }
 
+    private final long errorRetryTimeout;
+    private final long errorMaxDelayInMillis;
+    private final ToleranceType errorToleranceType;
+
     private long totalFailures = 0;
     private final Time time;
-    private RetryWithToleranceOperatorConfig config;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
     protected ProcessingContext context = new ProcessingContext();
 
-    public RetryWithToleranceOperator() {
-        this(new SystemTime());
-    }
-
-    // Visible for testing
-    public RetryWithToleranceOperator(Time time) {
+    public RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
+                                      ToleranceType toleranceType, Time time) {
+        this.errorRetryTimeout = errorRetryTimeout;
+        this.errorMaxDelayInMillis = errorMaxDelayInMillis;
+        this.errorToleranceType = toleranceType;
         this.time = time;
     }
 
-    static ConfigDef getConfigDef() {
-        return new ConfigDef()
-                .define(RETRY_TIMEOUT, ConfigDef.Type.LONG, 
RETRY_TIMEOUT_DEFAULT, ConfigDef.Importance.HIGH, RETRY_TIMEOUT_DOC)
-                .define(RETRY_DELAY_MAX_MS, ConfigDef.Type.LONG, 
RETRY_DELAY_MAX_MS_DEFAULT, atLeast(1), ConfigDef.Importance.MEDIUM, 
RETRY_DELAY_MAX_MS_DOC)
-                .define(TOLERANCE_LIMIT, ConfigDef.Type.STRING, 
TOLERANCE_LIMIT_DEFAULT, in("none", "all"), ConfigDef.Importance.HIGH, 
TOLERANCE_LIMIT_DOC);
-    }
-
     /**
      * Execute the recoverable operation. If the operation is already in a 
failed state, then simply return
      * with the existing failure.
@@ -151,7 +121,7 @@ static ConfigDef getConfigDef() {
     protected <V> V execAndRetry(Operation<V> operation) throws Exception {
         int attempt = 0;
         long startTime = time.milliseconds();
-        long deadline = startTime + config.retryTimeout();
+        long deadline = startTime + errorRetryTimeout;
         do {
             try {
                 attempt++;
@@ -221,27 +191,27 @@ void markAsFailed() {
 
     // Visible for testing
     boolean withinToleranceLimits() {
-        switch (config.toleranceLimit()) {
+        switch (errorToleranceType) {
             case NONE:
                 if (totalFailures > 0) return false;
             case ALL:
                 return true;
             default:
-                throw new ConfigException("Unknown tolerance type: {}", config.toleranceLimit());
+                throw new ConfigException("Unknown tolerance type: {}", errorToleranceType);
         }
     }
 
     // Visible for testing
     boolean checkRetry(long startTime) {
-        return (time.milliseconds() - startTime) < config.retryTimeout();
+        return (time.milliseconds() - startTime) < errorRetryTimeout;
     }
 
     // Visible for testing
     void backoff(int attempt, long deadline) {
         int numRetry = attempt - 1;
         long delay = RETRIES_DELAY_MIN_MS << numRetry;
-        if (delay > config.retryDelayMax()) {
-            delay = ThreadLocalRandom.current().nextLong(config.retryDelayMax());
+        if (delay > errorMaxDelayInMillis) {
+            delay = ThreadLocalRandom.current().nextLong(errorMaxDelayInMillis);
         }
         if (delay + time.milliseconds() > deadline) {
             delay = deadline - time.milliseconds();
@@ -250,45 +220,19 @@ void backoff(int attempt, long deadline) {
         time.sleep(delay);
     }
 
-    public void configure(Map<String, ?> configs) {
-        config = new RetryWithToleranceOperatorConfig(configs);
-    }
-
     public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
         this.errorHandlingMetrics = errorHandlingMetrics;
     }
 
-    static class RetryWithToleranceOperatorConfig extends AbstractConfig {
-        public RetryWithToleranceOperatorConfig(Map<?, ?> originals) {
-            super(getConfigDef(), originals, true);
-        }
-
-        /**
-         * @return the total time an operation can take to succeed (including 
the first attempt and retries).
-         */
-        public long retryTimeout() {
-            return getLong(RETRY_TIMEOUT);
-        }
-
-        /**
-         * @return the maximum delay between two subsequent retries in 
milliseconds.
-         */
-        public long retryDelayMax() {
-            return getLong(RETRY_DELAY_MAX_MS);
-        }
-
-        /**
-         * @return determine how many errors to tolerate.
-         */
-        public ToleranceType toleranceLimit() {
-            return ToleranceType.fromString(getString(TOLERANCE_LIMIT));
-        }
-    }
-
     @Override
     public String toString() {
         return "RetryWithToleranceOperator{" +
-                "config=" + config +
+                "errorRetryTimeout=" + errorRetryTimeout +
+                ", errorMaxDelayInMillis=" + errorMaxDelayInMillis +
+                ", errorToleranceType=" + errorToleranceType +
+                ", totalFailures=" + totalFailures +
+                ", time=" + time +
+                ", context=" + context +
                 '}';
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
index 79956ac7963..dd40a60648a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java
@@ -33,7 +33,8 @@
      */
     ALL;
 
-    public static ToleranceType fromString(String typeStr) {
-        return "ALL".equals(typeStr.toUpperCase(Locale.ROOT)) ? ToleranceType.ALL : ToleranceType.NONE;
+    public String value() {
+        return name().toLowerCase(Locale.ROOT);
     }
+
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index da017e851b7..5728465095a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -170,16 +170,16 @@ public void testConfigValidationMissingName() {
         // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to change rarely.
         assertEquals(TestSourceConnector.class.getName(), result.name());
-        assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, 
ConnectorConfig.TRANSFORMS_GROUP), result.groups());
+        assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, 
ConnectorConfig.TRANSFORMS_GROUP, ConnectorConfig.ERROR_GROUP), 
result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 8 fields, connector's configs add 2
-        assertEquals(10, result.values().size());
+        // Base connector config has 13 fields, connector's configs add 2
+        assertEquals(15, result.values().size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG, 
result.values().get(0).configValue().name());
         assertEquals(1, result.values().get(0).configValue().errors().size());
         // "required" config from connector should generate an error
-        assertEquals("required", result.values().get(8).configValue().name());
-        assertEquals(1, result.values().get(8).configValue().errors().size());
+        assertEquals("required", result.values().get(13).configValue().name());
+        assertEquals(1, result.values().get(13).configValue().errors().size());
 
         verifyAll();
     }
@@ -228,20 +228,21 @@ public void testConfigValidationTransformsExtendResults() {
         List<String> expectedGroups = Arrays.asList(
                 ConnectorConfig.COMMON_GROUP,
                 ConnectorConfig.TRANSFORMS_GROUP,
+                ConnectorConfig.ERROR_GROUP,
                 "Transforms: xformA",
                 "Transforms: xformB"
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 8 fields, connector's configs add 2, 2 
type fields from the transforms, and
+        // Base connector config has 13 fields, connector's configs add 2, 2 
type fields from the transforms, and
         // 1 from the valid transformation's config
-        assertEquals(13, result.values().size());
+        assertEquals(18, result.values().size());
         // Should get 2 type fields from the transforms, first adds its own 
config since it has a valid class
-        assertEquals("transforms.xformA.type", 
result.values().get(8).configValue().name());
-        assertTrue(result.values().get(8).configValue().errors().isEmpty());
-        assertEquals("transforms.xformA.subconfig", 
result.values().get(9).configValue().name());
-        assertEquals("transforms.xformB.type", 
result.values().get(10).configValue().name());
-        assertFalse(result.values().get(10).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.type", 
result.values().get(13).configValue().name());
+        assertTrue(result.values().get(13).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.subconfig", 
result.values().get(14).configValue().name());
+        assertEquals("transforms.xformB.type", 
result.values().get(15).configValue().name());
+        assertFalse(result.values().get(15).configValue().errors().isEmpty());
 
         verifyAll();
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index b50e7ff0956..e931642afcc 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -35,7 +35,9 @@
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -69,6 +71,7 @@
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Time.SYSTEM;
 import static org.junit.Assert.assertEquals;
 
 @RunWith(PowerMockRunner.class)
@@ -81,6 +84,8 @@
     private static final int PARTITION2 = 13;
     private static final long FIRST_OFFSET = 45;
 
+    @Mock Plugins plugins;
+
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
 
     static {
@@ -88,17 +93,11 @@
         TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSinkTask.class.getName());
     }
 
-    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
-
-    private static final Map<String, String> OPERATION_EXECUTOR_PROPS = new 
HashMap<>();
+    public static final long OPERATOR_RETRY_TIMEOUT_MILLIS = 60000;
+    public static final long OPERATOR_RETRY_MAX_DELAY_MILLIS = 5000;
+    public static final ToleranceType OPERATOR_TOLERANCE_TYPE = 
ToleranceType.ALL;
 
-    static {
-        
OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        // wait up to 1 minute for an operation
-        OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_TIMEOUT, 
"60000");
-        // wait up 5 seconds between subsequent retries
-        
OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, 
"5000");
-    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private TargetState initialState = TargetState.STARTED;
@@ -164,15 +163,13 @@ public void tearDown() {
 
     @Test
     public void testErrorHandlingInSinkTasks() throws Exception {
-        LogReporter reporter = new LogReporter(taskId);
-        Map<String, Object> reportProps = new HashMap<>();
-        reportProps.put(LogReporter.LOG_ENABLE, "true");
-        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
-        reporter.configure(reportProps);
+        Map<String, String> reportProps = new HashMap<>();
+        reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, 
"true");
+        LogReporter reporter = new LogReporter(taskId, 
connConfig(reportProps));
         reporter.metrics(errorHandlingMetrics);
 
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSinkTask(initialState, retryWithToleranceOperator);
@@ -212,17 +209,19 @@ public void testErrorHandlingInSinkTasks() throws 
Exception {
         PowerMock.verifyAll();
     }
 
+    private RetryWithToleranceOperator operator() {
+        return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM);
+    }
+
     @Test
     public void testErrorHandlingInSourceTasks() throws Exception {
-        LogReporter reporter = new LogReporter(taskId);
-        Map<String, Object> reportProps = new HashMap<>();
-        reportProps.put(LogReporter.LOG_ENABLE, "true");
-        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
-        reporter.configure(reportProps);
+        Map<String, String> reportProps = new HashMap<>();
+        reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, 
"true");
+        LogReporter reporter = new LogReporter(taskId, 
connConfig(reportProps));
         reporter.metrics(errorHandlingMetrics);
 
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSourceTask(initialState, retryWithToleranceOperator);
@@ -271,17 +270,23 @@ public void testErrorHandlingInSourceTasks() throws 
Exception {
         PowerMock.verifyAll();
     }
 
+    private ConnectorConfig connConfig(Map<String, String> connProps) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, "test");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
SinkTask.class.getName());
+        props.putAll(connProps);
+        return new ConnectorConfig(plugins, props);
+    }
+
     @Test
     public void testErrorHandlingInSourceTasksWthBadConverter() throws 
Exception {
-        LogReporter reporter = new LogReporter(taskId);
-        Map<String, Object> reportProps = new HashMap<>();
-        reportProps.put(LogReporter.LOG_ENABLE, "true");
-        reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true");
-        reporter.configure(reportProps);
+        Map<String, String> reportProps = new HashMap<>();
+        reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, 
"true");
+        LogReporter reporter = new LogReporter(taskId, 
connConfig(reportProps));
         reporter.metrics(errorHandlingMetrics);
 
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-        retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS);
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSourceTask(initialState, retryWithToleranceOperator, 
badConverter());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index d23adbf3d69..4a7c760fc74 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -34,7 +34,7 @@
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -166,7 +166,7 @@ private void createTask(TargetState initialState) {
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, headerConverter,
                 transformationChain, pluginLoader, time,
-                RetryWithToleranceOperator.NOOP_OPERATOR);
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR);
     }
 
     @After
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 800301e05dc..73689d35710 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -29,7 +29,7 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -140,8 +140,8 @@ public void setup() {
                 WorkerSinkTask.class, new String[]{"createConsumer"},
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, headerConverter,
-                new TransformationChain(Collections.emptyList(), 
RetryWithToleranceOperator.NOOP_OPERATOR),
-                pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR);
+                new TransformationChain(Collections.emptyList(), 
RetryWithToleranceOperatorTest.NOOP_OPERATOR),
+                pluginLoader, time, 
RetryWithToleranceOperatorTest.NOOP_OPERATOR);
 
         recordsReturned = 0;
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 1482d75b513..db73a8e0914 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -27,7 +27,7 @@
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import 
org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -151,7 +151,7 @@ private void createWorkerTask() {
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, 
initialState, keyConverter, valueConverter, headerConverter,
                 transformationChain, producer, offsetReader, offsetWriter, 
config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
-                RetryWithToleranceOperator.NOOP_OPERATOR);
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR);
     }
 
     @Test
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index de0ba8a9ddf..33349f4c2f5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
@@ -62,7 +63,7 @@
     @Before
     public void setup() {
         metrics = new MockConnectMetrics();
-        retryWithToleranceOperator = new RetryWithToleranceOperator();
+        retryWithToleranceOperator = 
RetryWithToleranceOperatorTest.NOOP_OPERATOR;
     }
 
     @After
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 6fa7ed11b9e..77238e9aaad 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -70,6 +70,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
 import static org.junit.Assert.assertEquals;
@@ -487,7 +488,7 @@ public void testAddRemoveTask() throws Exception {
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
-                EasyMock.eq(new TransformationChain(Collections.emptyList(), 
RetryWithToleranceOperator.NOOP_OPERATOR)),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), 
NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
@@ -626,7 +627,7 @@ public void testCleanupTasksOnStop() throws Exception {
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
                 anyObject(JsonConverter.class),
-                EasyMock.eq(new TransformationChain(Collections.emptyList(), 
RetryWithToleranceOperator.NOOP_OPERATOR)),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), 
NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
@@ -719,7 +720,7 @@ public void testConverterOverrides() throws Exception {
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
                 EasyMock.capture(headerConverter),
-                EasyMock.eq(new TransformationChain(Collections.emptyList(), 
RetryWithToleranceOperator.NOOP_OPERATOR)),
+                EasyMock.eq(new TransformationChain(Collections.emptyList(), 
NOOP_OPERATOR)),
                 anyObject(KafkaProducer.class),
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f6a0507909c..b5410d071d8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -21,7 +21,11 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
@@ -37,6 +41,8 @@
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 
@@ -54,13 +60,14 @@
     @Mock
     Future<RecordMetadata> metadata;
 
-    private HashMap<String, Object> config;
+    @Mock
+    Plugins plugins;
+
     private ErrorHandlingMetrics errorHandlingMetrics;
     private MockConnectMetrics metrics;
 
     @Before
     public void setup() {
-        config = new HashMap<>();
         metrics = new MockConnectMetrics();
         errorHandlingMetrics = new ErrorHandlingMetrics(new 
ConnectorTaskId("connector-", 1), metrics);
     }
@@ -74,8 +81,7 @@ public void tearDown() {
 
     @Test
     public void testDLQConfigWithEmptyTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new 
DeadLetterQueueReporter(producer);
-        deadLetterQueueReporter.configure(config);
+        DeadLetterQueueReporter deadLetterQueueReporter = new 
DeadLetterQueueReporter(producer, config(emptyMap()));
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -90,8 +96,7 @@ public void testDLQConfigWithEmptyTopicName() {
 
     @Test
     public void testDLQConfigWithValidTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
-        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -106,8 +111,7 @@ public void testDLQConfigWithValidTopicName() {
 
     @Test
     public void testReportDLQTwice() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new 
DeadLetterQueueReporter(producer);
-        
deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME,
 DLQ_TOPIC));
+        DeadLetterQueueReporter deadLetterQueueReporter = new 
DeadLetterQueueReporter(producer, 
config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -123,8 +127,7 @@ public void testReportDLQTwice() {
 
     @Test
     public void testLogOnDisabledLogReporter() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config);
+        LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -137,8 +140,7 @@ public void testLogOnDisabledLogReporter() {
 
     @Test
     public void testLogOnEnabledLogReporter() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        LogReporter logReporter = new LogReporter(TASK_ID, 
config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -151,8 +153,7 @@ public void testLogOnEnabledLogReporter() {
 
     @Test
     public void testLogMessageWithNoRecords() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        LogReporter logReporter = new LogReporter(TASK_ID, 
config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -164,9 +165,11 @@ public void testLogMessageWithNoRecords() {
 
     @Test
     public void testLogMessageWithSinkRecords() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
-        logReporter.configure(config(LogReporter.LOG_INCLUDE_MESSAGES, 
"true"));
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+        LogReporter logReporter = new LogReporter(TASK_ID, config(props));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -184,9 +187,12 @@ private ProcessingContext processingContext() {
         return context;
     }
 
-    private Map<String, Object> config(String key, Object val) {
-        config.put(key, val);
-        return config;
+    private SinkConnectorConfig config(Map<String, String> configProps) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, "test");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
SinkTask.class.getName());
+        props.putAll(configProps);
+        return new SinkConnectorConfig(plugins, props);
     }
 
     private void assertErrorHandlingMetricValue(String name, double expected) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 751510d5fa6..2d340ac32e5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -19,6 +19,9 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
 import org.junit.Test;
@@ -31,7 +34,17 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.RetryWithToleranceOperatorConfig;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Time.SYSTEM;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_MAX_DELAY_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_MAX_DELAY_DEFAULT;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_DEFAULT;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_DEFAULT;
+import static org.apache.kafka.connect.runtime.errors.ToleranceType.ALL;
+import static org.apache.kafka.connect.runtime.errors.ToleranceType.NONE;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -43,6 +56,12 @@
 @PowerMockIgnore("javax.management.*")
 public class RetryWithToleranceOperatorTest {
 
+    public static final RetryWithToleranceOperator NOOP_OPERATOR = new 
RetryWithToleranceOperator(
+            ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, 
NONE, SYSTEM);
+    static {
+        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics());
+    }
+
     @SuppressWarnings("unused")
     @Mock
     private Operation<String> mockOperation;
@@ -50,6 +69,9 @@
     @Mock
     ErrorHandlingMetrics errorHandlingMetrics;
 
+    @Mock
+    Plugins plugins;
+
     @Test
     public void testHandleExceptionInTransformations() {
         testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
@@ -108,10 +130,7 @@ private void testHandleExceptionInStage(Stage type, 
Exception ex) {
     }
 
     private RetryWithToleranceOperator setupExecutor() {
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator();
-        Map<String, Object> props = 
config(RetryWithToleranceOperator.RETRY_TIMEOUT, "0");
-        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         return retryWithToleranceOperator;
     }
@@ -138,10 +157,7 @@ public void testExecAndHandleNonRetriableErrorThrice() 
throws Exception {
 
     public void execAndHandleRetriableError(int numRetriableExceptionsThrown, 
long expectedWait, Exception e) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-        Map<String, Object> props = 
config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
-        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
@@ -159,10 +175,7 @@ public void execAndHandleRetriableError(int 
numRetriableExceptionsThrown, long e
 
     public void execAndHandleNonRetriableError(int 
numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-        Map<String, Object> props = 
config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
-        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
@@ -181,10 +194,7 @@ public void execAndHandleNonRetriableError(int 
numRetriableExceptionsThrown, lon
     @Test
     public void testCheckRetryLimit() {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-        Map<String, Object> props = 
config(RetryWithToleranceOperator.RETRY_TIMEOUT, "500");
-        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "100");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(500, 100, NONE, time);
 
         time.setCurrentTimeMs(100);
         assertTrue(retryWithToleranceOperator.checkRetry(0));
@@ -208,11 +218,7 @@ public void testCheckRetryLimit() {
     @Test
     public void testBackoffLimit() {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(time);
-
-        Map<String, Object> props = 
config(RetryWithToleranceOperator.RETRY_TIMEOUT, "5");
-        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(5, 5000, NONE, time);
 
         long prevTs = time.hiResClockMs();
         retryWithToleranceOperator.backoff(1, 5000);
@@ -243,56 +249,54 @@ public void testBackoffLimit() {
 
     @Test
     public void testToleranceLimit() {
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator();
-        
retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT,
 "none"));
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, 
ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.markAsFailed();
         assertFalse("should not tolerate any errors", 
retryWithToleranceOperator.withinToleranceLimits());
 
-        retryWithToleranceOperator = new RetryWithToleranceOperator();
-        
retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT,
 "all"));
+        retryWithToleranceOperator = new 
RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, 
ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.markAsFailed();
         retryWithToleranceOperator.markAsFailed();
         assertTrue("should tolerate all errors", 
retryWithToleranceOperator.withinToleranceLimits());
 
-        retryWithToleranceOperator = new RetryWithToleranceOperator();
-        
retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT,
 "none"));
+        retryWithToleranceOperator = new 
RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, 
ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
         assertTrue("no tolerance is within limits if no failures", 
retryWithToleranceOperator.withinToleranceLimits());
     }
 
     @Test
     public void testDefaultConfigs() {
-        RetryWithToleranceOperatorConfig configuration;
-        configuration = new RetryWithToleranceOperatorConfig(new HashMap<>());
-        assertEquals(configuration.retryTimeout(), 0);
-        assertEquals(configuration.retryDelayMax(), 60000);
-        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+        ConnectorConfig configuration = config(emptyMap());
+        assertEquals(configuration.errorRetryTimeout(), 
ERRORS_RETRY_TIMEOUT_DEFAULT);
+        assertEquals(configuration.errorMaxDelayInMillis(), 
ERRORS_RETRY_MAX_DELAY_DEFAULT);
+        assertEquals(configuration.errorToleranceType(), 
ERRORS_TOLERANCE_DEFAULT);
 
         PowerMock.verifyAll();
     }
 
+    ConnectorConfig config(Map<String, String> connProps) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, "test");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
SinkTask.class.getName());
+        props.putAll(connProps);
+        return new ConnectorConfig(plugins, props);
+    }
+
     @Test
-    public void testConfigs() {
-        RetryWithToleranceOperatorConfig configuration;
-        configuration = new 
RetryWithToleranceOperatorConfig(config("retry.timeout", "100"));
-        assertEquals(configuration.retryTimeout(), 100);
+    public void testSetConfigs() {
+        ConnectorConfig configuration;
+        configuration = config(singletonMap(ERRORS_RETRY_TIMEOUT_CONFIG, 
"100"));
+        assertEquals(configuration.errorRetryTimeout(), 100);
 
-        configuration = new 
RetryWithToleranceOperatorConfig(config("retry.delay.max.ms", "100"));
-        assertEquals(configuration.retryDelayMax(), 100);
+        configuration = config(singletonMap(ERRORS_RETRY_MAX_DELAY_CONFIG, 
"100"));
+        assertEquals(configuration.errorMaxDelayInMillis(), 100);
 
-        configuration = new 
RetryWithToleranceOperatorConfig(config("allowed.max", "none"));
-        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+        configuration = config(singletonMap(ERRORS_TOLERANCE_CONFIG, "none"));
+        assertEquals(configuration.errorToleranceType(), ToleranceType.NONE);
 
         PowerMock.verifyAll();
     }
 
-    Map<String, Object> config(String key, Object val) {
-        Map<String, Object> configs = new HashMap<>();
-        configs.put(key, val);
-        return configs;
-    }
-
     private static class ExceptionThrower implements Operation<Object> {
         private Exception e;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Missing Connector Config (errors.deadletterqueue.topic.name) kills Connect 
> Clusters
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6981
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6981
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Arjun Satish
>            Assignee: Arjun Satish
>            Priority: Major
>             Fix For: 2.0.0
>
>
> The trunk version of AK currently tries, incorrectly, to read the property 
> (errors.deadletterqueue.topic.name) when starting a sink connector. This 
> fails no matter what the contents of the connector config are: the 
> ConnectorConfig does not define this property, so any call to getString 
> for it throws a ConfigException (since only known properties are retained 
> by AbstractConfig). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to