This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 74d55ca6393 KAFKA-16448: Add timestamp to error handler context 
(#17054)
74d55ca6393 is described below

commit 74d55ca6393ced994caf900a72e558c1535109fc
Author: Sebastien Viale <[email protected]>
AuthorDate: Thu Sep 5 17:42:52 2024 +0200

    KAFKA-16448: Add timestamp to error handler context (#17054)
    
    Part of KIP-1033.
    
    Co-authored-by: Dabz <[email protected]>
    Co-authored-by: loicgreffier <[email protected]>
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/errors/ErrorHandlerContext.java  | 33 ++++++++++++++++++++++
 .../internals/DefaultErrorHandlerContext.java      | 11 +++++++-
 .../streams/processor/internals/ProcessorNode.java |  3 +-
 .../processor/internals/RecordCollectorImpl.java   |  6 ++--
 .../processor/internals/RecordDeserializer.java    |  3 +-
 .../streams/processor/internals/StreamTask.java    |  3 +-
 .../ProcessingExceptionHandlerIntegrationTest.java | 29 ++++++++++---------
 .../processor/internals/ProcessorNodeTest.java     |  1 +
 .../processor/internals/RecordCollectorTest.java   |  1 +
 .../internals/RecordDeserializerTest.java          |  1 +
 10 files changed, 72 insertions(+), 19 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index 6c5e4f19596..af67c8f03a4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -18,8 +18,14 @@ package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+
 
 /**
  * This interface allows user code to inspect the context of a record that has 
failed processing.
@@ -102,4 +108,31 @@ public interface ErrorHandlerContext {
      * @return the task ID
      */
     TaskId taskId();
+
+    /**
+     * Return the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the 
source processor,
+     * timestamp is defined as the timestamp of the current input record; the 
timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} 
by {@link TimestampExtractor}.
+     * Note, that an upstream {@link Processor} might have set a new timestamp 
by calling
+     * {@link ProcessorContext#forward(Record) 
forward(record.withTimestamp(...))}.
+     * In particular, some Kafka Streams DSL operators set result record 
timestamps explicitly,
+     * to guarantee deterministic results.
+     *
+     * <p> If it is triggered while processing a record generated not from the 
source processor (for example,
+     * if this method is invoked from the punctuate call):
+     * <ul>
+     *   <li>In case of {@link PunctuationType#STREAM_TIME} timestamp is 
defined as the current task's stream time,
+     *   which is defined as the largest timestamp of any record processed by 
the task
+     *   <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is 
defined as the current system time
+     * </ul>
+     *
+     * <p> If it is triggered from a deserialization failure, timestamp is 
defined as the timestamp of the
+     * current raw
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
+     *
+     * @return the timestamp
+     */
+    long timestamp();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
index 77500ce3c36..fc44e9c95fb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
     private final Headers headers;
     private final String processorNodeId;
     private final TaskId taskId;
+
+    private final long timestamp;
     private ProcessorContext processorContext;
 
     public DefaultErrorHandlerContext(final ProcessorContext processorContext,
@@ -41,7 +43,8 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
                                       final long offset,
                                       final Headers headers,
                                       final String processorNodeId,
-                                      final TaskId taskId) {
+                                      final TaskId taskId,
+                                      final long timestamp) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
@@ -49,6 +52,7 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
         this.processorNodeId = processorNodeId;
         this.taskId = taskId;
         this.processorContext = processorContext;
+        this.timestamp = timestamp;
     }
 
     @Override
@@ -81,6 +85,11 @@ public class DefaultErrorHandlerContext implements 
ErrorHandlerContext {
         return taskId;
     }
 
+    @Override
+    public long timestamp() {
+        return timestamp;
+    }
+
     @Override
     public String toString() {
         // we do exclude headers on purpose, to not accidentally log user data
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 5df1a1bd005..2b945f2da29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -211,7 +211,8 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                 internalProcessorContext.offset(),
                 internalProcessorContext.headers(),
                 internalProcessorContext.currentNode().name(),
-                internalProcessorContext.taskId());
+                internalProcessorContext.taskId(),
+                internalProcessorContext.timestamp());
 
             final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
             try {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index ced6fdef779..7525513ea22 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -308,7 +308,8 @@ public class RecordCollectorImpl implements RecordCollector 
{
             context.recordContext().offset(),
             context.recordContext().headers(),
             processorNodeId,
-            taskId
+            taskId,
+            context.recordContext().timestamp()
         );
         final ProducerRecord<K, V> record = new ProducerRecord<>(topic, 
partition, timestamp, key, value, headers);
 
@@ -405,7 +406,8 @@ public class RecordCollectorImpl implements RecordCollector 
{
                     context.recordContext().offset(),
                     context.recordContext().headers(),
                     processorNodeId,
-                    taskId
+                    taskId,
+                    context.recordContext().timestamp()
                 );
 
                 final ProductionExceptionHandlerResponse response;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 5fc03352ecc..aefa15da660 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -92,7 +92,8 @@ public class RecordDeserializer {
             rawRecord.offset(),
             rawRecord.headers(),
             sourceNodeName,
-            processorContext.taskId());
+            processorContext.taskId(),
+            rawRecord.timestamp());
 
         final DeserializationHandlerResponse response;
         try {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f08cfa7fd67..6a4e97e4707 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -946,7 +946,8 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 recordContext.offset(),
                 recordContext.headers(),
                 node.name(),
-                id()
+                id(),
+                recordContext.timestamp()
             );
 
             final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index d0c32310550..d23b9ba0a6d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -61,6 +61,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ProcessingExceptionHandlerIntegrationTest {
     private final String threadId = Thread.currentThread().getName();
 
+    private static final Instant TIMESTAMP = Instant.now();
+
     @Test
     public void 
shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnsFail() {
         final List<KeyValue<String, String>> events = Arrays.asList(
@@ -71,7 +73,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
         );
 
         final List<KeyValueTimestamp<String, String>> expectedProcessedRecords 
= Collections.singletonList(
-            new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0)
+            new KeyValueTimestamp<>("ID123-1", "ID123-A1", 
TIMESTAMP.toEpochMilli())
         );
 
         final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
@@ -90,7 +92,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
             final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
 
             final StreamsException exception = 
assertThrows(StreamsException.class,
-                () -> inputTopic.pipeKeyValueList(events, Instant.EPOCH, 
Duration.ZERO));
+                () -> inputTopic.pipeKeyValueList(events, TIMESTAMP, 
Duration.ZERO));
 
             assertTrue(exception.getMessage().contains("Exception caught in 
process. "
                 + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=TOPIC_NAME, "
@@ -118,10 +120,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
         );
 
         final List<KeyValueTimestamp<String, String>> expectedProcessedRecords 
= Arrays.asList(
-            new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0),
-            new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0),
-            new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0),
-            new KeyValueTimestamp<>("ID123-6", "ID123-A6", 0)
+            new KeyValueTimestamp<>("ID123-1", "ID123-A1", 
TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-3", "ID123-A3", 
TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-4", "ID123-A4", 
TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-6", "ID123-A6", 
TIMESTAMP.toEpochMilli())
         );
 
         final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
@@ -138,7 +140,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
             final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
-            inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO);
+            inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
 
             assertEquals(expectedProcessedRecords.size(), 
processor.theCapturedProcessor().processed().size());
             assertIterableEquals(expectedProcessedRecords, 
processor.theCapturedProcessor().processed());
@@ -176,10 +178,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
             final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
             isExecuted.set(false);
-            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
             assertTrue(isExecuted.get());
             isExecuted.set(false);
-            final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+            final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, TIMESTAMP));
             assertTrue(e.getMessage().contains("Exception caught in process. "
                 + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=TOPIC_NAME, "
                 + "partition=0, offset=1"));
@@ -212,10 +214,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
             final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
             isExecuted.set(false);
-            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
             assertTrue(isExecuted.get());
             isExecuted.set(false);
-            inputTopic.pipeInput(eventFalse.key, eventFalse.value, 
Instant.EPOCH);
+            inputTopic.pipeInput(eventFalse.key, eventFalse.value, TIMESTAMP);
             assertFalse(isExecuted.get());
         }
     }
@@ -245,7 +247,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
             final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
             isExecuted.set(false);
-            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
             assertTrue(isExecuted.get());
             isExecuted.set(false);
             final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
@@ -281,7 +283,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
             final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
             isExecuted.set(false);
-            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
             assertTrue(isExecuted.get());
             isExecuted.set(false);
             final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
@@ -328,6 +330,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
         assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) 
record.value()));
         assertEquals("TOPIC_NAME", context.topic());
         assertEquals("KSTREAM-PROCESSOR-0000000003", 
context.processorNodeId());
+        assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
         assertTrue(exception.getMessage().contains("Exception should be 
handled by processing exception handler"));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 9fe9244e0aa..7f4e2d08491 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -356,6 +356,7 @@ public class ProcessorNodeTest {
             assertEquals(internalProcessorContext.offset(), context.offset());
             assertEquals(internalProcessorContext.currentNode().name(), 
context.processorNodeId());
             assertEquals(internalProcessorContext.taskId(), context.taskId());
+            assertEquals(internalProcessorContext.timestamp(), 
context.timestamp());
             assertEquals(KEY, record.key());
             assertEquals(VALUE, record.value());
             assertInstanceOf(RuntimeException.class, exception);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index c4479bbcc5a..9ed655332d0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -1892,6 +1892,7 @@ public class RecordCollectorTest {
             assertEquals(expectedContext.recordContext().offset(), 
context.offset());
             assertEquals(expectedProcessorNodeId, context.processorNodeId());
             assertEquals(expectedTaskId, context.taskId());
+            assertEquals(expectedContext.recordContext().timestamp(), 
context.timestamp());
             assertInstanceOf(RuntimeException.class, exception);
             assertEquals("KABOOM!", exception.getMessage());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 1bca1c9e379..662af063c08 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -280,6 +280,7 @@ public class RecordDeserializerTest {
             assertEquals(expectedRecord.offset(), context.offset());
             assertEquals(expectedProcessorNodeId, context.processorNodeId());
             assertEquals(expectedTaskId, context.taskId());
+            assertEquals(expectedRecord.timestamp(), context.timestamp());
             assertEquals(expectedRecord, record);
             assertInstanceOf(RuntimeException.class, exception);
             assertEquals("KABOOM!", exception.getMessage());

Reply via email to