This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new ca2589c MINOR: System test for error handling and writes to
DeadLetterQueue
ca2589c is described below
commit ca2589cc7ff60c48ab7492e4e8cd22e78bda9acb
Author: Arjun Satish <[email protected]>
AuthorDate: Tue Aug 7 14:44:01 2018 -0700
MINOR: System test for error handling and writes to DeadLetterQueue
Added a system test which creates a file sink with json converter and
attempts to feed it bad records. The bad records should land in the DLQ if it
is enabled, and the task should be killed or bad records skipped based on test
parameters.
Signed-off-by: Arjun Satish <arjun@confluent.io>
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Arjun Satish <[email protected]>
Reviewers: Konstantine Karantasis <[email protected]>, Ewen
Cheslack-Postava <[email protected]>
Closes #5456 from wicknicks/error-handling-sys-test
(cherry picked from commit 28a1ae4183c707af363b69e2ec2b743bdf4f236c)
Signed-off-by: Ewen Cheslack-Postava <[email protected]>
---
tests/kafkatest/services/connect.py | 5 ++
tests/kafkatest/tests/connect/connect_test.py | 72 ++++++++++++++++++++++
.../connect/templates/connect-file-sink.properties | 18 +++++-
.../templates/connect-file-source.properties | 7 +++
4 files changed, 101 insertions(+), 1 deletion(-)
diff --git a/tests/kafkatest/services/connect.py
b/tests/kafkatest/services/connect.py
index d7ef204..19beddd 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -326,6 +326,11 @@ class ConnectDistributedService(ConnectServiceBase):
raise RuntimeError("No process ids recorded")
+class ErrorTolerance(object):
+ ALL = "all"
+ NONE = "none"
+
+
class ConnectRestError(RuntimeError):
def __init__(self, status, msg, url):
self.status = status
diff --git a/tests/kafkatest/tests/connect/connect_test.py
b/tests/kafkatest/tests/connect/connect_test.py
index 9d34c48..c961681 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -18,10 +18,12 @@ from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize, matrix
from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.errors import TimeoutError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectStandaloneService
+from kafkatest.services.connect import ErrorTolerance
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
@@ -134,3 +136,73 @@ class ConnectStandaloneFileTest(Test):
return output_hash == hashlib.md5(value).hexdigest()
except RemoteCommandError:
return False
+
+ @cluster(num_nodes=5)
+ @parametrize(error_tolerance=ErrorTolerance.ALL)
+ @parametrize(error_tolerance=ErrorTolerance.NONE)
+ def test_skip_and_log_to_dlq(self, error_tolerance):
+ self.kafka = KafkaService(self.test_context, self.num_brokers,
self.zk, topics=self.topics)
+
+ # set config props
+ self.override_error_tolerance_props = error_tolerance
+ self.enable_deadletterqueue = True
+
+ successful_records = []
+ faulty_records = []
+ records = []
+ for i in range(0, 1000):
+ if i % 2 == 0:
+ records.append('{"some_key":' + str(i) + '}')
+ successful_records.append('{some_key=' + str(i) + '}')
+ else:
+ # badly formatted json records (missing a quote after the key)
+ records.append('{"some_key:' + str(i) + '}')
+ faulty_records.append('{"some_key:' + str(i) + '}')
+
+ records = "\n".join(records) + "\n"
+ successful_records = "\n".join(successful_records) + "\n"
+ if error_tolerance == ErrorTolerance.ALL:
+ faulty_records = ",".join(faulty_records)
+ else:
+ faulty_records = faulty_records[0]
+
+ self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE])
+ self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE])
+
+ self.zk.start()
+ self.kafka.start()
+
+ self.override_key_converter =
"org.apache.kafka.connect.storage.StringConverter"
+ self.override_value_converter =
"org.apache.kafka.connect.storage.StringConverter"
+ self.source.set_configs(lambda node:
self.render("connect-standalone.properties", node=node),
[self.render("connect-file-source.properties")])
+
+ self.override_key_converter =
"org.apache.kafka.connect.json.JsonConverter"
+ self.override_value_converter =
"org.apache.kafka.connect.json.JsonConverter"
+ self.override_key_converter_schemas_enable = False
+ self.override_value_converter_schemas_enable = False
+ self.sink.set_configs(lambda node:
self.render("connect-standalone.properties", node=node),
[self.render("connect-file-sink.properties")])
+
+ self.source.start()
+ self.sink.start()
+
+ # Generating data on the source node should generate new records and
create new output on the sink node
+ self.source.node.account.ssh("echo -e -n " + repr(records) + " >> " +
self.INPUT_FILE)
+
+ if error_tolerance == ErrorTolerance.NONE:
+ try:
+ wait_until(lambda: self.validate_output(successful_records),
timeout_sec=15,
+ err_msg="Clean records added to input file were not
seen in the output file in a reasonable amount of time.")
+ raise Exception("Expected to not find any results in this
file.")
+ except TimeoutError:
+ self.logger.info("Caught expected exception")
+ else:
+ wait_until(lambda: self.validate_output(successful_records),
timeout_sec=15,
+ err_msg="Clean records added to input file were not
seen in the output file in a reasonable amount of time.")
+
+ if self.enable_deadletterqueue:
+ self.logger.info("Reading records from deadletterqueue")
+ consumer_validator = ConsoleConsumer(self.test_context, 1,
self.kafka, "my-connector-errors",
+ consumer_timeout_ms=10000)
+ consumer_validator.run()
+ actual = ",".join(consumer_validator.messages_consumed[1])
+ assert faulty_records == actual, "Expected %s but saw %s in dead
letter queue" % (faulty_records, actual)
diff --git
a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
index bff002b..a58cc6b 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -25,4 +25,20 @@ key.converter={{ override_key_converter }}
{% endif %}
{% if override_key_converter is defined %}
value.converter={{ override_value_converter }}
-{% endif %}
\ No newline at end of file
+{% endif %}
+
+key.converter.schemas.enable={{
override_key_converter_schemas_enable|default(True) }}
+value.converter.schemas.enable={{
override_value_converter_schemas_enable|default(True) }}
+
+# log error context along with application logs
+errors.log.enable=true
+errors.log.include.messages=true
+
+{% if enable_deadletterqueue is defined %}
+# produce error context into the Kafka topic
+errors.deadletterqueue.topic.name={{
override_deadletterqueue_topic_name|default("my-connector-errors") }}
+errors.deadletterqueue.topic.replication.factor={{
override_deadletterqueue_replication_factor|default(1) }}
+{% endif %}
+
+# Tolerate all errors.
+errors.tolerance={{ override_error_tolerance_props|default("none") }}
diff --git
a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
index 800d6a0..147e85a 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -26,3 +26,10 @@ key.converter={{ override_key_converter }}
{% if override_key_converter is defined %}
value.converter={{ override_value_converter }}
{% endif %}
+
+# log error context along with application logs
+errors.log.enable=true
+errors.log.include.messages=true
+
+# Tolerate all errors.
+errors.tolerance={{ override_error_tolerance_props|default("none") }}