[ https://issues.apache.org/jira/browse/KAFKA-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502498#comment-16502498 ]
ASF GitHub Bot commented on KAFKA-6981:
---------------------------------------

ewencp closed pull request #5125: KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes
URL: https://github.com/apache/kafka/pull/5125

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index c54c160d5ab..f98469e5b5a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.transforms.Transformation; @@ -36,8 +37,8 @@ import java.util.List; import java.util.Map; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** @@ -54,6 +55,7 @@ public class ConnectorConfig extends AbstractConfig { protected static final String COMMON_GROUP = "Common"; protected static final String TRANSFORMS_GROUP = "Transforms"; + protected static final String ERROR_GROUP = "Error Handling"; public static final String NAME_CONFIG = "name"; private static final String NAME_DOC = "Globally unique name to use for this connector."; @@ -106,6 +108,37 @@ public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString(); public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString(); + public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout"; + public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors"; + public static final int ERRORS_RETRY_TIMEOUT_DEFAULT = 0; + public static final String ERRORS_RETRY_TIMEOUT_DOC = "The maximum duration in milliseconds that a failed operation " + + "will be reattempted. The default is 0, which means no retries will be attempted. Use -1 for infinite retries."; + + public static final String ERRORS_RETRY_MAX_DELAY_CONFIG = "errors.retry.delay.max.ms"; + public static final String ERRORS_RETRY_MAX_DELAY_DISPLAY = "Maximum Delay Between Retries for Errors"; + public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000; + public static final String ERRORS_RETRY_MAX_DELAY_DOC = "The maximum duration in milliseconds between consecutive retry attempts. 
" + + "Jitter will be added to the delay once this limit is reached to prevent thundering herd issues."; + + public static final String ERRORS_TOLERANCE_CONFIG = "errors.allowed.max"; + public static final String ERRORS_TOLERANCE_DISPLAY = "Error Tolerance"; + public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE; + public static final String ERRORS_TOLERANCE_DOC = "Behavior for tolerating errors during connector operation. 'none' is the default value " + + "and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records."; + + public static final String ERRORS_LOG_ENABLE_CONFIG = "errors.log.enable"; + public static final String ERRORS_LOG_ENABLE_DISPLAY = "Log Errors"; + public static final boolean ERRORS_LOG_ENABLE_DEFAULT = false; + public static final String ERRORS_LOG_ENABLE_DOC = "If true, write each error and the details of the failed operation and problematic record " + + "to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported."; + + public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = "errors.log.include.messages"; + public static final String ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY = "Log Error Details"; + public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false; + public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to the include in the log the Connect record that resulted in " + + "a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, " + + "although some information such as topic and partition number will still be logged."; + private final EnrichedConnectorConfig enrichedConfig; private static class EnrichedConnectorConfig extends AbstractConfig { EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) { @@ -120,6 +153,7 @@ public Object get(String key) { public static ConfigDef configDef() { int orderInGroup = 0; + int orderInErrorGroup = 0; return new ConfigDef() .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY) .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY) @@ -138,7 +172,18 @@ public void ensureValid(String name, Object value) { }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY) .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART, in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW, - CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY); + CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY) + .define(ERRORS_RETRY_TIMEOUT_CONFIG, Type.LONG, ERRORS_RETRY_TIMEOUT_DEFAULT, Importance.MEDIUM, + ERRORS_RETRY_TIMEOUT_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.MEDIUM, ERRORS_RETRY_TIMEOUT_DISPLAY) + .define(ERRORS_RETRY_MAX_DELAY_CONFIG, Type.LONG, ERRORS_RETRY_MAX_DELAY_DEFAULT, Importance.MEDIUM, + ERRORS_RETRY_MAX_DELAY_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.MEDIUM, ERRORS_RETRY_MAX_DELAY_DISPLAY) + .define(ERRORS_TOLERANCE_CONFIG, Type.STRING, ERRORS_TOLERANCE_DEFAULT.value(), + in(ToleranceType.NONE.value(), ToleranceType.ALL.value()), Importance.MEDIUM, + 
ERRORS_TOLERANCE_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_TOLERANCE_DISPLAY) + .define(ERRORS_LOG_ENABLE_CONFIG, Type.BOOLEAN, ERRORS_LOG_ENABLE_DEFAULT, Importance.MEDIUM, + ERRORS_LOG_ENABLE_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_ENABLE_DISPLAY) + .define(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, Type.BOOLEAN, ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT, Importance.MEDIUM, + ERRORS_LOG_INCLUDE_MESSAGES_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY); } public ConnectorConfig(Plugins plugins) { @@ -162,6 +207,32 @@ public Object get(String key) { return enrichedConfig.get(key); } + public long errorRetryTimeout() { + return getLong(ERRORS_RETRY_TIMEOUT_CONFIG); + } + + public long errorMaxDelayInMillis() { + return getLong(ERRORS_RETRY_MAX_DELAY_CONFIG); + } + + public ToleranceType errorToleranceType() { + String tolerance = getString(ERRORS_TOLERANCE_CONFIG); + for (ToleranceType type: ToleranceType.values()) { + if (type.name().equalsIgnoreCase(tolerance)) { + return type; + } + } + return ERRORS_TOLERANCE_DEFAULT; + } + + public boolean enableErrorLog() { + return getBoolean(ERRORS_LOG_ENABLE_CONFIG); + } + + public boolean includeRecordDetailsInErrorLog() { + return getBoolean(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG); + } + /** * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}. */ diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index 887a4da2dea..9629f8f0e42 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.sink.SinkTask; @@ -42,9 +43,19 @@ public static final String TOPICS_REGEX_DEFAULT = ""; private static final String TOPICS_REGEX_DISPLAY = "Topics regex"; + public static final String DLQ_PREFIX = "errors.deadletterqueue."; + + public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name"; + public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to be used as the dead letter queue (DLQ) for messages that " + + "result in an error when processed by this sink connector, or its transformations or converters. 
The topic name is blank by default, " + + "which means that no messages are to be recorded in the DLQ."; + public static final String DLQ_TOPIC_DEFAULT = ""; + private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name"; + static ConfigDef config = ConnectorConfig.configDef() .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) - .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY); + .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) + .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY); public static ConfigDef configDef() { return config; @@ -83,4 +94,7 @@ public static boolean hasTopicsRegexConfig(Map<String, String> props) { return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); } + public String dlqTopicName() { + return getString(DLQ_TOPIC_NAME_CONFIG); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 7a72a0e7b26..97e68faa4ca 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -485,8 +485,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, ClassLoader loader) { ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(); - retryWithToleranceOperator.configure(connConfig.originalsWithPrefix("errors.")); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(), + connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM); retryWithToleranceOperator.metrics(errorHandlingMetrics); // Decide which type of worker task we need based on the type of task. 
@@ -505,7 +505,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, time, retryWithToleranceOperator); } else if (task instanceof SinkTask) { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator); - retryWithToleranceOperator.reporters(sinkTaskReporters(id, connConfig, errorHandlingMetrics)); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics)); return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, valueConverter, headerConverter, transformationChain, loader, time, retryWithToleranceOperator); @@ -519,19 +520,17 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { return new ErrorHandlingMetrics(id, metrics); } - private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig, - ErrorHandlingMetrics errorHandlingMetrics) { + private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig, + ErrorHandlingMetrics errorHandlingMetrics) { ArrayList<ErrorReporter> reporters = new ArrayList<>(); - LogReporter logReporter = new LogReporter(id); - logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + ".")); + LogReporter logReporter = new LogReporter(id, connConfig); logReporter.metrics(errorHandlingMetrics); reporters.add(logReporter); // check if topic for dead letter queue exists - String topic = connConfig.getString(DeadLetterQueueReporter.PREFIX + "." + DeadLetterQueueReporter.DLQ_TOPIC_NAME); + String topic = connConfig.dlqTopicName(); if (topic != null && !topic.isEmpty()) { DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps); - reporter.configure(connConfig.originalsWithPrefix(DeadLetterQueueReporter.PREFIX + ".")); reporters.add(reporter); } @@ -541,8 +540,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig, ErrorHandlingMetrics errorHandlingMetrics) { List<ErrorReporter> reporters = new ArrayList<>(); - LogReporter logReporter = new LogReporter(id); - logReporter.configure(connConfig.originalsWithPrefix(LogReporter.PREFIX + ".")); + LogReporter logReporter = new LogReporter(id, connConfig); logReporter.metrics(errorHandlingMetrics); reporters.add(logReporter); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index 454a619ec76..9a8a9afb29b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -21,12 +21,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.errors.ConnectException; -import 
org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,22 +47,14 @@ private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3; private static final int DLQ_NUM_DESIRED_PARTITIONS = 1; - public static final String PREFIX = "errors.deadletterqueue"; + private final SinkConnectorConfig connConfig; - public static final String DLQ_TOPIC_NAME = "topic.name"; - public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic where these messages are written to."; - public static final String DLQ_TOPIC_DEFAULT = ""; - - private DeadLetterQueueReporterConfig config; private KafkaProducer<byte[], byte[]> kafkaProducer; private ErrorHandlingMetrics errorHandlingMetrics; - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(DLQ_TOPIC_NAME, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, ConfigDef.Importance.HIGH, DLQ_TOPIC_NAME_DOC); - public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, - ConnectorConfig connConfig, Map<String, Object> producerProps) { - String topic = connConfig.getString(PREFIX + "." + DLQ_TOPIC_NAME); + SinkConnectorConfig connConfig, Map<String, Object> producerProps) { + String topic = connConfig.dlqTopicName(); try (AdminClient admin = AdminClient.create(workerConfig.originals())) { if (!admin.listTopics().names().get().contains(topic)) { @@ -81,7 +71,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, } KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); - return new DeadLetterQueueReporter(dlqProducer); + return new DeadLetterQueueReporter(dlqProducer, connConfig); } /** @@ -90,13 +80,9 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, * @param kafkaProducer a Kafka Producer to produce the original consumed records. */ // Visible for testing - DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer) { + DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig) { this.kafkaProducer = kafkaProducer; - } - - @Override - public void configure(Map<String, ?> configs) { - config = new DeadLetterQueueReporterConfig(configs); + this.connConfig = connConfig; } @Override @@ -110,7 +96,8 @@ public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. 
*/ public void report(ProcessingContext context) { - if (config.topic().isEmpty()) { + final String dlqTopicName = connConfig.dlqTopicName(); + if (dlqTopicName.isEmpty()) { return; } @@ -124,31 +111,18 @@ public void report(ProcessingContext context) { ProducerRecord<byte[], byte[]> producerRecord; if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) { - producerRecord = new ProducerRecord<>(config.topic(), null, + producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.key(), originalMessage.value(), originalMessage.headers()); } else { - producerRecord = new ProducerRecord<>(config.topic(), null, originalMessage.timestamp(), + producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(), originalMessage.key(), originalMessage.value(), originalMessage.headers()); } this.kafkaProducer.send(producerRecord, (metadata, exception) -> { if (exception != null) { - log.error("Could not produce message to dead letter queue. topic=" + config.topic(), exception); + log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception); errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); } }); } - - static class DeadLetterQueueReporterConfig extends AbstractConfig { - public DeadLetterQueueReporterConfig(Map<?, ?> originals) { - super(CONFIG_DEF, originals, true); - } - - /** - * @return name of the dead letter queue topic. - */ - public String topic() { - return getString(DLQ_TOPIC_NAME); - } - } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java index e71b6bc8ba9..f7df1b2d1a3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.connect.runtime.errors; -import org.apache.kafka.common.Configurable; - /** * Report an error using the information contained in the {@link ProcessingContext}. */ -public interface ErrorReporter extends Configurable { +public interface ErrorReporter { /** * Report an error. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java index 1d2c08fd18b..e81bd547568 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java @@ -16,14 +16,11 @@ */ package org.apache.kafka.connect.runtime.errors; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** * Writes errors and their context to application logs. 
*/ @@ -31,29 +28,16 @@ private static final Logger log = LoggerFactory.getLogger(LogReporter.class); - public static final String PREFIX = "errors.log"; - - public static final String LOG_ENABLE = "enable"; - public static final String LOG_ENABLE_DOC = "If true, log to application logs the errors and the information describing where they occurred."; - public static final boolean LOG_ENABLE_DEFAULT = false; - - public static final String LOG_INCLUDE_MESSAGES = "include.messages"; - public static final String LOG_INCLUDE_MESSAGES_DOC = "If true, include in the application log the Connect key, value, and other details of records that resulted in errors and failures."; - public static final boolean LOG_INCLUDE_MESSAGES_DEFAULT = false; - private final ConnectorTaskId id; + private final ConnectorConfig connConfig; - private LogReporterConfig config; private ErrorHandlingMetrics errorHandlingMetrics; - public LogReporter(ConnectorTaskId id) { + public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig) { this.id = id; + this.connConfig = connConfig; } - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(LOG_ENABLE, ConfigDef.Type.BOOLEAN, LOG_ENABLE_DEFAULT, ConfigDef.Importance.MEDIUM, LOG_ENABLE_DOC) - .define(LOG_INCLUDE_MESSAGES, ConfigDef.Type.BOOLEAN, LOG_INCLUDE_MESSAGES_DEFAULT, ConfigDef.Importance.MEDIUM, LOG_INCLUDE_MESSAGES_DOC); - /** * Log error context. * @@ -61,7 +45,7 @@ public LogReporter(ConnectorTaskId id) { */ @Override public void report(ProcessingContext context) { - if (!config.isEnabled()) { + if (!connConfig.enableErrorLog()) { return; } @@ -80,32 +64,8 @@ public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { // Visible for testing String message(ProcessingContext context) { - return String.format("Error encountered in task %s. %s", String.valueOf(id), context.toString(config.canLogMessages())); - } - - @Override - public void configure(Map<String, ?> configs) { - config = new LogReporterConfig(configs); - } - - private static class LogReporterConfig extends AbstractConfig { - public LogReporterConfig(Map<?, ?> originals) { - super(CONFIG_DEF, originals, true); - } - - /** - * @return true, if logging of error context is desired; false otherwise. - */ - public boolean isEnabled() { - return getBoolean(LOG_ENABLE); - } - - /** - * @return if false, the connect record which caused the exception is not logged. - */ - public boolean canLogMessages() { - return getBoolean(LOG_INCLUDE_MESSAGES); - } + return String.format("Error encountered in task %s. 
%s", String.valueOf(id), + context.toString(connConfig.includeRecordDetailsInErrorLog())); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index 941abf3d3d2..eadf276adb3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -17,37 +17,31 @@ package org.apache.kafka.connect.runtime.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; -import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** * Attempt to recover a failed operation with retries and tolerance limits. * <p> * * A retry is attempted if the operation throws a {@link RetriableException}. Retries are accompanied by exponential backoffs, starting with - * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link RetryWithToleranceOperatorConfig#retryDelayMax()}. + * {@link #RETRIES_DELAY_MIN_MS}, up to what is specified with {@link ConnectorConfig#errorMaxDelayInMillis()}. * Including the first attempt and future retries, the total time taken to evaluate the operation should be within - * {@link RetryWithToleranceOperatorConfig#retryDelayMax()} millis. + * {@link ConnectorConfig#errorMaxDelayInMillis()} millis. * <p> * - * This executor will tolerate failures, as specified by {@link RetryWithToleranceOperatorConfig#toleranceLimit()}. + * This executor will tolerate failures, as specified by {@link ConnectorConfig#errorToleranceType()}. * For transformations and converters, all exceptions are tolerated. For other operations, only {@link RetriableException} are tolerated. 
* <p> * @@ -61,27 +55,8 @@ private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class); - public static final String RETRY_TIMEOUT = "retry.timeout"; - public static final String RETRY_TIMEOUT_DOC = "The total duration in milliseconds a failed operation will be retried for."; - public static final long RETRY_TIMEOUT_DEFAULT = 0; - - public static final String RETRY_DELAY_MAX_MS = "retry.delay.max.ms"; - public static final String RETRY_DELAY_MAX_MS_DOC = "The maximum duration between two consecutive retries (in milliseconds)."; - public static final long RETRY_DELAY_MAX_MS_DEFAULT = 60000; - public static final long RETRIES_DELAY_MIN_MS = 300; - public static final String TOLERANCE_LIMIT = "allowed.max"; - public static final String TOLERANCE_LIMIT_DOC = "Fail the task if we exceed specified number of errors overall."; - public static final String TOLERANCE_LIMIT_DEFAULT = "none"; - - // for testing only - public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(); - static { - NOOP_OPERATOR.configure(Collections.emptyMap()); - NOOP_OPERATOR.metrics(new ErrorHandlingMetrics()); - } - private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap<>(); static { TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class); @@ -90,29 +65,24 @@ TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class); } + private final long errorRetryTimeout; + private final long errorMaxDelayInMillis; + private final ToleranceType errorToleranceType; + private long totalFailures = 0; private final Time time; - private RetryWithToleranceOperatorConfig config; private ErrorHandlingMetrics errorHandlingMetrics; protected ProcessingContext context = new ProcessingContext(); - public RetryWithToleranceOperator() { - this(new SystemTime()); - } - - // Visible for testing - public RetryWithToleranceOperator(Time time) { + public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, + ToleranceType toleranceType, Time time) { + this.errorRetryTimeout = errorRetryTimeout; + this.errorMaxDelayInMillis = errorMaxDelayInMillis; + this.errorToleranceType = toleranceType; this.time = time; } - static ConfigDef getConfigDef() { - return new ConfigDef() - .define(RETRY_TIMEOUT, ConfigDef.Type.LONG, RETRY_TIMEOUT_DEFAULT, ConfigDef.Importance.HIGH, RETRY_TIMEOUT_DOC) - .define(RETRY_DELAY_MAX_MS, ConfigDef.Type.LONG, RETRY_DELAY_MAX_MS_DEFAULT, atLeast(1), ConfigDef.Importance.MEDIUM, RETRY_DELAY_MAX_MS_DOC) - .define(TOLERANCE_LIMIT, ConfigDef.Type.STRING, TOLERANCE_LIMIT_DEFAULT, in("none", "all"), ConfigDef.Importance.HIGH, TOLERANCE_LIMIT_DOC); - } - /** * Execute the recoverable operation. If the operation is already in a failed state, then simply return * with the existing failure. 
@@ -151,7 +121,7 @@ static ConfigDef getConfigDef() { protected <V> V execAndRetry(Operation<V> operation) throws Exception { int attempt = 0; long startTime = time.milliseconds(); - long deadline = startTime + config.retryTimeout(); + long deadline = startTime + errorRetryTimeout; do { try { attempt++; @@ -221,27 +191,27 @@ void markAsFailed() { // Visible for testing boolean withinToleranceLimits() { - switch (config.toleranceLimit()) { + switch (errorToleranceType) { case NONE: if (totalFailures > 0) return false; case ALL: return true; default: - throw new ConfigException("Unknown tolerance type: {}", config.toleranceLimit()); + throw new ConfigException("Unknown tolerance type: {}", errorToleranceType); } } // Visible for testing boolean checkRetry(long startTime) { - return (time.milliseconds() - startTime) < config.retryTimeout(); + return (time.milliseconds() - startTime) < errorRetryTimeout; } // Visible for testing void backoff(int attempt, long deadline) { int numRetry = attempt - 1; long delay = RETRIES_DELAY_MIN_MS << numRetry; - if (delay > config.retryDelayMax()) { - delay = ThreadLocalRandom.current().nextLong(config.retryDelayMax()); + if (delay > errorMaxDelayInMillis) { + delay = ThreadLocalRandom.current().nextLong(errorMaxDelayInMillis); } if (delay + time.milliseconds() > deadline) { delay = deadline - time.milliseconds(); @@ -250,45 +220,19 @@ void backoff(int attempt, long deadline) { time.sleep(delay); } - public void configure(Map<String, ?> configs) { - config = new RetryWithToleranceOperatorConfig(configs); - } - public void metrics(ErrorHandlingMetrics errorHandlingMetrics) { this.errorHandlingMetrics = errorHandlingMetrics; } - static class RetryWithToleranceOperatorConfig extends AbstractConfig { - public RetryWithToleranceOperatorConfig(Map<?, ?> originals) { - super(getConfigDef(), originals, true); - } - - /** - * @return the total time an operation can take to succeed (including the first attempt and retries). - */ - public long retryTimeout() { - return getLong(RETRY_TIMEOUT); - } - - /** - * @return the maximum delay between two subsequent retries in milliseconds. - */ - public long retryDelayMax() { - return getLong(RETRY_DELAY_MAX_MS); - } - - /** - * @return determine how many errors to tolerate. - */ - public ToleranceType toleranceLimit() { - return ToleranceType.fromString(getString(TOLERANCE_LIMIT)); - } - } - @Override public String toString() { return "RetryWithToleranceOperator{" + - "config=" + config + + "errorRetryTimeout=" + errorRetryTimeout + + ", errorMaxDelayInMillis=" + errorMaxDelayInMillis + + ", errorToleranceType=" + errorToleranceType + + ", totalFailures=" + totalFailures + + ", time=" + time + + ", context=" + context + '}'; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java index 79956ac7963..dd40a60648a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ToleranceType.java @@ -33,7 +33,8 @@ */ ALL; - public static ToleranceType fromString(String typeStr) { - return "ALL".equals(typeStr.toUpperCase(Locale.ROOT)) ? 
ToleranceType.ALL : ToleranceType.NONE; + public String value() { + return name().toLowerCase(Locale.ROOT); } + } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index da017e851b7..5728465095a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -170,16 +170,16 @@ public void testConfigValidationMissingName() { // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on // the config fields for SourceConnectorConfig, but we expect these to change rarely. assertEquals(TestSourceConnector.class.getName(), result.name()); - assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups()); + assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP, ConnectorConfig.ERROR_GROUP), result.groups()); assertEquals(2, result.errorCount()); - // Base connector config has 8 fields, connector's configs add 2 - assertEquals(10, result.values().size()); + // Base connector config has 13 fields, connector's configs add 2 + assertEquals(15, result.values().size()); // Missing name should generate an error assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name()); assertEquals(1, result.values().get(0).configValue().errors().size()); // "required" config from connector should generate an error - assertEquals("required", result.values().get(8).configValue().name()); - assertEquals(1, result.values().get(8).configValue().errors().size()); + assertEquals("required", result.values().get(13).configValue().name()); + assertEquals(1, result.values().get(13).configValue().errors().size()); verifyAll(); } @@ -228,20 +228,21 @@ public void testConfigValidationTransformsExtendResults() { List<String> expectedGroups = Arrays.asList( ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP, + ConnectorConfig.ERROR_GROUP, "Transforms: xformA", "Transforms: xformB" ); assertEquals(expectedGroups, result.groups()); assertEquals(2, result.errorCount()); - // Base connector config has 8 fields, connector's configs add 2, 2 type fields from the transforms, and + // Base connector config has 13 fields, connector's configs add 2, 2 type fields from the transforms, and // 1 from the valid transformation's config - assertEquals(13, result.values().size()); + assertEquals(18, result.values().size()); // Should get 2 type fields from the transforms, first adds its own config since it has a valid class - assertEquals("transforms.xformA.type", result.values().get(8).configValue().name()); - assertTrue(result.values().get(8).configValue().errors().isEmpty()); - assertEquals("transforms.xformA.subconfig", result.values().get(9).configValue().name()); - assertEquals("transforms.xformB.type", result.values().get(10).configValue().name()); - assertFalse(result.values().get(10).configValue().errors().isEmpty()); + assertEquals("transforms.xformA.type", result.values().get(13).configValue().name()); + assertTrue(result.values().get(13).configValue().errors().isEmpty()); + assertEquals("transforms.xformA.subconfig", result.values().get(14).configValue().name()); + assertEquals("transforms.xformB.type", result.values().get(15).configValue().name()); + 
assertFalse(result.values().get(15).configValue().errors().isEmpty()); verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index b50e7ff0956..e931642afcc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -35,7 +35,9 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.LogReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -69,6 +71,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Time.SYSTEM; import static org.junit.Assert.assertEquals; @RunWith(PowerMockRunner.class) @@ -81,6 +84,8 @@ private static final int PARTITION2 = 13; private static final long FIRST_OFFSET = 45; + @Mock Plugins plugins; + private static final Map<String, String> TASK_PROPS = new HashMap<>(); static { @@ -88,17 +93,11 @@ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName()); } - private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); - - private static final Map<String, String> OPERATION_EXECUTOR_PROPS = new HashMap<>(); + public static final long OPERATOR_RETRY_TIMEOUT_MILLIS = 60000; + public static final long OPERATOR_RETRY_MAX_DELAY_MILLIS = 5000; + public static final ToleranceType OPERATOR_TOLERANCE_TYPE = ToleranceType.ALL; - static { - OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all"); - // wait up to 1 minute for an operation - OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_TIMEOUT, "60000"); - // wait up 5 seconds between subsequent retries - OPERATION_EXECUTOR_PROPS.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000"); - } + private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private TargetState initialState = TargetState.STARTED; @@ -164,15 +163,13 @@ public void tearDown() { @Test public void testErrorHandlingInSinkTasks() throws Exception { - LogReporter reporter = new LogReporter(taskId); - Map<String, Object> reportProps = new HashMap<>(); - reportProps.put(LogReporter.LOG_ENABLE, "true"); - reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true"); - reporter.configure(reportProps); + Map<String, String> reportProps = new HashMap<>(); + reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); + reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); reporter.metrics(errorHandlingMetrics); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time); - retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS); + RetryWithToleranceOperator retryWithToleranceOperator = operator(); 
retryWithToleranceOperator.metrics(errorHandlingMetrics); retryWithToleranceOperator.reporters(singletonList(reporter)); createSinkTask(initialState, retryWithToleranceOperator); @@ -212,17 +209,19 @@ public void testErrorHandlingInSinkTasks() throws Exception { PowerMock.verifyAll(); } + private RetryWithToleranceOperator operator() { + return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM); + } + @Test public void testErrorHandlingInSourceTasks() throws Exception { - LogReporter reporter = new LogReporter(taskId); - Map<String, Object> reportProps = new HashMap<>(); - reportProps.put(LogReporter.LOG_ENABLE, "true"); - reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true"); - reporter.configure(reportProps); + Map<String, String> reportProps = new HashMap<>(); + reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); + reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); reporter.metrics(errorHandlingMetrics); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time); - retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS); + RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); retryWithToleranceOperator.reporters(singletonList(reporter)); createSourceTask(initialState, retryWithToleranceOperator); @@ -271,17 +270,23 @@ public void testErrorHandlingInSourceTasks() throws Exception { PowerMock.verifyAll(); } + private ConnectorConfig connConfig(Map<String, String> connProps) { + Map<String, String> props = new HashMap<>(); + props.put(ConnectorConfig.NAME_CONFIG, "test"); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName()); + props.putAll(connProps); + return new ConnectorConfig(plugins, props); + } + @Test public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception { - LogReporter reporter = new LogReporter(taskId); - Map<String, Object> reportProps = new HashMap<>(); - reportProps.put(LogReporter.LOG_ENABLE, "true"); - reportProps.put(LogReporter.LOG_INCLUDE_MESSAGES, "true"); - reporter.configure(reportProps); + Map<String, String> reportProps = new HashMap<>(); + reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); + reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); + LogReporter reporter = new LogReporter(taskId, connConfig(reportProps)); reporter.metrics(errorHandlingMetrics); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time); - retryWithToleranceOperator.configure(OPERATION_EXECUTOR_PROPS); + RetryWithToleranceOperator retryWithToleranceOperator = operator(); retryWithToleranceOperator.metrics(errorHandlingMetrics); retryWithToleranceOperator.reporters(singletonList(reporter)); createSourceTask(initialState, retryWithToleranceOperator, badConverter()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index d23adbf3d69..4a7c760fc74 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -34,7 +34,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import 
org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; -import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; @@ -166,7 +166,7 @@ private void createTask(TargetState initialState) { taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time, - RetryWithToleranceOperator.NOOP_OPERATOR); + RetryWithToleranceOperatorTest.NOOP_OPERATOR); } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 800301e05dc..73689d35710 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; -import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; @@ -140,8 +140,8 @@ public void setup() { WorkerSinkTask.class, new String[]{"createConsumer"}, taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, - new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR), - pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR); + new TransformationChain(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR), + pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR); recordsReturned = 0; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 1482d75b513..db73a8e0914 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; -import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; @@ -151,7 +151,7 @@ private void createWorkerTask() { private void createWorkerTask(TargetState initialState) { workerTask = new 
WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - RetryWithToleranceOperator.NOOP_OPERATOR); + RetryWithToleranceOperatorTest.NOOP_OPERATOR); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index de0ba8a9ddf..33349f4c2f5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.common.utils.MockTime; @@ -62,7 +63,7 @@ @Before public void setup() { metrics = new MockConnectMetrics(); - retryWithToleranceOperator = new RetryWithToleranceOperator(); + retryWithToleranceOperator = RetryWithToleranceOperatorTest.NOOP_OPERATOR; } @After diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 6fa7ed11b9e..77238e9aaad 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -70,6 +70,7 @@ import java.util.List; import java.util.Map; +import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.junit.Assert.assertEquals; @@ -487,7 +488,7 @@ public void testAddRemoveTask() throws Exception { anyObject(JsonConverter.class), anyObject(JsonConverter.class), anyObject(JsonConverter.class), - EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)), + EasyMock.eq(new TransformationChain(Collections.emptyList(), NOOP_OPERATOR)), anyObject(KafkaProducer.class), anyObject(OffsetStorageReader.class), anyObject(OffsetStorageWriter.class), @@ -626,7 +627,7 @@ public void testCleanupTasksOnStop() throws Exception { anyObject(JsonConverter.class), anyObject(JsonConverter.class), anyObject(JsonConverter.class), - EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)), + EasyMock.eq(new TransformationChain(Collections.emptyList(), NOOP_OPERATOR)), anyObject(KafkaProducer.class), anyObject(OffsetStorageReader.class), anyObject(OffsetStorageWriter.class), @@ -719,7 +720,7 @@ public void testConverterOverrides() throws Exception { EasyMock.capture(keyConverter), EasyMock.capture(valueConverter), EasyMock.capture(headerConverter), - EasyMock.eq(new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR)), + EasyMock.eq(new TransformationChain(Collections.emptyList(), NOOP_OPERATOR)), anyObject(KafkaProducer.class), anyObject(OffsetStorageReader.class), anyObject(OffsetStorageWriter.class), diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f6a0507909c..b5410d071d8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -21,7 +21,11 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
@@ -37,6 +41,8 @@
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
@@ -54,13 +60,14 @@
     @Mock
     Future<RecordMetadata> metadata;
 
-    private HashMap<String, Object> config;
+    @Mock
+    Plugins plugins;
+
     private ErrorHandlingMetrics errorHandlingMetrics;
     private MockConnectMetrics metrics;
 
     @Before
     public void setup() {
-        config = new HashMap<>();
         metrics = new MockConnectMetrics();
         errorHandlingMetrics = new ErrorHandlingMetrics(new ConnectorTaskId("connector-", 1), metrics);
     }
@@ -74,8 +81,7 @@ public void tearDown() {
 
     @Test
     public void testDLQConfigWithEmptyTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
-        deadLetterQueueReporter.configure(config);
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()));
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -90,8 +96,7 @@ public void testDLQConfigWithEmptyTopicName() {
 
     @Test
     public void testDLQConfigWithValidTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
-        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -106,8 +111,7 @@ public void testDLQConfigWithValidTopicName() {
 
     @Test
     public void testReportDLQTwice() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer);
-        deadLetterQueueReporter.configure(config(DeadLetterQueueReporter.DLQ_TOPIC_NAME, DLQ_TOPIC));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -123,8 +127,7 @@ public void testReportDLQTwice() {
 
     @Test
     public void testLogOnDisabledLogReporter() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config);
+        LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -137,8 +140,7 @@ public void testLogOnDisabledLogReporter() {
 
     @Test
     public void testLogOnEnabledLogReporter() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -151,8 +153,7 @@ public void testLogOnEnabledLogReporter() {
 
     @Test
     public void testLogMessageWithNoRecords() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
+        LogReporter logReporter = new LogReporter(TASK_ID, config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -164,9 +165,11 @@
 
     @Test
     public void testLogMessageWithSinkRecords() {
-        LogReporter logReporter = new LogReporter(TASK_ID);
-        logReporter.configure(config(LogReporter.LOG_ENABLE, "true"));
-        logReporter.configure(config(LogReporter.LOG_INCLUDE_MESSAGES, "true"));
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
+        props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+        LogReporter logReporter = new LogReporter(TASK_ID, config(props));
         logReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -184,9 +187,12 @@ private ProcessingContext processingContext() {
         return context;
     }
 
-    private Map<String, Object> config(String key, Object val) {
-        config.put(key, val);
-        return config;
+    private SinkConnectorConfig config(Map<String, String> configProps) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, "test");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName());
+        props.putAll(configProps);
+        return new SinkConnectorConfig(plugins, props);
     }
 
     private void assertErrorHandlingMetricValue(String name, double expected) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 751510d5fa6..2d340ac32e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -19,6 +19,9 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
 import org.junit.Test;
@@ -31,7 +34,17 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.RetryWithToleranceOperatorConfig;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Time.SYSTEM;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_MAX_DELAY_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_MAX_DELAY_DEFAULT;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_DEFAULT;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_DEFAULT;
+import static org.apache.kafka.connect.runtime.errors.ToleranceType.ALL;
+import static org.apache.kafka.connect.runtime.errors.ToleranceType.NONE;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -43,6 +56,12 @@
 @PowerMockIgnore("javax.management.*")
 public class RetryWithToleranceOperatorTest {
 
+    public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(
+            ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+    static {
+        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics());
+    }
+
     @SuppressWarnings("unused")
     @Mock
     private Operation<String> mockOperation;
@@ -50,6 +69,9 @@
     @Mock
     ErrorHandlingMetrics errorHandlingMetrics;
 
+    @Mock
+    Plugins plugins;
+
     @Test
     public void testHandleExceptionInTransformations() {
         testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
@@ -108,10 +130,7 @@ private void testHandleExceptionInStage(Stage type, Exception ex) {
     }
 
     private RetryWithToleranceOperator setupExecutor() {
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
-        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "0");
-        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         return retryWithToleranceOperator;
     }
@@ -138,10 +157,7 @@ public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
 
     public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
-        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
-        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
@@ -159,10 +175,7 @@ public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long e
 
     public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
-        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "6000");
-        props.put(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
@@ -181,10 +194,7 @@ public void execAndHandleNonRetriableError(int numRetriableExceptionsThrown, lon
 
     @Test
     public void testCheckRetryLimit() {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
-        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "500");
-        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "100");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(500, 100, NONE, time);
 
         time.setCurrentTimeMs(100);
         assertTrue(retryWithToleranceOperator.checkRetry(0));
@@ -208,11 +218,7 @@ public void testCheckRetryLimit() {
 
     @Test
     public void testBackoffLimit() {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(time);
-
-        Map<String, Object> props = config(RetryWithToleranceOperator.RETRY_TIMEOUT, "5");
-        props.put(RetryWithToleranceOperator.RETRY_DELAY_MAX_MS, "5000");
-        retryWithToleranceOperator.configure(props);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time);
 
         long prevTs = time.hiResClockMs();
         retryWithToleranceOperator.backoff(1, 5000);
@@ -243,56 +249,54 @@ public void testBackoffLimit() {
 
     @Test
     public void testToleranceLimit() {
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator();
-        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "none"));
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.markAsFailed();
         assertFalse("should not tolerate any errors", retryWithToleranceOperator.withinToleranceLimits());
 
-        retryWithToleranceOperator = new RetryWithToleranceOperator();
-        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "all"));
+        retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.markAsFailed();
         retryWithToleranceOperator.markAsFailed();
         assertTrue("should tolerate all errors", retryWithToleranceOperator.withinToleranceLimits());
 
-        retryWithToleranceOperator = new RetryWithToleranceOperator();
-        retryWithToleranceOperator.configure(config(RetryWithToleranceOperator.TOLERANCE_LIMIT, "none"));
+        retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
         assertTrue("no tolerance is within limits if no failures", retryWithToleranceOperator.withinToleranceLimits());
     }
 
     @Test
     public void testDefaultConfigs() {
-        RetryWithToleranceOperatorConfig configuration;
-        configuration = new RetryWithToleranceOperatorConfig(new HashMap<>());
-        assertEquals(configuration.retryTimeout(), 0);
-        assertEquals(configuration.retryDelayMax(), 60000);
-        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+        ConnectorConfig configuration = config(emptyMap());
+        assertEquals(configuration.errorRetryTimeout(), ERRORS_RETRY_TIMEOUT_DEFAULT);
+        assertEquals(configuration.errorMaxDelayInMillis(), ERRORS_RETRY_MAX_DELAY_DEFAULT);
+        assertEquals(configuration.errorToleranceType(), ERRORS_TOLERANCE_DEFAULT);
 
         PowerMock.verifyAll();
     }
 
+    ConnectorConfig config(Map<String, String> connProps) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, "test");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName());
+        props.putAll(connProps);
+        return new ConnectorConfig(plugins, props);
+    }
+
     @Test
-    public void testConfigs() {
-        RetryWithToleranceOperatorConfig configuration;
-        configuration = new RetryWithToleranceOperatorConfig(config("retry.timeout", "100"));
-        assertEquals(configuration.retryTimeout(), 100);
+    public void testSetConfigs() {
+        ConnectorConfig configuration;
+        configuration = config(singletonMap(ERRORS_RETRY_TIMEOUT_CONFIG, "100"));
+        assertEquals(configuration.errorRetryTimeout(), 100);
 
-        configuration = new RetryWithToleranceOperatorConfig(config("retry.delay.max.ms", "100"));
-        assertEquals(configuration.retryDelayMax(), 100);
+        configuration = config(singletonMap(ERRORS_RETRY_MAX_DELAY_CONFIG, "100"));
+        assertEquals(configuration.errorMaxDelayInMillis(), 100);
 
-        configuration = new RetryWithToleranceOperatorConfig(config("allowed.max", "none"));
-        assertEquals(configuration.toleranceLimit(), ToleranceType.NONE);
+        configuration = config(singletonMap(ERRORS_TOLERANCE_CONFIG, "none"));
+        assertEquals(configuration.errorToleranceType(), ToleranceType.NONE);
 
         PowerMock.verifyAll();
     }
 
-    Map<String, Object> config(String key, Object val) {
-        Map<String, Object> configs = new HashMap<>();
-        configs.put(key, val);
-        return configs;
-    }
-
     private static class ExceptionThrower implements Operation<Object> {
         private Exception e;

> Missing Connector Config (errors.deadletterqueue.topic.name) kills Connect
> Clusters
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6981
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6981
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Arjun Satish
>            Assignee: Arjun Satish
>            Priority: Major
>             Fix For: 2.0.0
>
> The trunk version of AK currently, and incorrectly, tries to read the
> property (errors.deadletterqueue.topic.name) when starting a sink connector.
> This fails no matter what the contents of the connector config are: the
> ConnectorConfig does not define this property, so any call to getString for
> it will throw a ConfigException (only known properties are retained by
> AbstractConfig).
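To illustrate the failure mode described above: AbstractConfig only retains values for keys declared in its ConfigDef, so a getString lookup of an undefined key throws a ConfigException regardless of what the user supplied. Below is a minimal, self-contained sketch of that behavior; the class name and the one-entry ConfigDef are illustrative stand-ins for the pre-fix ConnectorConfig, not part of the patch.

import java.util.Collections;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class UnknownKeyExample {

    // Stand-in for the pre-fix ConnectorConfig: its ConfigDef never defines
    // errors.deadletterqueue.topic.name.
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Connector name");

    public static void main(String[] args) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF,
                Collections.singletonMap("name", "test"));

        try {
            // AbstractConfig keeps parsed values only for keys declared in the
            // ConfigDef, so this lookup throws even if the raw connector
            // properties had contained the key.
            config.getString("errors.deadletterqueue.topic.name");
        } catch (ConfigException e) {
            // Prints: Unknown configuration 'errors.deadletterqueue.topic.name'
            System.out.println(e.getMessage());
        }
    }
}

This is why the patch defines the errors.* properties directly in ConnectorConfig and SinkConnectorConfig: once the keys are part of the ConfigDef, the same lookup returns the configured (or default) value instead of throwing and failing the connector.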