This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 74d55ca6393 KAFKA-16448: Add timestamp to error handler context (#17054)
74d55ca6393 is described below
commit 74d55ca6393ced994caf900a72e558c1535109fc
Author: Sebastien Viale <[email protected]>
AuthorDate: Thu Sep 5 17:42:52 2024 +0200
KAFKA-16448: Add timestamp to error handler context (#17054)
Part of KIP-1033.
Co-authored-by: Dabz <[email protected]>
Co-authored-by: loicgreffier <[email protected]>
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/errors/ErrorHandlerContext.java | 33 ++++++++++++++++++++++
.../internals/DefaultErrorHandlerContext.java | 11 +++++++-
.../streams/processor/internals/ProcessorNode.java | 3 +-
.../processor/internals/RecordCollectorImpl.java | 6 ++--
.../processor/internals/RecordDeserializer.java | 3 +-
.../streams/processor/internals/StreamTask.java | 3 +-
.../ProcessingExceptionHandlerIntegrationTest.java | 29 ++++++++++---------
.../processor/internals/ProcessorNodeTest.java | 1 +
.../processor/internals/RecordCollectorTest.java | 1 +
.../internals/RecordDeserializerTest.java | 1 +
10 files changed, 72 insertions(+), 19 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index 6c5e4f19596..af67c8f03a4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -18,8 +18,14 @@ package org.apache.kafka.streams.errors;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
/**
* This interface allows user code to inspect the context of a record that has
failed processing.
@@ -102,4 +108,31 @@ public interface ErrorHandlerContext {
* @return the task ID
*/
TaskId taskId();
+
+ /**
+ * Return the current timestamp.
+ *
+ * <p> If it is triggered while processing a record streamed from the source processor,
+ * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+ * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+ * Note that an upstream {@link Processor} might have set a new timestamp by calling
+ * {@link ProcessorContext#forward(Record) forward(record.withTimestamp(...))}.
+ * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+ * to guarantee deterministic results.
+ *
+ * <p> If it is triggered while processing a record not generated by the source processor (for example,
+ * if the error occurs within a {@link Punctuator#punctuate(long)} call):
+ * <ul>
+ * <li>In case of {@link PunctuationType#STREAM_TIME} timestamp is defined as the current task's stream time,
+ * which is defined as the largest timestamp of any record processed by the task
+ * <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is defined as the current system time
+ * </ul>
+ *
+ * <p> If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the
+ * raw {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}
+ * that failed to be deserialized.
+ *
+ * @return the timestamp
+ */
+ long timestamp();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
index 77500ce3c36..fc44e9c95fb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements
ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
+
+ private final long timestamp;
private ProcessorContext processorContext;
public DefaultErrorHandlerContext(final ProcessorContext processorContext,
@@ -41,7 +43,8 @@ public class DefaultErrorHandlerContext implements
ErrorHandlerContext {
final long offset,
final Headers headers,
final String processorNodeId,
- final TaskId taskId) {
+ final TaskId taskId,
+ final long timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
@@ -49,6 +52,7 @@ public class DefaultErrorHandlerContext implements
ErrorHandlerContext {
this.processorNodeId = processorNodeId;
this.taskId = taskId;
this.processorContext = processorContext;
+ this.timestamp = timestamp;
}
@Override
@@ -81,6 +85,11 @@ public class DefaultErrorHandlerContext implements
ErrorHandlerContext {
return taskId;
}
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
@Override
public String toString() {
// we do exclude headers on purpose, to not accidentally log user data
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 5df1a1bd005..2b945f2da29 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -211,7 +211,8 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.currentNode().name(),
- internalProcessorContext.taskId());
+ internalProcessorContext.taskId(),
+ internalProcessorContext.timestamp());
final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
try {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index ced6fdef779..7525513ea22 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -308,7 +308,8 @@ public class RecordCollectorImpl implements RecordCollector
{
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
- taskId
+ taskId,
+ context.recordContext().timestamp()
);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic,
partition, timestamp, key, value, headers);
@@ -405,7 +406,8 @@ public class RecordCollectorImpl implements RecordCollector
{
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
- taskId
+ taskId,
+ context.recordContext().timestamp()
);
final ProductionExceptionHandlerResponse response;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 5fc03352ecc..aefa15da660 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -92,7 +92,8 @@ public class RecordDeserializer {
rawRecord.offset(),
rawRecord.headers(),
sourceNodeName,
- processorContext.taskId());
+ processorContext.taskId(),
+ rawRecord.timestamp());
final DeserializationHandlerResponse response;
try {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f08cfa7fd67..6a4e97e4707 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -946,7 +946,8 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
recordContext.offset(),
recordContext.headers(),
node.name(),
- id()
+ id(),
+ recordContext.timestamp()
);
final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index d0c32310550..d23b9ba0a6d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -61,6 +61,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ProcessingExceptionHandlerIntegrationTest {
private final String threadId = Thread.currentThread().getName();
+ private static final Instant TIMESTAMP = Instant.now();
+
@Test
public void
shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnsFail() {
final List<KeyValue<String, String>> events = Arrays.asList(
@@ -71,7 +73,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
);
final List<KeyValueTimestamp<String, String>> expectedProcessedRecords
= Collections.singletonList(
- new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0)
+ new KeyValueTimestamp<>("ID123-1", "ID123-A1",
TIMESTAMP.toEpochMilli())
);
final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
@@ -90,7 +92,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
final StreamsException exception =
assertThrows(StreamsException.class,
- () -> inputTopic.pipeKeyValueList(events, Instant.EPOCH,
Duration.ZERO));
+ () -> inputTopic.pipeKeyValueList(events, TIMESTAMP,
Duration.ZERO));
assertTrue(exception.getMessage().contains("Exception caught in
process. "
+ "taskId=0_0, processor=KSTREAM-SOURCE-0000000000,
topic=TOPIC_NAME, "
@@ -118,10 +120,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
);
final List<KeyValueTimestamp<String, String>> expectedProcessedRecords
= Arrays.asList(
- new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0),
- new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0),
- new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0),
- new KeyValueTimestamp<>("ID123-6", "ID123-A6", 0)
+ new KeyValueTimestamp<>("ID123-1", "ID123-A1",
TIMESTAMP.toEpochMilli()),
+ new KeyValueTimestamp<>("ID123-3", "ID123-A3",
TIMESTAMP.toEpochMilli()),
+ new KeyValueTimestamp<>("ID123-4", "ID123-A4",
TIMESTAMP.toEpochMilli()),
+ new KeyValueTimestamp<>("ID123-6", "ID123-A6",
TIMESTAMP.toEpochMilli())
);
final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
@@ -138,7 +140,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
- inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO);
+ inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
assertEquals(expectedProcessedRecords.size(),
processor.theCapturedProcessor().processed().size());
assertIterableEquals(expectedProcessedRecords,
processor.theCapturedProcessor().processed());
@@ -176,10 +178,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
isExecuted.set(false);
- inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
- final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+ final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, TIMESTAMP));
assertTrue(e.getMessage().contains("Exception caught in process. "
+ "taskId=0_0, processor=KSTREAM-SOURCE-0000000000,
topic=TOPIC_NAME, "
+ "partition=0, offset=1"));
@@ -212,10 +214,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
isExecuted.set(false);
- inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
- inputTopic.pipeInput(eventFalse.key, eventFalse.value,
Instant.EPOCH);
+ inputTopic.pipeInput(eventFalse.key, eventFalse.value, TIMESTAMP);
assertFalse(isExecuted.get());
}
}
@@ -245,7 +247,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
isExecuted.set(false);
- inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
@@ -281,7 +283,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
isExecuted.set(false);
- inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
@@ -328,6 +330,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String)
record.value()));
assertEquals("TOPIC_NAME", context.topic());
assertEquals("KSTREAM-PROCESSOR-0000000003",
context.processorNodeId());
+ assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
assertTrue(exception.getMessage().contains("Exception should be
handled by processing exception handler"));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 9fe9244e0aa..7f4e2d08491 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -356,6 +356,7 @@ public class ProcessorNodeTest {
assertEquals(internalProcessorContext.offset(), context.offset());
assertEquals(internalProcessorContext.currentNode().name(),
context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
+ assertEquals(internalProcessorContext.timestamp(),
context.timestamp());
assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
assertInstanceOf(RuntimeException.class, exception);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index c4479bbcc5a..9ed655332d0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -1892,6 +1892,7 @@ public class RecordCollectorTest {
assertEquals(expectedContext.recordContext().offset(),
context.offset());
assertEquals(expectedProcessorNodeId, context.processorNodeId());
assertEquals(expectedTaskId, context.taskId());
+ assertEquals(expectedContext.recordContext().timestamp(),
context.timestamp());
assertInstanceOf(RuntimeException.class, exception);
assertEquals("KABOOM!", exception.getMessage());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 1bca1c9e379..662af063c08 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -280,6 +280,7 @@ public class RecordDeserializerTest {
assertEquals(expectedRecord.offset(), context.offset());
assertEquals(expectedProcessorNodeId, context.processorNodeId());
assertEquals(expectedTaskId, context.taskId());
+ assertEquals(expectedRecord.timestamp(), context.timestamp());
assertEquals(expectedRecord, record);
assertInstanceOf(RuntimeException.class, exception);
assertEquals("KABOOM!", exception.getMessage());