This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
     new 14f2d22  KAFKA-12323: Set timestamp in record context when punctuate (#10170)
14f2d22 is described below

commit 14f2d22ef74d65ea8b2843ab63566e4b70a34d2f
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Feb 23 20:41:02 2021 -0800

    KAFKA-12323: Set timestamp in record context when punctuate (#10170)

    We need to preserve the timestamp when punctuating so that downstream
    operators can retain it via the record context.

    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java   | 29 +++++---
 .../processor/internals/StreamThreadTest.java     | 85 +++++++++++++++++++++-
 2 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 02f2d9f..a2e103c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -697,17 +698,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         log.trace("Start processing one record [{}]", record);
 
-        updateProcessorContext(
-            currNode,
-            wallClockTime,
-            new ProcessorRecordContext(
-                record.timestamp,
-                record.offset(),
-                record.partition(),
-                record.topic(),
-                record.headers()
-            )
+        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+            record.timestamp,
+            record.offset(),
+            record.partition(),
+            record.topic(),
+            record.headers()
         );
+        updateProcessorContext(currNode, wallClockTime, recordContext);
 
         maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
         final Record<Object, Object> toProcess = new Record<>(
@@ -801,7 +799,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(node, time.milliseconds(), null);
+        // when punctuating, we need to preserve the timestamp (this can be either system time or event time)
+        // while the other record context fields are set to dummy values: null topic, -1 partition, -1 offset, and empty headers
+        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+            timestamp,
+            -1L,
+            -1,
+            null,
+            new RecordHeaders()
+        );
+        updateProcessorContext(node, time.milliseconds(), recordContext);
 
         if (log.isTraceEnabled()) {
             log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
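
As an illustrative sketch of the behavior this change enables (not part of the commit; the class, key, and value below are hypothetical): a transformer schedules a wall-clock punctuator that forwards a record, and a downstream processor reading the context timestamp now observes the punctuation time rather than an undefined record context.

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    // Sketch only: emits a record from a wall-clock punctuator; with this fix,
    // the forwarded record carries the punctuation timestamp in its context.
    public class PunctuateForwarder implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
            // Fires once per second of wall-clock time; `timestamp` is the
            // punctuation time that downstream processors will now see.
            context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> this.context.forward("key", "value"));
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            return null; // this sketch emits only from the punctuator
        }

        @Override
        public void close() {}
    }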
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index bfc32d5..e2b1549 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -1805,7 +1806,7 @@ public class StreamThreadTest {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
         final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor =
-            () -> new org.apache.kafka.streams.processor.Processor<Object, Object>() {
+            () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
                 @Override
                 public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
                     context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
@@ -1814,9 +1815,6 @@ public class StreamThreadTest {
 
                 @Override
                 public void process(final Object key, final Object value) {}
-
-                @Override
-                public void close() {}
             };
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
@@ -1875,6 +1873,85 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
+        final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
+            () -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
+                @Override
+                public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+                    context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value"));
+                    context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value"));
+                }
+
+                @Override
+                public KeyValue<Object, Object> transform(final Object key, final Object value) {
+                    return null;
+                }
+
+                @Override
+                public void close() {}
+            };
+
+        final List<Long> peekedContextTime = new ArrayList<>();
+        final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
+            () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+                @Override
+                public void process(final Object key, final Object value) {
+                    peekedContextTime.add(context.timestamp());
+                }
+            };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+            .transform(punctuateProcessor)
+            .process(peekProcessor);
+        internalStreamsBuilder.buildAndOptimizeTopology();
+
+        final long currTime = mockTime.milliseconds();
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+        clientSupplier.consumer.assign(assignedPartitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+        thread.runOnce();
+        assertEquals(0, peekedContextTime.size());
+
+        mockTime.sleep(100L);
+        thread.runOnce();
+
+        assertEquals(1, peekedContextTime.size());
+        assertEquals(currTime + 100L, peekedContextTime.get(0).longValue());
+
+        clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+            topic1,
+            1,
+            0L,
+            100L,
+            TimestampType.CREATE_TIME,
+            ConsumerRecord.NULL_CHECKSUM,
+            "K".getBytes().length,
+            "V".getBytes().length,
+            "K".getBytes(),
+            "V".getBytes()));
+
+        thread.runOnce();
+
+        assertEquals(2, peekedContextTime.size());
+        assertEquals(0L, peekedContextTime.get(1).longValue());
+    }
+
+    @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
         final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
         ThreadMetadata metadata = thread.threadMetadata();
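
A hypothetical end-to-end check of the same behavior (not part of the commit; it assumes the PunctuateForwarder sketch above, the kafka-streams-test-utils artifact on the classpath, and made-up topic names) could drive the topology with TopologyTestDriver and advance its mock wall clock:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;

    public class PunctuateTimestampCheck {
        public static void main(final String[] args) {
            // Hypothetical topology: input -> PunctuateForwarder -> output.
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input")
                .transform(PunctuateForwarder::new)
                .to("output");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "punctuate-timestamp-check");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // never contacted by the test driver
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final TestOutputTopic<String, String> output =
                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

                // Advance the mock wall clock so the punctuator fires once.
                driver.advanceWallClockTime(Duration.ofSeconds(1));

                // With the fix, the forwarded record's timestamp is the punctuation time.
                System.out.println("forwarded record timestamp: " + output.readRecord().timestamp());
            }
        }
    }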