[ https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508849#comment-16508849 ]
ASF GitHub Bot commented on KAFKA-7003:
---------------------------------------

ewencp closed pull request #5159: KAFKA-7003: Set error context in message headers (KIP-298)
URL: https://github.com/apache/kafka/pull/5159

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 6e9bd6b9e71..d9d140b9cdc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -57,11 +57,19 @@ public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
     private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
 
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+    public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
+            "written to the dead letter queue. To avoid clashing with headers from the original record, all error context " +
+            "header keys will start with <code>__connect.errors.</code>";
+    private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
+
     static ConfigDef config = ConnectorConfig.configDef()
         .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
         .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
         .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
+        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+        .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
 
     public static ConfigDef configDef() {
         return config;
@@ -107,4 +115,8 @@ public String dlqTopicName() {
     public short dlqTopicReplicationFactor() {
         return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
     }
+
+    public boolean isDlqContextHeadersEnabled() {
+        return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 97e68faa4ca..c794eb8c807 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -530,7 +530,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
-            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
             reporters.add(reporter);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 459eeae1ff4..d36ec22ec88 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -22,13 +22,19 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -46,12 +52,26 @@
     private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
 
+    public static final String HEADER_PREFIX = "__connect.errors.";
+    public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+    public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition";
+    public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset";
+    public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name";
+    public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id";
+    public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage";
+    public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name";
+    public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+
     private final SinkConnectorConfig connConfig;
+    private final ConnectorTaskId connectorTaskId;
     private KafkaProducer<byte[], byte[]> kafkaProducer;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
     public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+                                                         ConnectorTaskId id,
                                                          SinkConnectorConfig sinkConfig,
                                                          Map<String, Object> producerProps) {
         String topic = sinkConfig.dlqTopicName();
@@ -70,7 +90,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
         }
 
         KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id);
     }
 
     /**
@@ -79,9 +99,10 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
      * @param kafkaProducer a Kafka Producer to produce the original consumed records.
      */
     // Visible for testing
-    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig) {
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) {
         this.kafkaProducer = kafkaProducer;
         this.connConfig = connConfig;
+        this.connectorTaskId = id;
     }
 
     @Override
@@ -117,6 +138,10 @@ public void report(ProcessingContext context) {
                     originalMessage.key(), originalMessage.value(), originalMessage.headers());
         }
 
+        if (connConfig.isDlqContextHeadersEnabled()) {
+            populateContextHeaders(producerRecord, context);
+        }
+
         this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
             if (exception != null) {
                 log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
@@ -124,4 +149,52 @@
             }
         });
     }
+
+    // Visible for testing
+    void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, ProcessingContext context) {
+        Headers headers = producerRecord.headers();
+        if (context.consumerRecord() != null) {
+            headers.add(ERROR_HEADER_ORIG_TOPIC, toBytes(context.consumerRecord().topic()));
+            headers.add(ERROR_HEADER_ORIG_PARTITION, toBytes(context.consumerRecord().partition()));
+            headers.add(ERROR_HEADER_ORIG_OFFSET, toBytes(context.consumerRecord().offset()));
+        }
+
+        headers.add(ERROR_HEADER_CONNECTOR_NAME, toBytes(connectorTaskId.connector()));
+        headers.add(ERROR_HEADER_TASK_ID, toBytes(String.valueOf(connectorTaskId.task())));
+        headers.add(ERROR_HEADER_STAGE, toBytes(context.stage().name()));
+        headers.add(ERROR_HEADER_EXECUTING_CLASS, toBytes(context.executingClass().getName()));
+        if (context.error() != null) {
+            headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
+            headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
+            byte[] trace;
+            if ((trace = stacktrace(context.error())) != null) {
+                headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace);
+            }
+        }
+    }
+
+    private byte[] stacktrace(Throwable error) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            PrintStream stream = new PrintStream(bos, true, "UTF-8");
+            error.printStackTrace(stream);
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            log.error("Could not serialize stacktrace.", e);
+        }
+        return null;
+    }
+
+    private byte[] toBytes(int value) {
+        return toBytes(String.valueOf(value));
+    }
+
+    private byte[] toBytes(long value) {
+        return toBytes(String.valueOf(value));
+    }
+
+    private byte[] toBytes(String value) {
+        return value.getBytes(StandardCharsets.UTF_8);
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f35c514816f..f199982231f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -18,7 +18,10 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -26,6 +29,7 @@
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
@@ -43,8 +47,19 @@
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_OFFSET;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_PARTITION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
@@ -81,7 +96,7 @@ public void tearDown() {
     @Test
     public void testDLQConfigWithEmptyTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -96,7 +111,7 @@ public void testDLQConfigWithEmptyTopicName() {
     @Test
     public void testDLQConfigWithValidTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -111,7 +126,7 @@ public void testDLQConfigWithValidTopicName() {
     @Test
     public void testReportDLQTwice() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -189,6 +204,65 @@ public void testSetDLQConfigs() {
         assertEquals(configuration.dlqTopicReplicationFactor(), 7);
     }
 
+    @Test
+    public void testDlqHeaderConsumerRecord() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new ConnectException("Test Exception"));
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals("7", headerValue(producerRecord, ERROR_HEADER_ORIG_PARTITION));
+        assertEquals("10", headerValue(producerRecord, ERROR_HEADER_ORIG_OFFSET));
+        assertEquals(TASK_ID.connector(), headerValue(producerRecord, ERROR_HEADER_CONNECTOR_NAME));
+        assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, ERROR_HEADER_TASK_ID));
+        assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, ERROR_HEADER_STAGE));
+        assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS));
+        assertEquals(ConnectException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION));
+        assertEquals("Test Exception", headerValue(producerRecord, ERROR_HEADER_EXCEPTION_MESSAGE));
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0);
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("org.apache.kafka.connect.errors.ConnectException: Test Exception"));
+    }
+
+    @Test
+    public void testDlqHeaderIsAppended() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new ConnectException("Test Exception"));
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+        producerRecord.headers().add(ERROR_HEADER_ORIG_TOPIC, "dummy".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        int appearances = 0;
+        for (Header header: producerRecord.headers()) {
+            if (ERROR_HEADER_ORIG_TOPIC.equalsIgnoreCase(header.key())) {
+                appearances++;
+            }
+        }
+
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals(2, appearances);
+    }
+
+    private String headerValue(ProducerRecord<byte[], byte[]> producerRecord, String headerSuffix) {
+        return new String(producerRecord.headers().lastHeader(headerSuffix).value());
+    }
+
     private ProcessingContext processingContext() {
         ProcessingContext context = new ProcessingContext();
         context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add headers with error context in messages written to the Connect DeadLetterQueue topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7003
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7003
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Arjun Satish
>            Priority: Major
>             Fix For: 2.0.0, 2.1.0
>
> This was added to the KIP after the feature freeze.
> If the property {{errors.deadletterqueue.context.headers.enable}} is set to {{true}}, the following headers will be added to the produced raw message (only if they don't already exist in the message). All values will be serialized as UTF-8 strings.
> ||Header Name||Description||
> |__connect.errors.topic|Name of the topic that contained the message.|
> |__connect.errors.task.id|The numeric ID of the task that encountered the error (encoded as a UTF-8 string).|
> |__connect.errors.stage|The name of the stage where the error occurred.|
> |__connect.errors.partition|The numeric ID of the partition in the original topic that contained the message (encoded as a UTF-8 string).|
> |__connect.errors.offset|The numeric value of the message offset in the original topic (encoded as a UTF-8 string).|
> |__connect.errors.exception.stacktrace|The stacktrace of the exception.|
> |__connect.errors.exception.message|The message in the exception.|
> |__connect.errors.exception.class.name|The fully qualified class name of the exception that was thrown during the execution.|
> |__connect.errors.connector.name|The name of the connector which encountered the error.|
> |__connect.errors.class.name|The fully qualified name of the class that caused the error.|

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
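To make the feature described above concrete, here is a minimal sketch of the error-handling portion of a sink connector configuration, written in the same Map style the tests in this PR use. It is an illustration, not part of the PR: the topic name "my-connector-errors" is a made-up placeholder, and "errors.tolerance" (ConnectorConfig.ERRORS_TOLERANCE_CONFIG, also part of KIP-298) is assumed to be set to "all" so that failed records are routed to the DLQ instead of failing the task.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;

public class DlqConfigSketch {
    // Error-handling properties for a hypothetical sink connector.
    public static Map<String, String> errorHandlingProps() {
        Map<String, String> props = new HashMap<>();
        // "errors.deadletterqueue.topic.name": where failed records are sent.
        // The topic name below is a placeholder.
        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "my-connector-errors");
        // "errors.deadletterqueue.context.headers.enable": false by default;
        // enables the __connect.errors.* headers added by this PR.
        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
        // "errors.tolerance": tolerate bad records so they reach the DLQ
        // rather than killing the task (assumption: "all").
        props.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
        return props;
    }
}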
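And a matching sketch of a consumer that reads the DLQ back and decodes the context headers; the bootstrap server, group id, and topic name are placeholders. lastHeader() is used deliberately: as testDlqHeaderIsAppended shows, the reporter appends its headers rather than overwriting existing ones, so the last occurrence of a key is the one Connect wrote. All header values, including the stacktrace, are UTF-8 strings.

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class DlqHeaderDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "dlq-inspector");            // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-connector-errors")); // placeholder DLQ topic
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    printHeader(record, "__connect.errors.topic");
                    printHeader(record, "__connect.errors.partition");
                    printHeader(record, "__connect.errors.offset");
                    printHeader(record, "__connect.errors.exception.message");
                }
            }
        }
    }

    private static void printHeader(ConsumerRecord<byte[], byte[]> record, String key) {
        // lastHeader(): headers are appended, so the newest occurrence wins.
        Header header = record.headers().lastHeader(key);
        if (header != null) {
            System.out.println(key + " = " + new String(header.value(), StandardCharsets.UTF_8));
        }
    }
}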