This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.6 by this push:
     new c49954c  KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter (#8829)
c49954c is described below

commit c49954c5cf0e5699a6885dd385e40c5b34af7e9c
Author: Aakash Shah <as...@confluent.io>
AuthorDate: Wed Jun 10 20:43:47 2020 -0700

    KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter (#8829)

    Make sure that the Errant Record Reporter recently added in KIP-610 adheres to the `errors.tolerance` policy.

    Author: Aakash Shah <as...@confluent.io>
    Reviewers: Arjun Satish <arjunconfluent.io>, Randall Hauch <rha...@gmail.com>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  6 ++
 .../runtime/errors/RetryWithToleranceOperator.java | 23 ++++++--
 .../runtime/errors/WorkerErrantRecordReporter.java |  3 +-
 .../integration/ErrorHandlingIntegrationTest.java  | 68 +++++++++++++++++++++
 .../integration/ExampleConnectIntegrationTest.java | 69 ----------------------
 .../errors/RetryWithToleranceOperatorTest.java     | 10 ++++
 6 files changed, 104 insertions(+), 75 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 50efffc..11318fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -558,6 +558,12 @@ class WorkerSinkTask extends WorkerTask {
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
+            // if errors raised from the operator were swallowed by the task implementation, an
+            // exception needs to be thrown to kill the task indicating the tolerance was exceeded
+            if (retryWithToleranceOperator.failed() && !retryWithToleranceOperator.withinToleranceLimits()) {
+                throw new ConnectException("Tolerance exceeded in error handler",
+                    retryWithToleranceOperator.error());
+            }
             recordBatch(messageBatch.size());
             sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
             currentOffsets.putAll(origOffsets);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index ff1702d..4fb633d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -87,11 +87,18 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+
+        markAsFailed();
         context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
-        errorHandlingMetrics.recordError();
-        return context.report();
+        errorHandlingMetrics.recordFailure();
+        Future<Void> errantRecordFuture = context.report();
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordError();
+            throw new ConnectException("Tolerance exceeded in error handler", error);
+        }
+        return errantRecordFuture;
     }

     /**
@@ -200,9 +207,8 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         totalFailures++;
     }

-    // Visible for testing
     @SuppressWarnings("fallthrough")
-    boolean withinToleranceLimits() {
+    public boolean withinToleranceLimits() {
         switch (errorToleranceType) {
             case NONE:
                 if (totalFailures > 0) return false;
@@ -282,6 +288,15 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         return this.context.failed();
     }

+    /**
+     * Returns the error encountered when processing the current stage.
+     *
+     * @return the error encountered when processing the current stage
+     */
+    public Throwable error() {
+        return this.context.error();
+    }
+
     @Override
     public void close() {
         this.context.close();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
index 6e1fa53..b709e28 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
@@ -99,8 +99,7 @@ public class WorkerErrantRecordReporter implements ErrantRecordReporter {
                 valLength, key, value, headers);
         }

-        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-            SinkTask.class, consumerRecord, error);
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, error);

         if (!future.isDone()) {
             futures.add(future);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 8963b8c..b6211ed 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -181,6 +181,74 @@ public class ErrorHandlingIntegrationTest {
     }

+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic");
+
+        // setup connector config
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ErrantRecordSinkConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // log all errors, along with message metadata
+        props.put(ERRORS_LOG_ENABLE_CONFIG, "true");
+        props.put(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+        // produce bad messages into dead letter queue
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+
+        // tolerate all errors
+        props.put(ERRORS_TOLERANCE_CONFIG, "all");
+
+        // retry for up to one second
+        props.put(ERRORS_RETRY_TIMEOUT_CONFIG, "1000");
+
+        // set expected records to successfully reach the task
+        connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForCondition(this::checkForPartitionAssignment,
+                CONNECTOR_SETUP_DURATION_MS,
+                "Connector task was not assigned a partition.");
+
+        // produce some strings into test topic
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", "key-" + i, "value-" + i);
+        }
+
+        // consume all records from test topic
+        log.info("Consuming records from test topic");
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> rec : connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic")) {
+            String k = new String(rec.key());
+            String v = new String(rec.value());
+            log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic());
+            assertEquals("Unexpected key", k, "key-" + i);
+            assertEquals("Unexpected value", v, "value-" + i);
+            i++;
+        }
+
+        // wait for records to reach the task
+        connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS);
+
+        // consume failed records from dead letter queue topic
+        log.info("Consuming records from dead letter queue topic");
+        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
+
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+                "Connector tasks did not stop in time.");
+    }
+
     /**
      * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a
      * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 3e35098..6f8d8a1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -38,7 +38,6 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
 import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
@@ -68,9 +67,6 @@ public class ExampleConnectIntegrationTest {
     private static final String CONNECTOR_NAME = "simple-conn";
     private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
     private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
-    private static final String DLQ_TOPIC = "dlq-topic";
-    private static final String ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME =
-        ErrantRecordSinkConnector.class.getSimpleName();

     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
@@ -228,71 +224,6 @@ public class ExampleConnectIntegrationTest {
         connect.deleteConnector(CONNECTOR_NAME);
     }

-    @Test
-    public void testErrantRecordReporter() throws Exception {
-        connect.kafka().createTopic(DLQ_TOPIC, 1);
-        // create test topic
-        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
-
-        // setup up props for the sink connector
-        Map<String, String> props = new HashMap<>();
-        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
-        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
-        props.put(TOPICS_CONFIG, "test-topic");
-        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
-        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
-        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
-
-        // expect all records to be consumed by the connector
-        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
-
-        // expect all records to be consumed by the connector
-        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
-
-        // validate the intended connector configuration, a config that errors
-        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
-            "Validating connector configuration produced an unexpected number or errors.");
-
-        // add missing configuration to make the config valid
-        props.put("name", CONNECTOR_NAME);
-
-        // validate the intended connector configuration, a valid config
-        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
-            "Validating connector configuration produced an unexpected number or errors.");
-
-        // start a sink connector
-        connect.configureConnector(CONNECTOR_NAME, props);
-
-        waitForCondition(this::checkForPartitionAssignment,
-            CONNECTOR_SETUP_DURATION_MS,
-            "Connector tasks were not assigned a partition each.");
-
-        // produce some messages into source topic partitions
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
-        }
-
-        // consume all records from the source topic or fail, to ensure that they were correctly produced.
-        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
-            connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
-
-        // wait for the connector tasks to consume all records.
-        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
-
-        // wait for the connector tasks to commit all records.
-        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
-
-        // consume all records from the dlq topic or fail, to ensure that they were correctly produced
-        int recordNum = connect.kafka().consume(
-            NUM_RECORDS_PRODUCED,
-            RECORD_TRANSFER_DURATION_MS,
-            DLQ_TOPIC
-        ).count();
-
-        // delete connector
-        connect.deleteConnector(CONNECTOR_NAME);
-    }
-
     /**
      * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a
     * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 3aed19d..f4c4299 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -100,6 +100,16 @@ public class RetryWithToleranceOperatorTest {
     @Test
     public void testExecuteFailed() {
         RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
+                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+                SinkTask.class, consumerRecord, new Throwable());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testExecuteFailedNoTolerance() {
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
                 ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
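
For context, below is a minimal, illustrative sketch of a sink task that uses the errant record reporter introduced by KIP-610, which this commit ties to the `errors.tolerance` setting. The SinkTask, SinkRecord, ErrantRecordReporter, and context.errantRecordReporter() APIs are part of the Connect framework in 2.6; the class name, the process(...) helper, and its failure condition are hypothetical. With errors.tolerance=all a reported record is routed to the configured dead letter queue and/or error log and processing continues; with the stricter default of errors.tolerance=none, this commit makes the error handler throw a ConnectException so the task fails instead of silently tolerating the error.

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class IllustrativeSinkTask extends SinkTask {

    private ErrantRecordReporter reporter;

    @Override
    public String version() {
        return "0.0.0";
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            // Available on 2.6+ workers (KIP-610); may be null if no DLQ topic or error log is configured.
            reporter = context.errantRecordReporter();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Running on a pre-2.6 worker that does not expose the reporter.
            reporter = null;
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Exception e) {
                if (reporter != null) {
                    // With errors.tolerance=all the record is reported (DLQ and/or error log) and we keep going;
                    // with errors.tolerance=none the operator now throws ConnectException and the task is failed.
                    reporter.report(record, e);
                } else {
                    throw new ConnectException("Failed to process record and no errant record reporter is available", e);
                }
            }
        }
    }

    // Hypothetical per-record processing that may fail for malformed records.
    private void process(SinkRecord record) {
        if (record.value() == null) {
            throw new IllegalArgumentException("Record value must not be null");
        }
        // ... write the record to the target system ...
    }

    @Override
    public void stop() {
    }
}

The try/catch around errantRecordReporter() follows the compatibility pattern described in KIP-610 for connectors that may also be deployed on pre-2.6 workers.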