This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28a1ae4  MINOR: System test for error handling and writes to 
DeadLetterQueue
28a1ae4 is described below

commit 28a1ae4183c707af363b69e2ec2b743bdf4f236c
Author: Arjun Satish <[email protected]>
AuthorDate: Tue Aug 7 14:44:01 2018 -0700

    MINOR: System test for error handling and writes to DeadLetterQueue
    
    Added a system test which creates a file sink with json converter and 
attempts to feed it bad records. The bad records should land in the DLQ if it 
is enabled, and the task should be killed or bad records skipped based on test 
parameters.
    
    Signed-off-by: Arjun Satish <arjun@confluent.io>
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Arjun Satish <[email protected]>
    
    Reviewers: Konstantine Karantasis <[email protected]>, Ewen 
Cheslack-Postava <[email protected]>
    
    Closes #5456 from wicknicks/error-handling-sys-test
---
 tests/kafkatest/services/connect.py                |  5 ++
 tests/kafkatest/tests/connect/connect_test.py      | 72 ++++++++++++++++++++++
 .../connect/templates/connect-file-sink.properties | 18 +++++-
 .../templates/connect-file-source.properties       |  7 +++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/services/connect.py 
b/tests/kafkatest/services/connect.py
index d7ef204..19beddd 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -326,6 +326,11 @@ class ConnectDistributedService(ConnectServiceBase):
             raise RuntimeError("No process ids recorded")
 
 
+class ErrorTolerance(object):
+    ALL = "all"
+    NONE = "none"
+
+
 class ConnectRestError(RuntimeError):
     def __init__(self, status, msg, url):
         self.status = status
diff --git a/tests/kafkatest/tests/connect/connect_test.py 
b/tests/kafkatest/tests/connect/connect_test.py
index 9d34c48..c961681 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -18,10 +18,12 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from ducktape.mark import parametrize, matrix
 from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.errors import TimeoutError
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectStandaloneService
+from kafkatest.services.connect import ErrorTolerance
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 
@@ -134,3 +136,73 @@ class ConnectStandaloneFileTest(Test):
             return output_hash == hashlib.md5(value).hexdigest()
         except RemoteCommandError:
             return False
+
+    @cluster(num_nodes=5)
+    @parametrize(error_tolerance=ErrorTolerance.ALL)
+    @parametrize(error_tolerance=ErrorTolerance.NONE)
+    def test_skip_and_log_to_dlq(self, error_tolerance):
+        self.kafka = KafkaService(self.test_context, self.num_brokers, 
self.zk, topics=self.topics)
+
+        # set config props
+        self.override_error_tolerance_props = error_tolerance
+        self.enable_deadletterqueue = True
+
+        successful_records = []
+        faulty_records = []
+        records = []
+        for i in range(0, 1000):
+            if i % 2 == 0:
+                records.append('{"some_key":' + str(i) + '}')
+                successful_records.append('{some_key=' + str(i) + '}')
+            else:
+                # badly formatted json records (missing a quote after the key)
+                records.append('{"some_key:' + str(i) + '}')
+                faulty_records.append('{"some_key:' + str(i) + '}')
+
+        records = "\n".join(records) + "\n"
+        successful_records = "\n".join(successful_records) + "\n"
+        if error_tolerance == ErrorTolerance.ALL:
+            faulty_records = ",".join(faulty_records)
+        else:
+            faulty_records = faulty_records[0]
+
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, 
[self.INPUT_FILE, self.OFFSETS_FILE])
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, 
[self.OUTPUT_FILE, self.OFFSETS_FILE])
+
+        self.zk.start()
+        self.kafka.start()
+
+        self.override_key_converter = 
"org.apache.kafka.connect.storage.StringConverter"
+        self.override_value_converter = 
"org.apache.kafka.connect.storage.StringConverter"
+        self.source.set_configs(lambda node: 
self.render("connect-standalone.properties", node=node), 
[self.render("connect-file-source.properties")])
+
+        self.override_key_converter = 
"org.apache.kafka.connect.json.JsonConverter"
+        self.override_value_converter = 
"org.apache.kafka.connect.json.JsonConverter"
+        self.override_key_converter_schemas_enable = False
+        self.override_value_converter_schemas_enable = False
+        self.sink.set_configs(lambda node: 
self.render("connect-standalone.properties", node=node), 
[self.render("connect-file-sink.properties")])
+
+        self.source.start()
+        self.sink.start()
+
+        # Generating data on the source node should generate new records and 
create new output on the sink node
+        self.source.node.account.ssh("echo -e -n " + repr(records) + " >> " + 
self.INPUT_FILE)
+
+        if error_tolerance == ErrorTolerance.NONE:
+            try:
+                wait_until(lambda: self.validate_output(successful_records), 
timeout_sec=15,
+                           err_msg="Clean records added to input file were not 
seen in the output file in a reasonable amount of time.")
+                raise Exception("Expected to not find any results in this 
file.")
+            except TimeoutError:
+                self.logger.info("Caught expected exception")
+        else:
+            wait_until(lambda: self.validate_output(successful_records), 
timeout_sec=15,
+                       err_msg="Clean records added to input file were not 
seen in the output file in a reasonable amount of time.")
+
+        if self.enable_deadletterqueue:
+            self.logger.info("Reading records from deadletterqueue")
+            consumer_validator = ConsoleConsumer(self.test_context, 1, 
self.kafka, "my-connector-errors",
+                                                 consumer_timeout_ms=10000)
+            consumer_validator.run()
+            actual = ",".join(consumer_validator.messages_consumed[1])
+            assert faulty_records == actual, "Expected %s but saw %s in dead 
letter queue" % (faulty_records, actual)
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties 
b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
index bff002b..a58cc6b 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -25,4 +25,20 @@ key.converter={{ override_key_converter }}
 {% endif %}
 {% if override_key_converter is defined %}
 value.converter={{ override_value_converter }}
-{% endif %}
\ No newline at end of file
+{% endif %}
+
+key.converter.schemas.enable={{ 
override_key_converter_schemas_enable|default(True) }}
+value.converter.schemas.enable={{ 
override_value_converter_schemas_enable|default(True) }}
+
+# log error context along with application logs
+errors.log.enable=true
+errors.log.include.messages=true
+
+{% if enable_deadletterqueue is defined %}
+# produce error context into the Kafka topic
+errors.deadletterqueue.topic.name={{ 
override_deadletterqueue_topic_name|default("my-connector-errors") }}
+errors.deadletterqueue.topic.replication.factor={{ 
override_deadletterqueue_replication_factor|default(1) }}
+{% endif %}
+
+# Tolerate all errors.
+errors.tolerance={{ override_error_tolerance_props|default("none") }}
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-file-source.properties 
b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
index 800d6a0..147e85a 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -26,3 +26,10 @@ key.converter={{ override_key_converter }}
 {% if override_key_converter is defined %}
 value.converter={{ override_value_converter }}
 {% endif %}
+
+# log error context along with application logs
+errors.log.enable=true
+errors.log.include.messages=true
+
+# Tolerate all errors.
+errors.tolerance={{ override_error_tolerance_props|default("none") }}

Reply via email to