[ https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508849#comment-16508849 ]

ASF GitHub Bot commented on KAFKA-7003:
---------------------------------------

ewencp closed pull request #5159: KAFKA-7003: Set error context in message headers (KIP-298)
URL: https://github.com/apache/kafka/pull/5159

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 6e9bd6b9e71..d9d140b9cdc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -57,11 +57,19 @@
     public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
     private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
 
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+    public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
+            "written to the dead letter queue. To avoid clashing with headers from the original record, all error context header " +
+            "keys will start with <code>__connect.errors.</code>";
+    private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
+
     static ConfigDef config = ConnectorConfig.configDef()
         .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
         .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
         .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
+        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+        .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
 
     public static ConfigDef configDef() {
         return config;
@@ -107,4 +115,8 @@ public String dlqTopicName() {
     public short dlqTopicReplicationFactor() {
         return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
     }
+
+    public boolean isDlqContextHeadersEnabled() {
+        return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
+    }
 }
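
As a usage sketch (not part of the patch): the new flag is an ordinary BOOLEAN config, so it is set through connector properties and read back via isDlqContextHeadersEnabled(). This assumes the (Plugins, Map) constructor used by this PR's tests; the connector name, class, and topic names below are illustrative only:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.runtime.SinkConnectorConfig;
    import org.apache.kafka.connect.runtime.isolation.Plugins;

    class DlqConfigSketch {
        // Given the worker's Plugins instance, build a SinkConnectorConfig with
        // DLQ context headers enabled and read the flag back.
        static boolean dlqHeadersEnabled(Plugins plugins) {
            Map<String, String> props = new HashMap<>();
            props.put("name", "my-sink");                                // hypothetical connector
            props.put("connector.class", "org.example.MySinkConnector"); // hypothetical class
            props.put("topics", "orders");                               // hypothetical source topic
            props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "my-sink-errors");
            props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
            return new SinkConnectorConfig(plugins, props).isDlqContextHeadersEnabled(); // true
        }
    }
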
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 97e68faa4ca..c794eb8c807 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -530,7 +530,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
-            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
             reporters.add(reporter);
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 459eeae1ff4..d36ec22ec88 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -22,13 +22,19 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -46,12 +52,26 @@
 
     private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
 
+    public static final String HEADER_PREFIX = "__connect.errors.";
+    public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+    public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition";
+    public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset";
+    public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name";
+    public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id";
+    public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage";
+    public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name";
+    public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+
     private final SinkConnectorConfig connConfig;
+    private final ConnectorTaskId connectorTaskId;
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
     public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+                                                         ConnectorTaskId id,
                                                          SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
         String topic = sinkConfig.dlqTopicName();
 
@@ -70,7 +90,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
         }
 
         KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id);
     }
 
     /**
@@ -79,9 +99,10 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
      * @param kafkaProducer a Kafka Producer to produce the original consumed records.
      */
     // Visible for testing
-    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig) {
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) {
         this.kafkaProducer = kafkaProducer;
         this.connConfig = connConfig;
+        this.connectorTaskId = id;
     }
 
     @Override
@@ -117,6 +138,10 @@ public void report(ProcessingContext context) {
                     originalMessage.key(), originalMessage.value(), originalMessage.headers());
         }
 
+        if (connConfig.isDlqContextHeadersEnabled()) {
+            populateContextHeaders(producerRecord, context);
+        }
+
         this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
             if (exception != null) {
                 log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
@@ -124,4 +149,52 @@ public void report(ProcessingContext context) {
             }
         });
     }
+
+    // Visible for testing
+    void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, ProcessingContext context) {
+        Headers headers = producerRecord.headers();
+        if (context.consumerRecord() != null) {
+            headers.add(ERROR_HEADER_ORIG_TOPIC, toBytes(context.consumerRecord().topic()));
+            headers.add(ERROR_HEADER_ORIG_PARTITION, toBytes(context.consumerRecord().partition()));
+            headers.add(ERROR_HEADER_ORIG_OFFSET, toBytes(context.consumerRecord().offset()));
+        }
+
+        headers.add(ERROR_HEADER_CONNECTOR_NAME, toBytes(connectorTaskId.connector()));
+        headers.add(ERROR_HEADER_TASK_ID, toBytes(String.valueOf(connectorTaskId.task())));
+        headers.add(ERROR_HEADER_STAGE, toBytes(context.stage().name()));
+        headers.add(ERROR_HEADER_EXECUTING_CLASS, toBytes(context.executingClass().getName()));
+        if (context.error() != null) {
+            headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
+            headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
+            byte[] trace;
+            if ((trace = stacktrace(context.error())) != null) {
+                headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace);
+            }
+        }
+    }
+
+    private byte[] stacktrace(Throwable error) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            PrintStream stream = new PrintStream(bos, true, "UTF-8");
+            error.printStackTrace(stream);
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            log.error("Could not serialize stacktrace.", e);
+        }
+        return null;
+    }
+
+    private byte[] toBytes(int value) {
+        return toBytes(String.valueOf(value));
+    }
+
+    private byte[] toBytes(long value) {
+        return toBytes(String.valueOf(value));
+    }
+
+    private byte[] toBytes(String value) {
+        return value.getBytes(StandardCharsets.UTF_8);
+    }
 }
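
Downstream of this reporter, the context headers can be inspected with a plain consumer. A minimal sketch (not part of the patch; the bootstrap address, group id, and DLQ topic name are illustrative assumptions, while the "__connect.errors." prefix and UTF-8 encoding follow the code above):

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;

    public class DlqHeaderDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // illustrative
            props.put("group.id", "dlq-inspector");            // illustrative
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-sink-errors"));  // illustrative DLQ topic
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                    for (Header header : record.headers()) {
                        if (header.key().startsWith("__connect.errors.")) {
                            // every context header value is a UTF-8 string
                            System.out.println(header.key() + " = " + new String(header.value(), StandardCharsets.UTF_8));
                        }
                    }
                }
            }
        }
    }
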
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f35c514816f..f199982231f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -18,7 +18,10 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -26,6 +29,7 @@
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
@@ -43,8 +47,19 @@
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_OFFSET;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_PARTITION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
@@ -81,7 +96,7 @@ public void tearDown() {
 
     @Test
     public void testDLQConfigWithEmptyTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -96,7 +111,7 @@ public void testDLQConfigWithEmptyTopicName() {
 
     @Test
     public void testDLQConfigWithValidTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -111,7 +126,7 @@ public void testDLQConfigWithValidTopicName() {
 
     @Test
     public void testReportDLQTwice() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -189,6 +204,65 @@ public void testSetDLQConfigs() {
         assertEquals(configuration.dlqTopicReplicationFactor(), 7);
     }
 
+    @Test
+    public void testDlqHeaderConsumerRecord() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new ConnectException("Test Exception"));
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals("7", headerValue(producerRecord, ERROR_HEADER_ORIG_PARTITION));
+        assertEquals("10", headerValue(producerRecord, ERROR_HEADER_ORIG_OFFSET));
+        assertEquals(TASK_ID.connector(), headerValue(producerRecord, ERROR_HEADER_CONNECTOR_NAME));
+        assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, ERROR_HEADER_TASK_ID));
+        assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, ERROR_HEADER_STAGE));
+        assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS));
+        assertEquals(ConnectException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION));
+        assertEquals("Test Exception", headerValue(producerRecord, ERROR_HEADER_EXCEPTION_MESSAGE));
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0);
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("org.apache.kafka.connect.errors.ConnectException: Test Exception"));
+    }
+
+    @Test
+    public void testDlqHeaderIsAppended() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new ConnectException("Test Exception"));
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+        producerRecord.headers().add(ERROR_HEADER_ORIG_TOPIC, "dummy".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        int appearances = 0;
+        for (Header header: producerRecord.headers()) {
+            if (ERROR_HEADER_ORIG_TOPIC.equalsIgnoreCase(header.key())) {
+                appearances++;
+            }
+        }
+
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals(2, appearances);
+    }
+
+    private String headerValue(ProducerRecord<byte[], byte[]> producerRecord, String headerSuffix) {
+        return new String(producerRecord.headers().lastHeader(headerSuffix).value());
+    }
+
     private ProcessingContext processingContext() {
         ProcessingContext context = new ProcessingContext();
         context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add headers with error context in messages written to the Connect 
> DeadLetterQueue topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7003
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7003
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Arjun Satish
>            Priority: Major
>             Fix For: 2.0.0, 2.1.0
>
>
> This was added to the KIP after the feature freeze. 
> If the property {{errors.deadletterqueue.context.headers.enable}} is set to 
> {{true}}, the following headers will be appended to the raw message produced 
> to the dead letter queue. To avoid clashing with headers already present on 
> the original record, every error context header key is prefixed with 
> {{__connect.errors.}}, and all values are serialized as UTF-8 strings.
> ||Header Name||Description||
> |__connect.errors.topic|Name of the topic that contained the message.|
> |__connect.errors.partition|The numeric ID of the partition in the original topic that contained the message.|
> |__connect.errors.offset|The numeric value of the message offset in the original topic.|
> |__connect.errors.connector.name|The name of the connector that encountered the error.|
> |__connect.errors.task.id|The numeric ID of the task that encountered the error.|
> |__connect.errors.stage|The name of the stage where the error occurred.|
> |__connect.errors.class.name|The fully qualified name of the class that was executing when the error occurred.|
> |__connect.errors.exception.class.name|The fully qualified class name of the exception that was thrown during the execution.|
> |__connect.errors.exception.message|The message in the exception.|
> |__connect.errors.exception.stacktrace|The stacktrace of the exception.|
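> 
> For example, a sink connector could opt in with the following two properties 
> in its configuration (the dead letter queue topic name here is illustrative):
> {{errors.deadletterqueue.topic.name=my-connector-errors}}
> {{errors.deadletterqueue.context.headers.enable=true}}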



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
