This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new c49954c  KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter (#8829)
c49954c is described below

commit c49954c5cf0e5699a6885dd385e40c5b34af7e9c
Author: Aakash Shah <as...@confluent.io>
AuthorDate: Wed Jun 10 20:43:47 2020 -0700

    KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter (#8829)
    
    Make sure that the Errant Record Reporter recently added in KIP-610 adheres to the `errors.tolerance` policy.
    
    Author: Aakash Shah <as...@confluent.io>
    Reviewers: Arjun Satish <arjunconfluent.io>, Randall Hauch <rha...@gmail.com>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  6 ++
 .../runtime/errors/RetryWithToleranceOperator.java | 23 ++++++--
 .../runtime/errors/WorkerErrantRecordReporter.java |  3 +-
 .../integration/ErrorHandlingIntegrationTest.java  | 68 +++++++++++++++++++++
 .../integration/ExampleConnectIntegrationTest.java | 69 ----------------------
 .../errors/RetryWithToleranceOperatorTest.java     | 10 ++++
 6 files changed, 104 insertions(+), 75 deletions(-)
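
A note for context: with this change, every record handed to the Errant Record Reporter counts as a failure against the connector's `errors.tolerance` setting. A minimal sketch of a sink connector configuration that exercises the reporter (the connector class and topic names here are illustrative, not part of this commit):

import java.util.HashMap;
import java.util.Map;

public class ErrantReporterConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "MySinkConnector"); // hypothetical connector
        props.put("topics", "test-topic");
        // With "none", the first reported record now fails the task; with "all",
        // reported records are tolerated and routed to the dead letter queue.
        props.put("errors.tolerance", "all");
        props.put("errors.deadletterqueue.topic.name", "dlq-topic");
        props.put("errors.deadletterqueue.topic.replication.factor", "1");
        System.out.println(props);
    }
}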

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 50efffc..11318fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -558,6 +558,12 @@ class WorkerSinkTask extends WorkerTask {
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
+            // if errors raised from the operator were swallowed by the task implementation, an
+            // exception needs to be thrown to kill the task indicating the tolerance was exceeded
+            if (retryWithToleranceOperator.failed() && !retryWithToleranceOperator.withinToleranceLimits()) {
+                throw new ConnectException("Tolerance exceeded in error handler",
+                    retryWithToleranceOperator.error());
+            }
             recordBatch(messageBatch.size());
             sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
             currentOffsets.putAll(origOffsets);
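
The guard above matters because a task can catch and ignore the exception thrown from the reporter's failure path. A hypothetical sink task showing that pattern (the class is invented for illustration; context.errantRecordReporter() and report() are the KIP-610 API):

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class SwallowingSinkTask extends SinkTask {
    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        reporter = context.errantRecordReporter(); // null on pre-KIP-610 workers
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                if (reporter != null) {
                    reporter.report(record, new ConnectException("simulated failure"));
                }
            } catch (Exception swallowed) {
                // Under errors.tolerance=none the operator throws here, but this task
                // swallows it; the worker's check after put() still fails the task.
            }
        }
    }

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void stop() {
    }
}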
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index ff1702d..4fb633d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -87,11 +87,18 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                      ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+
+        markAsFailed();
         context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
-        errorHandlingMetrics.recordError();
-        return context.report();
+        errorHandlingMetrics.recordFailure();
+        Future<Void> errantRecordFuture = context.report();
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordError();
+            throw new ConnectException("Tolerance exceeded in error handler", error);
+        }
+        return errantRecordFuture;
     }
 
     /**
@@ -200,9 +207,8 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         totalFailures++;
     }
 
-    // Visible for testing
     @SuppressWarnings("fallthrough")
-    boolean withinToleranceLimits() {
+    public boolean withinToleranceLimits() {
         switch (errorToleranceType) {
             case NONE:
                 if (totalFailures > 0) return false;
@@ -282,6 +288,15 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         return this.context.failed();
     }
 
+    /**
+     * Returns the error encountered when processing the current stage.
+     *
+     * @return the error encountered when processing the current stage
+     */
+    public Throwable error() {
+        return this.context.error();
+    }
+
     @Override
     public void close() {
         this.context.close();
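
A note on the metrics in this hunk: each record routed through executeFailed() is now counted via recordFailure(), while recordError() is reserved for the moment a failure actually exceeds the configured tolerance, just before the operator throws. With errors.tolerance=all a connector therefore accumulates failures but no errors.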
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
index 6e1fa53..b709e28 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
@@ -99,8 +99,7 @@ public class WorkerErrantRecordReporter implements ErrantRecordReporter {
                 valLength, key, value, headers);
         }
 
-        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-            SinkTask.class, consumerRecord, error);
+        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
 
         if (!future.isDone()) {
             futures.add(future);
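
Since report() hands back the Future from executeFailed(), a task that wants synchronous dead-letter-queue delivery can block on it. A small sketch, assuming the KIP-610 reporter API; the helper class and timeout are illustrative:

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;

public class ReportHelper {
    // Block until the errant record report completes (e.g. the DLQ write finishes).
    static void reportAndWait(ErrantRecordReporter reporter, SinkRecord record, Throwable error) {
        try {
            Future<Void> future = reporter.report(record, error);
            future.get(30, TimeUnit.SECONDS); // illustrative timeout
        } catch (Exception e) {
            throw new ConnectException("Failed to report errant record", e);
        }
    }
}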
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 8963b8c..b6211ed 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -181,6 +181,74 @@ public class ErrorHandlingIntegrationTest {
 
     }
 
+    @Test
+    public void testErrantRecordReporter() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic");
+
+        // setup connector config
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, ErrantRecordSinkConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // log all errors, along with message metadata
+        props.put(ERRORS_LOG_ENABLE_CONFIG, "true");
+        props.put(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+        // produce bad messages into dead letter queue
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+
+        // tolerate all errors
+        props.put(ERRORS_TOLERANCE_CONFIG, "all");
+
+        // retry for up to one second
+        props.put(ERRORS_RETRY_TIMEOUT_CONFIG, "1000");
+
+        // set expected records to successfully reach the task
+        connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+            "Connector tasks did not start in time.");
+
+        waitForCondition(this::checkForPartitionAssignment,
+            CONNECTOR_SETUP_DURATION_MS,
+            "Connector task was not assigned a partition.");
+
+        // produce some strings into test topic
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", "key-" + i, "value-" + i);
+        }
+
+        // consume all records from test topic
+        log.info("Consuming records from test topic");
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> rec : connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic")) {
+            String k = new String(rec.key());
+            String v = new String(rec.value());
+            log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic());
+            assertEquals("Unexpected key", k, "key-" + i);
+            assertEquals("Unexpected value", v, "value-" + i);
+            i++;
+        }
+
+        // wait for records to reach the task
+        connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS);
+
+        // consume failed records from dead letter queue topic
+        log.info("Consuming records from test topic");
+        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
+
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+            "Connector tasks did not stop in time.");
+    }
+
     /**
     * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a
     * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 3e35098..6f8d8a1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -38,7 +38,6 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
 import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
@@ -68,9 +67,6 @@ public class ExampleConnectIntegrationTest {
     private static final String CONNECTOR_NAME = "simple-conn";
     private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
     private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
-    private static final String DLQ_TOPIC = "dlq-topic";
-    private static final String ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME =
-        ErrantRecordSinkConnector.class.getSimpleName();
 
     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
@@ -228,71 +224,6 @@ public class ExampleConnectIntegrationTest {
         connect.deleteConnector(CONNECTOR_NAME);
     }
 
-    @Test
-    public void testErrantRecordReporter() throws Exception {
-        connect.kafka().createTopic(DLQ_TOPIC, 1);
-        // create test topic
-        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
-
-        // setup up props for the sink connector
-        Map<String, String> props = new HashMap<>();
-        props.put(CONNECTOR_CLASS_CONFIG, ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME);
-        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
-        props.put(TOPICS_CONFIG, "test-topic");
-        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
-        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
-        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
-
-        // expect all records to be consumed by the connector
-        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
-
-        // expect all records to be consumed by the connector
-        connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
-
-        // validate the intended connector configuration, a config that errors
-        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 1,
-            "Validating connector configuration produced an unexpected number or errors.");
-
-        // add missing configuration to make the config valid
-        props.put("name", CONNECTOR_NAME);
-
-        // validate the intended connector configuration, a valid config
-        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(ERRANT_RECORD_SINK_CONNECTOR_CLASS_NAME, props, 0,
-            "Validating connector configuration produced an unexpected number or errors.");
-
-        // start a sink connector
-        connect.configureConnector(CONNECTOR_NAME, props);
-
-        waitForCondition(this::checkForPartitionAssignment,
-            CONNECTOR_SETUP_DURATION_MS,
-            "Connector tasks were not assigned a partition each.");
-
-        // produce some messages into source topic partitions
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
-        }
-
-        // consume all records from the source topic or fail, to ensure that they were correctly produced.
-        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
-            connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
-
-        // wait for the connector tasks to consume all records.
-        connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
-
-        // wait for the connector tasks to commit all records.
-        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
-
-        // consume all records from the dlq topic or fail, to ensure that they were correctly produced
-        int recordNum = connect.kafka().consume(
-            NUM_RECORDS_PRODUCED,
-            RECORD_TRANSFER_DURATION_MS,
-            DLQ_TOPIC
-        ).count();
-
-        // delete connector
-        connect.deleteConnector(CONNECTOR_NAME);
-    }
-
     /**
     * Check if a partition was assigned to each task. This method swallows exceptions since it is invoked from a
     * {@link org.apache.kafka.test.TestUtils#waitForCondition} that will throw an error if this method continued
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 3aed19d..f4c4299 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -100,6 +100,16 @@ public class RetryWithToleranceOperatorTest {
     @Test
     public void testExecuteFailed() {
         RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
+            ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+            SinkTask.class, consumerRecord, new Throwable());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testExecuteFailedNoTolerance() {
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
             ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
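Taken together, the two tests pin down the new contract: testExecuteFailed passes as long as executeFailed() returns normally under ALL tolerance, while testExecuteFailedNoTolerance expects the ConnectException now thrown once tolerance is exceeded under NONE.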
