This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8bdafb49eea KAFKA-7699: Anchored punctuation (#19937)
8bdafb49eea is described below

commit 8bdafb49eea06c87a5ab53eaf517758e506eb473
Author: Herman Kolstad Jakobsen <[email protected]>
AuthorDate: Mon Nov 17 19:21:11 2025 +0100

    KAFKA-7699: Anchored punctuation (#19937)
    
    This PR implements Processor API interfaces for using Anchored
    punctuation. As described in
    
    
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+punctuation,
    anchored punctuations allows the users of the PAPI to set both a start
    time and an interval for when the punctuations should be triggered. The
    changes have been based on all the places where
    
    `org.apache.kafka.streams.processor.internals.ProcessorContext.schedule()`
    is used.
    
    The changes consist of introducing new interfaces as the punctuation
    time logic in the
    `org.apache.kafka.streams.processor.internals.PunctuationSchedule`
    already supports calculating a new punctuation time based on a start
    time and an interval.
    
    The original `schedule()` implementations have been refactored into
    using the new `schedule()` implementation supporting the `startTime`
    parameter. For the original implementations, the `startTime` parameter
    has been sat to `null`, effectively using the "method overloading"
    programming technique.
    
    Reviewers: Bill Bejeck<[email protected]>
---
 .../kafka/streams/processor/ProcessorContext.java  |  41 ++++++
 .../streams/processor/api/ProcessingContext.java   |  41 ++++++
 .../ForwardingDisabledProcessorContext.java        |   9 ++
 .../internals/GlobalProcessorContextImpl.java      |   6 +
 .../processor/internals/ProcessorContextImpl.java  |  20 ++-
 .../processor/internals/PunctuationSchedule.java   |   2 +-
 .../streams/processor/internals/StreamTask.java    |  30 ++++-
 .../internals/AbstractProcessorContextTest.java    |   9 ++
 .../processor/internals/StreamTaskTest.java        | 146 +++++++++++++++++++++
 .../kafka/test/InternalMockProcessorContext.java   |   9 ++
 .../org/apache/kafka/test/MockApiProcessor.java    |  26 +++-
 .../java/org/apache/kafka/test/MockProcessor.java  |   5 +
 .../org/apache/kafka/test/MockProcessorNode.java   |   5 +
 .../apache/kafka/test/NoOpProcessorContext.java    |   9 ++
 .../streams/processor/MockProcessorContext.java    |  31 ++++-
 .../processor/api/MockProcessorContext.java        |  23 +++-
 .../kafka/streams/MockProcessorContextTest.java    |  40 ++++++
 17 files changed, 440 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 45ab411d62e..aed4497aa74 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Map;
 
 /**
@@ -157,6 +158,46 @@ public interface ProcessorContext {
                          final PunctuationType type,
                          final Punctuator callback);
 
+    /**
+     * Schedule a periodic operation for processors. A processor may call this 
method during a
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}'s
+     * {@link 
org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext) 
initialization} or
+     * {@link 
org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object, 
Object) processing} to
+     * schedule a periodic callback &mdash; called a punctuation &mdash; to 
{@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", 
which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link 
TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is 
processed.
+     *   <b>NOTE:</b> Only advances as messages arrive</li>
+     *   <li>{@link PunctuationType#WALL_CLOCK_TIME} &mdash; uses system time 
(the wall-clock time),
+     *   which advances independent of whether new messages arrive.
+     *   The first punctuation will be triggered after interval has elapsed.
+     *   <b>NOTE:</b> This is best effort only as its granularity is limited 
by how long an iteration of the
+     *   processing loop takes to complete</li>
+     * </ul>
+     *
+     * <b>Skipping punctuations:</b> Punctuations will not be triggered more 
than once at any given timestamp.
+     * This means that "missed" punctuation will be skipped.
+     * It's possible to "miss" a punctuation if:
+     * <ul>
+     *   <li>with {@link PunctuationType#STREAM_TIME}, when stream time 
advances more than interval</li>
+     *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too 
short interval, ...</li>
+     * </ul>
+     *
+     * @param startTime the time for the first punctuation.
+     *                  The subsequent trigger times are calculated using the 
{@code startTime} and the {@code interval}
+     * @param interval the time interval between punctuations (supported 
minimum is 1 millisecond)
+     * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link 
PunctuationType#WALL_CLOCK_TIME}
+     * @param callback a function consuming timestamps representing the 
current stream or system time
+     * @return a handle allowing cancellation of the punctuation schedule 
established by this method
+     * @throws IllegalArgumentException if the interval is not representable 
in milliseconds
+     */
+    Cancellable schedule(final Instant startTime,
+                         final Duration interval,
+                         final PunctuationType type,
+                         final Punctuator callback);
+
     /**
      * Forward a key/value pair to all downstream processors.
      * Used the input record's timestamp as timestamp for the output record.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
index 7f6874c8c19..2030db9ae95 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
 
@@ -159,6 +160,46 @@ public interface ProcessingContext {
                          final PunctuationType type,
                          final Punctuator callback);
 
+    /**
+     * Schedule a periodic operation for processors. A processor may call this 
method during
+     * {@link Processor#init(ProcessorContext) initialization},
+     * {@link Processor#process(Record) processing},
+     * {@link FixedKeyProcessor#init(FixedKeyProcessorContext) 
initialization}, or
+     * {@link FixedKeyProcessor#process(FixedKeyRecord) processing} to
+     * schedule a periodic callback &mdash; called a punctuation &mdash; to 
{@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", 
which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link 
TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is 
processed.
+     *   <b>NOTE:</b> Only advances as messages arrive</li>
+     *   <li>{@link PunctuationType#WALL_CLOCK_TIME} &mdash; uses system time 
(the wall-clock time),
+     *   which advances independent of whether new messages arrive.
+     *   The first punctuation will be triggered after interval has elapsed.
+     *   <b>NOTE:</b> This is best effort only as its granularity is limited 
by how long an iteration of the
+     *   processing loop takes to complete</li>
+     * </ul>
+     *
+     * <b>Skipping punctuations:</b> Punctuations will not be triggered more 
than once at any given timestamp.
+     * This means that "missed" punctuation will be skipped.
+     * It's possible to "miss" a punctuation if:
+     * <ul>
+     *   <li>with {@link PunctuationType#STREAM_TIME}, when stream time 
advances more than interval</li>
+     *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too 
short interval, ...</li>
+     * </ul>
+     * @param startTime the time for the first punctuation. The subsequent 
trigger times are calculated
+     *                  using the {@code startTime} and the {@code interval}
+     * @param interval the time interval between punctuations (supported 
minimum is 1 millisecond)
+     * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link 
PunctuationType#WALL_CLOCK_TIME}
+     * @param callback a function consuming timestamps representing the 
current stream or system time
+     * @return a handle allowing cancellation of the punctuation schedule 
established by this method
+     * @throws IllegalArgumentException if the interval is not representable 
in milliseconds
+     */
+    Cancellable schedule(final Instant startTime,
+                         final Duration interval,
+                         final PunctuationType type,
+                         final Punctuator callback);
+
     /**
      * Request a commit. Note that calling {@code commit()} is only a request 
for a commit, but it does not execute one.
      * Hence, when {@code commit()} returns, no commit was executed yet. 
However, Kafka Streams will commit as soon
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 5091074d70b..679699b556f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Map;
 import java.util.Objects;
 
@@ -96,6 +97,14 @@ public final class ForwardingDisabledProcessorContext 
implements ProcessorContex
         return delegate.schedule(interval, type, callback);
     }
 
+    @Override
+    public Cancellable schedule(final Instant startTime,
+                                final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        return delegate.schedule(startTime, interval, type, callback);
+    }
+
     @Override
     public <K, V> void forward(final K key, final V value) {
         throw new StreamsException(EXPLANATION);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 01b694863fd..4456b6b5814 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
+import java.time.Instant;
 
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
 
@@ -133,6 +134,11 @@ public class GlobalProcessorContextImpl extends 
AbstractProcessorContext<Object,
         throw new UnsupportedOperationException("this should not happen: 
schedule() not supported in global processor context.");
     }
 
+    @Override
+    public Cancellable schedule(final Instant startTime, final Duration 
interval, final PunctuationType type, final Punctuator callback) {
+        throw new UnsupportedOperationException("this should not happen: 
schedule() not supported in global processor context.");
+    }
+
     @Override
     public void logChange(final String storeName,
                           final Bytes key,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 93961daf97b..26f724b75f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -313,7 +314,24 @@ public final class ProcessorContextImpl extends 
AbstractProcessorContext<Object,
         if (intervalMs < 1) {
             throw new IllegalArgumentException("The minimum supported 
scheduling interval is 1 millisecond.");
         }
-        return streamTask.schedule(intervalMs, type, callback);
+        return streamTask.schedule(intervalMs, type, callback);    }
+
+    @Override
+    public Cancellable schedule(
+            final Instant startTime,
+            final Duration interval,
+            final PunctuationType type,
+            final Punctuator callback) throws IllegalArgumentException {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, 
"interval");
+        final long intervalMs = validateMillisecondDuration(interval, 
msgPrefix);
+        if (intervalMs < 1) {
+            throw new IllegalArgumentException("The minimum supported 
scheduling interval is 1 millisecond.");
+        }
+        if (startTime.isBefore(Instant.EPOCH)) {
+            throw new IllegalArgumentException("The minimum supported start 
time is Instant.EPOCH.");
+        }
+        return streamTask.schedule(startTime, intervalMs, type, callback);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index 6cbc0b20e08..aa703be4658 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -69,7 +69,7 @@ public class PunctuationSchedule extends 
Stamped<ProcessorNode<?, ?, ?, ?>> {
     public PunctuationSchedule next(final long currTimestamp) {
         long nextPunctuationTime = timestamp + interval;
         if (currTimestamp >= nextPunctuationTime) {
-            // we missed one ore more punctuations
+            // we missed one or more punctuations
             // avoid scheduling a new punctuations immediately, this can 
happen:
             // - when using STREAM_TIME punctuation and there was a gap i.e., 
no data was
             //   received for at least 2*interval
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4ab672fd0c0..bea998c4711 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -53,6 +53,7 @@ import 
org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1185,15 +1186,27 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         switch (type) {
             case STREAM_TIME:
                 // align punctuation to 0L, punctuate as soon as we have data
-                return schedule(0L, interval, type, punctuator);
+                return schedule(0L, interval, type, punctuator, false);
             case WALL_CLOCK_TIME:
                 // align punctuation to now, punctuate after interval has 
elapsed
-                return schedule(time.milliseconds() + interval, interval, 
type, punctuator);
+                return schedule(time.milliseconds() + interval, interval, 
type, punctuator, false);
             default:
                 throw new IllegalArgumentException("Unrecognized 
PunctuationType: " + type);
         }
     }
 
+    /**
+     * Schedules a punctuation for the processor
+     *
+     * @param startTime time of the first punctuation
+     * @param interval the interval in milliseconds
+     * @param type     the punctuation type
+     * @throws IllegalStateException if the current node is not null
+     */
+    public Cancellable schedule(final Instant startTime, final long interval, 
final PunctuationType type, final Punctuator punctuator) {
+        return schedule(startTime.toEpochMilli(), interval, type, punctuator, 
true);
+    }
+
     /**
      * Schedules a punctuation for the processor
      *
@@ -1202,12 +1215,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
      * @param type      the punctuation type
      * @throws IllegalStateException if the current node is not null
      */
-    private Cancellable schedule(final long startTime, final long interval, 
final PunctuationType type, final Punctuator punctuator) {
+    private Cancellable schedule(final long startTime, final long interval, 
final PunctuationType type, final Punctuator punctuator, final boolean 
anchored) {
         if (processorContext.currentNode() == null) {
             throw new IllegalStateException(String.format("%sCurrent node is 
null", logPrefix));
         }
 
-        final PunctuationSchedule schedule = new 
PunctuationSchedule(processorContext.currentNode(), startTime, interval, 
punctuator);
+        final PunctuationSchedule schedule = getInitialSchedule(startTime, 
interval, type, punctuator, anchored);
 
         switch (type) {
             case STREAM_TIME:
@@ -1222,6 +1235,15 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
         }
     }
 
+    // For anchored schedule, we want to have all punctuations only fire on 
times based on combinations of startTime and interval
+    // This method ensures that the first anchored punctuation is not fired 
prematurely due to startTime < now
+    private PunctuationSchedule getInitialSchedule(final long startTime, final 
long interval, final PunctuationType type, final Punctuator punctuator, final 
boolean anchored) {
+        final PunctuationSchedule originalSchedule = new 
PunctuationSchedule(processorContext.currentNode(), startTime, interval, 
punctuator);
+        final long now = (type == PunctuationType.WALL_CLOCK_TIME) ? 
time.milliseconds() : streamTime();
+
+        return (anchored && startTime < now) ? originalSchedule.next(now) : 
originalSchedule;
+    }
+
     /**
      * Possibly trigger registered stream-time punctuation functions if
      * current partition group timestamp has reached the defined stamp
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 26b8d36f3b0..8c0a2717d32 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -44,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
@@ -221,6 +222,14 @@ public class AbstractProcessorContextTest {
             return null;
         }
 
+        @Override
+        public Cancellable schedule(final Instant startTime,
+                                    final Duration interval,
+                                    final PunctuationType type,
+                                    final Punctuator callback) {
+            return null;
+        }
+
         @Override
         public <K, V> void forward(final Record<K, V> record) {}
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a142ebb9d95..9b8a3dc3335 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -95,6 +95,7 @@ import org.mockito.quality.Strictness;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -190,6 +191,8 @@ public class StreamTaskTest {
     private final MockProcessorNode<Integer, Integer, ?, ?> 
processorStreamTime = new MockProcessorNode<>(10L);
     private final MockProcessorNode<Integer, Integer, ?, ?> 
processorSystemTime = new MockProcessorNode<>(10L, 
PunctuationType.WALL_CLOCK_TIME);
 
+    private final MockProcessorNode<Integer, Integer, ?, ?> 
anchoredProcessorStreamTime = new MockProcessorNode<>(Instant.ofEpochMilli(15), 
10L, PunctuationType.STREAM_TIME);
+
     private final String storeName = "store";
     private final MockKeyValueStore stateStore = new 
MockKeyValueStore(storeName, false);
     private final TopicPartition changelogPartition = new 
TopicPartition("store-changelog", 1);
@@ -1267,6 +1270,70 @@ public class StreamTaskTest {
         
processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 20L, 142L, 155L, 160L);
     }
 
+    @Test
+    public void shouldPunctuateUsingAnchoredStreamStartTime() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        final MockProcessorNode<Integer, Integer, ?, ?> 
anchoredProcessorSystemTime = new MockProcessorNode<>(Instant.ofEpochMilli(15), 
10L, PunctuationType.WALL_CLOCK_TIME);  // Dummy
+        task = createStatelessTaskWithAnchoredPunctuation(createConfig(), 
anchoredProcessorSystemTime);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+
+        task.addRecords(partition1, asList(
+                getConsumerRecordWithOffsetAsTimestamp(partition1, 14),
+                getConsumerRecordWithOffsetAsTimestamp(partition1, 24)
+        ));
+
+        task.addRecords(partition2, asList(
+                getConsumerRecordWithOffsetAsTimestamp(partition2, 15),
+                getConsumerRecordWithOffsetAsTimestamp(partition2, 25)
+        ));
+
+        task.updateLags();
+
+        // st: -1
+        assertFalse(task.canPunctuateStreamTime());
+        assertFalse(task.maybePunctuateStreamTime()); // punctuate at 15
+
+        // st: 14
+        assertTrue(task.process(0L));
+        assertEquals(3, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+        assertFalse(task.canPunctuateStreamTime());
+        assertFalse(task.maybePunctuateStreamTime());
+
+        // st: 15
+        // punctuate at 15 due to startTime
+        assertTrue(task.process(0L));
+        assertEquals(2, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+        assertTrue(task.canPunctuateStreamTime());
+        assertTrue(task.maybePunctuateStreamTime());
+
+        // st: 24
+        assertTrue(task.process(0L));
+        assertEquals(1, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+        assertFalse(task.canPunctuateStreamTime());
+        assertFalse(task.maybePunctuateStreamTime());
+
+        // st: 25
+        // punctuate at 25 due to startTime + interval
+        assertTrue(task.process(0L));
+        assertEquals(0, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+        assertTrue(task.canPunctuateStreamTime());
+        assertTrue(task.maybePunctuateStreamTime());
+
+        
anchoredProcessorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 15L, 25L);
+    }
+
     @Test
     public void shouldRespectPunctuateCancellationStreamTime() {
         when(stateManager.taskId()).thenReturn(taskId);
@@ -1745,6 +1812,36 @@ public class StreamTaskTest {
         
processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
     }
 
+    @Test
+    public void 
shouldPunctuateUsingAnchoredSystemStartTimeWithStartTimeBeforeNow() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+
+        final long now = time.milliseconds();
+        final long testStartTime = now + (10L - (now % 10L));   // Used to 
make test deterministic
+        time.setCurrentTimeMs(testStartTime);
+        final MockProcessorNode<Integer, Integer, ?, ?> 
anchoredProcessorSystemTime = new 
MockProcessorNode<>(Instant.ofEpochMilli(testStartTime - 10), 10L, 
PunctuationType.WALL_CLOCK_TIME);
+        task = createStatelessTaskWithAnchoredPunctuation(createConfig("100"), 
anchoredProcessorSystemTime);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        assertFalse(task.canPunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+
+        // Expects the punctuations to happen at a time satisfying startTime + 
n * interval
+        // where n is a positive number or 0
+        time.sleep(9);
+        assertFalse(task.canPunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1);
+        assertTrue(task.canPunctuateSystemTime());
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(10);
+        assertTrue(task.canPunctuateSystemTime());
+        assertTrue(task.maybePunctuateSystemTime());
+        
anchoredProcessorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
  testStartTime + 10, testStartTime + 20);
+    }
+
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
 {
         when(stateManager.taskId()).thenReturn(taskId);
@@ -2016,6 +2113,15 @@ public class StreamTaskTest {
         task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { });
     }
 
+    @Test
+    public void shouldNotThrowExceptionOnAnchoredSchedule() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig("100"));
+        task.processorContext().setCurrentNode(processorStreamTime);
+        task.schedule(Instant.ofEpochMilli(1000), 1,  
PunctuationType.STREAM_TIME, timestamp -> { });
+    }
+
     @Test
     public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
         when(stateManager.taskId()).thenReturn(taskId);
@@ -3352,6 +3458,46 @@ public class StreamTaskTest {
         );
     }
 
+    private StreamTask createStatelessTaskWithAnchoredPunctuation(
+            final StreamsConfig config,
+            final MockProcessorNode<Integer, Integer, ?, ?>  
anchoredProcessorSystemTime
+    ) {
+        final ProcessorTopology topology = withSources(
+                asList(source1, source2, anchoredProcessorStreamTime, 
anchoredProcessorSystemTime),
+                mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
+        );
+
+        source1.addChild(anchoredProcessorStreamTime);
+        source2.addChild(anchoredProcessorStreamTime);
+        source1.addChild(anchoredProcessorSystemTime);
+        source2.addChild(anchoredProcessorSystemTime);
+
+        final InternalProcessorContext<?, ?> context = new 
ProcessorContextImpl(
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
+        );
+
+        return new StreamTask(
+                taskId,
+                partitions,
+                topology,
+                consumer,
+                new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
+                new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", time),
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext,
+                false
+        );
+    }
+
     private StreamTask createStatelessTaskWithForwardingTopology(final 
SourceNode<Integer, Integer> sourceNode) {
         final ProcessorTopology topology = withSources(
             asList(sourceNode, processorStreamTime),
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 1361df9a09e..debe7066c2f 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -58,6 +58,7 @@ import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -324,6 +325,14 @@ public class InternalMockProcessorContext<KOut, VOut>
         throw new UnsupportedOperationException("schedule() not supported.");
     }
 
+    @Override
+    public Cancellable schedule(final Instant startTime,
+                                final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) throws 
IllegalArgumentException {
+        throw new UnsupportedOperationException("schedule() not supported.");
+    }
+
     @Override
     public void commit() {}
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java 
b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index fed0d625982..573b9b9f912 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -46,6 +47,7 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> 
implements Processor<KIn, VI
     private Cancellable scheduleCancellable;
 
     private final PunctuationType punctuationType;
+    private final Instant startTime;
     private final long scheduleInterval;
 
     private boolean commitRequested = false;
@@ -54,6 +56,15 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> 
implements Processor<KIn, VI
     public MockApiProcessor(final PunctuationType punctuationType,
                             final long scheduleInterval) {
         this.punctuationType = punctuationType;
+        this.startTime = null;  // unanchored schedule so start time is 
undefined
+        this.scheduleInterval = scheduleInterval;
+    }
+
+    public MockApiProcessor(final PunctuationType punctuationType,
+                            final Instant startTime,
+                            final long scheduleInterval) {
+        this.punctuationType = punctuationType;
+        this.startTime = startTime;
         this.scheduleInterval = scheduleInterval;
     }
 
@@ -65,10 +76,17 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> 
implements Processor<KIn, VI
     public void init(final ProcessorContext<KOut, VOut> context) {
         this.context = context;
         if (scheduleInterval > 0L) {
-            scheduleCancellable = context.schedule(
-                Duration.ofMillis(scheduleInterval),
-                punctuationType,
-                (punctuationType == PunctuationType.STREAM_TIME ? 
punctuatedStreamTime : punctuatedSystemTime)::add
+            scheduleCancellable = (startTime == null)
+                    ? context.schedule(
+                    Duration.ofMillis(scheduleInterval),
+                    punctuationType,
+                    (punctuationType == PunctuationType.STREAM_TIME ? 
punctuatedStreamTime : punctuatedSystemTime)::add
+            )
+                    : context.schedule(
+                    startTime,
+                    Duration.ofMillis(scheduleInterval),
+                    punctuationType,
+                    (punctuationType == PunctuationType.STREAM_TIME ? 
punctuatedStreamTime : punctuatedSystemTime)::add
             );
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java 
b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 8c1812ccf14..a4832d4471f 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,10 @@ public class MockProcessor<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn,
         delegate = new MockApiProcessor<>(punctuationType, scheduleInterval);
     }
 
+    public MockProcessor(final PunctuationType punctuationType, final Instant 
startTime, final long scheduleInterval) {
+        delegate = new MockApiProcessor<>(punctuationType, startTime, 
scheduleInterval);
+    }
+
     public MockProcessor() {
         delegate = new MockApiProcessor<>();
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java 
b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 28b8f26bc93..dd1187ba129 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
+import java.time.Instant;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,6 +43,10 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends 
ProcessorNode<KIn,
         this(new MockProcessor<>(punctuationType, scheduleInterval));
     }
 
+    public MockProcessorNode(final Instant startTime, final long 
scheduleInterval, final PunctuationType punctuationType) {
+        this(new MockProcessor<>(punctuationType, startTime, 
scheduleInterval));
+    }
+
     public MockProcessorNode() {
         this(new MockProcessor<>());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 734f744f20f..38059cb93d7 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -79,6 +80,14 @@ public class NoOpProcessorContext extends 
AbstractProcessorContext<Object, Objec
         return null;
     }
 
+    @Override
+    public Cancellable schedule(final Instant startTime,
+                                final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        return null;
+    }
+
     @Override
     public <K, V> void forward(final Record<K, V> record) {
         forwardedValues.put(record.key(), record.value());
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 08cf542305c..4cd29b5bc64 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -80,17 +81,31 @@ public class MockProcessorContext implements 
ProcessorContext, RecordCollector.S
      * {@link CapturedPunctuator} holds captured punctuators, along with their 
scheduling information.
      */
     public static class CapturedPunctuator {
+        private final Instant startTime;
         private final long intervalMs;
         private final PunctuationType type;
         private final Punctuator punctuator;
         private boolean cancelled = false;
 
         private CapturedPunctuator(final long intervalMs, final 
PunctuationType type, final Punctuator punctuator) {
+            this.startTime = null;  // unanchored punctuator so start time is 
undefined
+            this.intervalMs = intervalMs;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        private CapturedPunctuator(final Instant startTime, final long 
intervalMs, final PunctuationType type, final Punctuator punctuator) {
+            this.startTime = startTime;
             this.intervalMs = intervalMs;
             this.type = type;
             this.punctuator = punctuator;
         }
 
+        @SuppressWarnings({"WeakerAccess", "unused"})
+        public Instant getStartTime() {
+            return startTime;
+        }
+
         @SuppressWarnings({"WeakerAccess", "unused"})
         public long getIntervalMs() {
             return intervalMs;
@@ -463,9 +478,23 @@ public class MockProcessorContext implements 
ProcessorContext, RecordCollector.S
             throw new IllegalArgumentException("The minimum supported 
scheduling interval is 1 millisecond.");
         }
         final CapturedPunctuator capturedPunctuator = new 
CapturedPunctuator(intervalMs, type, callback);
-
         punctuators.add(capturedPunctuator);
+        return capturedPunctuator::cancel;    }
 
+    @Override
+    public Cancellable schedule(final Instant startTime,
+                                final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) throws 
IllegalArgumentException {
+        final long intervalMs = ApiUtils.validateMillisecondDuration(interval, 
"interval");
+        if (intervalMs < 1) {
+            throw new IllegalArgumentException("The minimum supported 
scheduling interval is 1 millisecond.");
+        }
+        if (startTime.isBefore(Instant.EPOCH)) {
+            throw new IllegalArgumentException("The minimum supported start 
time is Instant.EPOCH.");
+        }
+        final CapturedPunctuator capturedPunctuator = new 
CapturedPunctuator(startTime, intervalMs, type, callback);
+        punctuators.add(capturedPunctuator);
         return capturedPunctuator::cancel;
     }
 
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 25f728cd567..48fad67f0f4 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -114,17 +115,30 @@ public class MockProcessorContext<KForward, VForward> 
implements ProcessorContex
      * {@link CapturedPunctuator} holds captured punctuators, along with their 
scheduling information.
      */
     public static final class CapturedPunctuator {
+        private final Instant startTime;
         private final Duration interval;
         private final PunctuationType type;
         private final Punctuator punctuator;
         private boolean cancelled = false;
 
         private CapturedPunctuator(final Duration interval, final 
PunctuationType type, final Punctuator punctuator) {
+            this.startTime = null;  // unanchored punctuator so start time is 
undefined
             this.interval = interval;
             this.type = type;
             this.punctuator = punctuator;
         }
 
+        private CapturedPunctuator(final Instant startTime, final Duration 
interval, final PunctuationType type, final Punctuator punctuator) {
+            this.startTime = startTime;
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Instant getStartTime() {
+            return startTime;
+        }
+
         public Duration getInterval() {
             return interval;
         }
@@ -373,9 +387,16 @@ public class MockProcessorContext<KForward, VForward> 
implements ProcessorContex
                                 final PunctuationType type,
                                 final Punctuator callback) {
         final CapturedPunctuator capturedPunctuator = new 
CapturedPunctuator(interval, type, callback);
-
         punctuators.add(capturedPunctuator);
+        return capturedPunctuator::cancel;    }
 
+    @Override
+    public Cancellable schedule(final Instant startTime,
+                                final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new 
CapturedPunctuator(startTime, interval, type, callback);
+        punctuators.add(capturedPunctuator);
         return capturedPunctuator::cancel;
     }
 
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index c760e5dd1d6..cacedfafbf7 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -445,6 +446,45 @@ public class MockProcessorContextTest {
         assertTrue(context.committed());
     }
 
+    @Test
+    public void shouldCaptureAnchoredPunctuator() {
+        final Transformer<String, Long, KeyValue<String, Long>> transformer = 
new Transformer<>() {
+            @Override
+            public void init(final ProcessorContext context) {
+                context.schedule(
+                        Instant.ofEpochMilli(1000),
+                        Duration.ofSeconds(1L),
+                        PunctuationType.WALL_CLOCK_TIME,
+                        timestamp -> context.commit()
+                );
+            }
+
+            @Override
+            public KeyValue<String, Long> transform(final String key, final 
Long value) {
+                return null;
+            }
+
+            @Override
+            public void close() { }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        transformer.init(context);
+
+        final MockProcessorContext.CapturedPunctuator capturedPunctuator = 
context.scheduledPunctuators().get(0);
+        assertEquals(Instant.ofEpochMilli(1000), 
capturedPunctuator.getStartTime());
+        assertEquals(1000L, capturedPunctuator.getIntervalMs());
+        assertEquals(PunctuationType.WALL_CLOCK_TIME, 
capturedPunctuator.getType());
+        assertFalse(capturedPunctuator.cancelled());
+
+        final Punctuator punctuator = capturedPunctuator.getPunctuator();
+        assertFalse(context.committed());
+
+        punctuator.punctuate(1234L);
+        assertTrue(context.committed());
+    }
+
     @SuppressWarnings("resource")
     @Test
     public void fullConstructorShouldSetAllExpectedAttributes() {


Reply via email to