This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70d8828 KAFKA-7228: Set errorHandlingMetrics for dead letter queue
70d8828 is described below
commit 70d882861e1bf3eb503c84a31834e8b628de2df9
Author: Arjun Satish <[email protected]>
AuthorDate: Thu Aug 2 14:36:02 2018 -0700
KAFKA-7228: Set errorHandlingMetrics for dead letter queue
The DLQ reporter does not get an `errorHandlingMetrics` object when it is
created by the worker, which results in an NPE.
Signed-off-by: Arjun Satish <arjun@confluent.io>
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Arjun Satish <[email protected]>
Reviewers: Konstantine Karantasis <[email protected]>, Ewen
Cheslack-Postava <[email protected]>
Closes #5440 from wicknicks/KAFKA-7228
---
.../org/apache/kafka/connect/runtime/Worker.java | 8 ++----
.../runtime/errors/DeadLetterQueueReporter.java | 20 +++++++------
.../connect/runtime/errors/ErrorReporter.java | 8 ------
.../kafka/connect/runtime/errors/LogReporter.java | 15 +++++-----
.../connect/runtime/ErrorHandlingTaskTest.java | 9 ++----
.../connect/runtime/errors/ErrorReporterTest.java | 33 +++++++++++-----------
6 files changed, 43 insertions(+), 50 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e2fe6b6..df73a43 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -523,14 +523,13 @@ public class Worker {
private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id,
SinkConnectorConfig connConfig,
ErrorHandlingMetrics
errorHandlingMetrics) {
ArrayList<ErrorReporter> reporters = new ArrayList<>();
- LogReporter logReporter = new LogReporter(id, connConfig);
- logReporter.metrics(errorHandlingMetrics);
+ LogReporter logReporter = new LogReporter(id, connConfig,
errorHandlingMetrics);
reporters.add(logReporter);
// check if topic for dead letter queue exists
String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
- DeadLetterQueueReporter reporter =
DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
+ DeadLetterQueueReporter reporter =
DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps,
errorHandlingMetrics);
reporters.add(reporter);
}
@@ -540,8 +539,7 @@ public class Worker {
private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id,
ConnectorConfig connConfig,
ErrorHandlingMetrics
errorHandlingMetrics) {
List<ErrorReporter> reporters = new ArrayList<>();
- LogReporter logReporter = new LogReporter(id, connConfig);
- logReporter.metrics(errorHandlingMetrics);
+ LogReporter logReporter = new LogReporter(id, connConfig,
errorHandlingMetrics);
reporters.add(logReporter);
return reporters;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index d36ec22..c059dcf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import static java.util.Collections.singleton;
@@ -66,13 +67,14 @@ public class DeadLetterQueueReporter implements
ErrorReporter {
private final SinkConnectorConfig connConfig;
private final ConnectorTaskId connectorTaskId;
+ private final ErrorHandlingMetrics errorHandlingMetrics;
private KafkaProducer<byte[], byte[]> kafkaProducer;
- private ErrorHandlingMetrics errorHandlingMetrics;
public static DeadLetterQueueReporter createAndSetup(WorkerConfig
workerConfig,
ConnectorTaskId id,
- SinkConnectorConfig
sinkConfig, Map<String, Object> producerProps) {
+ SinkConnectorConfig
sinkConfig, Map<String, Object> producerProps,
+ ErrorHandlingMetrics
errorHandlingMetrics) {
String topic = sinkConfig.dlqTopicName();
try (AdminClient admin = AdminClient.create(workerConfig.originals()))
{
@@ -90,7 +92,7 @@ public class DeadLetterQueueReporter implements ErrorReporter
{
}
KafkaProducer<byte[], byte[]> dlqProducer = new
KafkaProducer<>(producerProps);
- return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id);
+ return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id,
errorHandlingMetrics);
}
/**
@@ -99,14 +101,16 @@ public class DeadLetterQueueReporter implements
ErrorReporter {
* @param kafkaProducer a Kafka Producer to produce the original consumed
records.
*/
// Visible for testing
- DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer,
SinkConnectorConfig connConfig, ConnectorTaskId id) {
+ DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer,
SinkConnectorConfig connConfig,
+ ConnectorTaskId id, ErrorHandlingMetrics
errorHandlingMetrics) {
+ Objects.requireNonNull(kafkaProducer);
+ Objects.requireNonNull(connConfig);
+ Objects.requireNonNull(id);
+ Objects.requireNonNull(errorHandlingMetrics);
+
this.kafkaProducer = kafkaProducer;
this.connConfig = connConfig;
this.connectorTaskId = id;
- }
-
- @Override
- public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
this.errorHandlingMetrics = errorHandlingMetrics;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index f7df1b2..5833616 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -28,12 +28,4 @@ public interface ErrorReporter {
*/
void report(ProcessingContext context);
- /**
- * Provides the container for error handling metrics to implementations.
This method will be called once the error
- * reporter object is instantiated.
- *
- * @param errorHandlingMetrics metrics for error handling (cannot be null).
- */
- void metrics(ErrorHandlingMetrics errorHandlingMetrics);
-
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
index e81bd54..8b07adf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/LogReporter.java
@@ -21,6 +21,8 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
+
/**
* Writes errors and their context to application logs.
*/
@@ -30,12 +32,16 @@ public class LogReporter implements ErrorReporter {
private final ConnectorTaskId id;
private final ConnectorConfig connConfig;
+ private final ErrorHandlingMetrics errorHandlingMetrics;
- private ErrorHandlingMetrics errorHandlingMetrics;
+ public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig,
ErrorHandlingMetrics errorHandlingMetrics) {
+ Objects.requireNonNull(id);
+ Objects.requireNonNull(connConfig);
+ Objects.requireNonNull(errorHandlingMetrics);
- public LogReporter(ConnectorTaskId id, ConnectorConfig connConfig) {
this.id = id;
this.connConfig = connConfig;
+ this.errorHandlingMetrics = errorHandlingMetrics;
}
/**
@@ -57,11 +63,6 @@ public class LogReporter implements ErrorReporter {
errorHandlingMetrics.recordErrorLogged();
}
- @Override
- public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
- this.errorHandlingMetrics = errorHandlingMetrics;
- }
-
// Visible for testing
String message(ProcessingContext context) {
return String.format("Error encountered in task %s. %s",
String.valueOf(id),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index e931642..1bf9c71 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -166,8 +166,7 @@ public class ErrorHandlingTaskTest {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG,
"true");
- LogReporter reporter = new LogReporter(taskId,
connConfig(reportProps));
- reporter.metrics(errorHandlingMetrics);
+ LogReporter reporter = new LogReporter(taskId,
connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
@@ -218,8 +217,7 @@ public class ErrorHandlingTaskTest {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG,
"true");
- LogReporter reporter = new LogReporter(taskId,
connConfig(reportProps));
- reporter.metrics(errorHandlingMetrics);
+ LogReporter reporter = new LogReporter(taskId,
connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
@@ -283,8 +281,7 @@ public class ErrorHandlingTaskTest {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG,
"true");
- LogReporter reporter = new LogReporter(taskId,
connConfig(reportProps));
- reporter.metrics(errorHandlingMetrics);
+ LogReporter reporter = new LogReporter(taskId,
connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.metrics(errorHandlingMetrics);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f199982..fa628b0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -94,10 +94,15 @@ public class ErrorReporterTest {
}
}
+ @Test(expected = NullPointerException.class)
+ public void initializeDLQWithNullMetrics() {
+ new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID,
null);
+ }
+
@Test
public void testDLQConfigWithEmptyTopicName() {
- DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID);
- deadLetterQueueReporter.metrics(errorHandlingMetrics);
+ DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(
+ producer, config(emptyMap()), TASK_ID, errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -111,8 +116,8 @@ public class ErrorReporterTest {
@Test
public void testDLQConfigWithValidTopicName() {
- DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer,
config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)),
TASK_ID);
- deadLetterQueueReporter.metrics(errorHandlingMetrics);
+ DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(
+ producer,
config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)),
TASK_ID, errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -126,8 +131,8 @@ public class ErrorReporterTest {
@Test
public void testReportDLQTwice() {
- DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer,
config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)),
TASK_ID);
- deadLetterQueueReporter.metrics(errorHandlingMetrics);
+ DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(
+ producer,
config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)),
TASK_ID, errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -142,8 +147,7 @@ public class ErrorReporterTest {
@Test
public void testLogOnDisabledLogReporter() {
- LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()));
- logReporter.metrics(errorHandlingMetrics);
+ LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()),
errorHandlingMetrics);
ProcessingContext context = processingContext();
context.error(new RuntimeException());
@@ -155,8 +159,7 @@ public class ErrorReporterTest {
@Test
public void testLogOnEnabledLogReporter() {
- LogReporter logReporter = new LogReporter(TASK_ID,
config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
- logReporter.metrics(errorHandlingMetrics);
+ LogReporter logReporter = new LogReporter(TASK_ID,
config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")),
errorHandlingMetrics);
ProcessingContext context = processingContext();
context.error(new RuntimeException());
@@ -168,8 +171,7 @@ public class ErrorReporterTest {
@Test
public void testLogMessageWithNoRecords() {
- LogReporter logReporter = new LogReporter(TASK_ID,
config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")));
- logReporter.metrics(errorHandlingMetrics);
+ LogReporter logReporter = new LogReporter(TASK_ID,
config(singletonMap(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true")),
errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -184,8 +186,7 @@ public class ErrorReporterTest {
props.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
props.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
- LogReporter logReporter = new LogReporter(TASK_ID, config(props));
- logReporter.metrics(errorHandlingMetrics);
+ LogReporter logReporter = new LogReporter(TASK_ID, config(props),
errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -208,7 +209,7 @@ public class ErrorReporterTest {
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
"true");
- DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer, config(props), TASK_ID);
+ DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics);
ProcessingContext context = new ProcessingContext();
context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10,
"source-key".getBytes(), "source-value".getBytes()));
@@ -236,7 +237,7 @@ public class ErrorReporterTest {
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
"true");
- DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer, config(props), TASK_ID);
+ DeadLetterQueueReporter deadLetterQueueReporter = new
DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics);
ProcessingContext context = new ProcessingContext();
context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10,
"source-key".getBytes(), "source-value".getBytes()));