This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8bdafb49eea KAFKA-7699: Anchored punctuation (#19937)
8bdafb49eea is described below
commit 8bdafb49eea06c87a5ab53eaf517758e506eb473
Author: Herman Kolstad Jakobsen <[email protected]>
AuthorDate: Mon Nov 17 19:21:11 2025 +0100
KAFKA-7699: Anchored punctuation (#19937)
This PR implements Processor API interfaces for using anchored
punctuation. As described in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+punctuation,
anchored punctuation allows users of the PAPI to set both a start
time and an interval that determine when punctuations are triggered. The
changes cover all the places where
`org.apache.kafka.streams.processor.ProcessorContext.schedule()`
is used.
The changes consist of introducing new interfaces, since the punctuation
time logic in the
`org.apache.kafka.streams.processor.internals.PunctuationSchedule`
already supports calculating a new punctuation time based on a start
time and an interval.
The original `schedule()` implementations have been refactored to use
the new `schedule()` implementation supporting the `startTime`
parameter. For the original implementations, the `startTime` parameter
has been set to `null`, effectively using the "method overloading"
programming technique.
Reviewers: Bill Bejeck<[email protected]>
---
.../kafka/streams/processor/ProcessorContext.java | 41 ++++++
.../streams/processor/api/ProcessingContext.java | 41 ++++++
.../ForwardingDisabledProcessorContext.java | 9 ++
.../internals/GlobalProcessorContextImpl.java | 6 +
.../processor/internals/ProcessorContextImpl.java | 20 ++-
.../processor/internals/PunctuationSchedule.java | 2 +-
.../streams/processor/internals/StreamTask.java | 30 ++++-
.../internals/AbstractProcessorContextTest.java | 9 ++
.../processor/internals/StreamTaskTest.java | 146 +++++++++++++++++++++
.../kafka/test/InternalMockProcessorContext.java | 9 ++
.../org/apache/kafka/test/MockApiProcessor.java | 26 +++-
.../java/org/apache/kafka/test/MockProcessor.java | 5 +
.../org/apache/kafka/test/MockProcessorNode.java | 5 +
.../apache/kafka/test/NoOpProcessorContext.java | 9 ++
.../streams/processor/MockProcessorContext.java | 31 ++++-
.../processor/api/MockProcessorContext.java | 23 +++-
.../kafka/streams/MockProcessorContextTest.java | 40 ++++++
17 files changed, 440 insertions(+), 12 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 45ab411d62e..aed4497aa74 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.Map;
/**
@@ -157,6 +158,46 @@ public interface ProcessorContext {
final PunctuationType type,
final Punctuator callback);
+ /**
+ * Schedule a periodic operation for processors. A processor may call this
method during a
+ * {@link
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
String...)}'s
+ * {@link
org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext)
initialization} or
+ * {@link
org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object,
Object) processing} to
+ * schedule a periodic callback — called a punctuation — to
{@link Punctuator#punctuate(long)}.
+ * The type parameter controls what notion of time is used for punctuation:
+ * <ul>
+ * <li>{@link PunctuationType#STREAM_TIME} — uses "stream time",
which is advanced by the processing of messages
+ * in accordance with the timestamp as extracted by the {@link
TimestampExtractor} in use.
+ * The first punctuation will be triggered by the first record that is
processed.
+ * <b>NOTE:</b> Only advances as messages arrive</li>
+ * <li>{@link PunctuationType#WALL_CLOCK_TIME} — uses system time
(the wall-clock time),
+ * which advances independent of whether new messages arrive.
+ * The first punctuation will be triggered after interval has elapsed.
+ * <b>NOTE:</b> This is best effort only as its granularity is limited
by how long an iteration of the
+ * processing loop takes to complete</li>
+ * </ul>
+ *
+ * <b>Skipping punctuations:</b> Punctuations will not be triggered more
than once at any given timestamp.
+ * This means that "missed" punctuation will be skipped.
+ * It's possible to "miss" a punctuation if:
+ * <ul>
+ * <li>with {@link PunctuationType#STREAM_TIME}, when stream time
advances more than interval</li>
+ * <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too
short interval, ...</li>
+ * </ul>
+ *
+ * @param startTime the time for the first punctuation.
+ * The subsequent trigger times are calculated using the
{@code startTime} and the {@code interval}
+ * @param interval the time interval between punctuations (supported
minimum is 1 millisecond)
+ * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link
PunctuationType#WALL_CLOCK_TIME}
+ * @param callback a function consuming timestamps representing the
current stream or system time
+ * @return a handle allowing cancellation of the punctuation schedule
established by this method
+ * @throws IllegalArgumentException if the interval is not representable
in milliseconds
+ */
+ Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback);
+
/**
* Forward a key/value pair to all downstream processors.
* Used the input record's timestamp as timestamp for the output record.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
index 7f6874c8c19..2030db9ae95 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@@ -159,6 +160,46 @@ public interface ProcessingContext {
final PunctuationType type,
final Punctuator callback);
+ /**
+ * Schedule a periodic operation for processors. A processor may call this
method during
+ * {@link Processor#init(ProcessorContext) initialization},
+ * {@link Processor#process(Record) processing},
+ * {@link FixedKeyProcessor#init(FixedKeyProcessorContext)
initialization}, or
+ * {@link FixedKeyProcessor#process(FixedKeyRecord) processing} to
+ * schedule a periodic callback — called a punctuation — to
{@link Punctuator#punctuate(long)}.
+ * The type parameter controls what notion of time is used for punctuation:
+ * <ul>
+ * <li>{@link PunctuationType#STREAM_TIME} — uses "stream time",
which is advanced by the processing of messages
+ * in accordance with the timestamp as extracted by the {@link
TimestampExtractor} in use.
+ * The first punctuation will be triggered by the first record that is
processed.
+ * <b>NOTE:</b> Only advances as messages arrive</li>
+ * <li>{@link PunctuationType#WALL_CLOCK_TIME} — uses system time
(the wall-clock time),
+ * which advances independent of whether new messages arrive.
+ * The first punctuation will be triggered after interval has elapsed.
+ * <b>NOTE:</b> This is best effort only as its granularity is limited
by how long an iteration of the
+ * processing loop takes to complete</li>
+ * </ul>
+ *
+ * <b>Skipping punctuations:</b> Punctuations will not be triggered more
than once at any given timestamp.
+ * This means that "missed" punctuation will be skipped.
+ * It's possible to "miss" a punctuation if:
+ * <ul>
+ * <li>with {@link PunctuationType#STREAM_TIME}, when stream time
advances more than interval</li>
+ * <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too
short interval, ...</li>
+ * </ul>
+ * @param startTime the time for the first punctuation. The subsequent
trigger times are calculated
+ * using the {@code startTime} and the {@code interval}
+ * @param interval the time interval between punctuations (supported
minimum is 1 millisecond)
+ * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link
PunctuationType#WALL_CLOCK_TIME}
+ * @param callback a function consuming timestamps representing the
current stream or system time
+ * @return a handle allowing cancellation of the punctuation schedule
established by this method
+ * @throws IllegalArgumentException if the interval is not representable
in milliseconds
+ */
+ Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback);
+
/**
* Request a commit. Note that calling {@code commit()} is only a request
for a commit, but it does not execute one.
* Hence, when {@code commit()} returns, no commit was executed yet.
However, Kafka Streams will commit as soon
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 5091074d70b..679699b556f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.To;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.Map;
import java.util.Objects;
@@ -96,6 +97,14 @@ public final class ForwardingDisabledProcessorContext
implements ProcessorContex
return delegate.schedule(interval, type, callback);
}
+ @Override
+ public Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) {
+ return delegate.schedule(startTime, interval, type, callback);
+ }
+
@Override
public <K, V> void forward(final K key, final V value) {
throw new StreamsException(EXPLANATION);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 01b694863fd..4456b6b5814 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.time.Duration;
+import java.time.Instant;
import static
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
@@ -133,6 +134,11 @@ public class GlobalProcessorContextImpl extends
AbstractProcessorContext<Object,
throw new UnsupportedOperationException("this should not happen:
schedule() not supported in global processor context.");
}
+ @Override
+ public Cancellable schedule(final Instant startTime, final Duration
interval, final PunctuationType type, final Punctuator callback) {
+ throw new UnsupportedOperationException("this should not happen:
schedule() not supported in global processor context.");
+ }
+
@Override
public void logChange(final String storeName,
final Bytes key,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 93961daf97b..26f724b75f4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -313,7 +314,24 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
if (intervalMs < 1) {
throw new IllegalArgumentException("The minimum supported
scheduling interval is 1 millisecond.");
}
- return streamTask.schedule(intervalMs, type, callback);
+ return streamTask.schedule(intervalMs, type, callback); }
+
+ @Override
+ public Cancellable schedule(
+ final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) throws IllegalArgumentException {
+ throwUnsupportedOperationExceptionIfStandby("schedule");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval,
"interval");
+ final long intervalMs = validateMillisecondDuration(interval,
msgPrefix);
+ if (intervalMs < 1) {
+ throw new IllegalArgumentException("The minimum supported
scheduling interval is 1 millisecond.");
+ }
+ if (startTime.isBefore(Instant.EPOCH)) {
+ throw new IllegalArgumentException("The minimum supported start
time is Instant.EPOCH.");
+ }
+ return streamTask.schedule(startTime, intervalMs, type, callback);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index 6cbc0b20e08..aa703be4658 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -69,7 +69,7 @@ public class PunctuationSchedule extends
Stamped<ProcessorNode<?, ?, ?, ?>> {
public PunctuationSchedule next(final long currTimestamp) {
long nextPunctuationTime = timestamp + interval;
if (currTimestamp >= nextPunctuationTime) {
- // we missed one ore more punctuations
+ // we missed one or more punctuations
// avoid scheduling a new punctuations immediately, this can
happen:
// - when using STREAM_TIME punctuation and there was a gap i.e.,
no data was
// received for at least 2*interval
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4ab672fd0c0..bea998c4711 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -53,6 +53,7 @@ import
org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.IOException;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -1185,15 +1186,27 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
switch (type) {
case STREAM_TIME:
// align punctuation to 0L, punctuate as soon as we have data
- return schedule(0L, interval, type, punctuator);
+ return schedule(0L, interval, type, punctuator, false);
case WALL_CLOCK_TIME:
// align punctuation to now, punctuate after interval has
elapsed
- return schedule(time.milliseconds() + interval, interval,
type, punctuator);
+ return schedule(time.milliseconds() + interval, interval,
type, punctuator, false);
default:
throw new IllegalArgumentException("Unrecognized
PunctuationType: " + type);
}
}
+ /**
+ * Schedules a punctuation for the processor
+ *
+ * @param startTime time of the first punctuation
+ * @param interval the interval in milliseconds
+ * @param type the punctuation type
+ * @throws IllegalStateException if the current node is not null
+ */
+ public Cancellable schedule(final Instant startTime, final long interval,
final PunctuationType type, final Punctuator punctuator) {
+ return schedule(startTime.toEpochMilli(), interval, type, punctuator,
true);
+ }
+
/**
* Schedules a punctuation for the processor
*
@@ -1202,12 +1215,12 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
* @param type the punctuation type
* @throws IllegalStateException if the current node is not null
*/
- private Cancellable schedule(final long startTime, final long interval,
final PunctuationType type, final Punctuator punctuator) {
+ private Cancellable schedule(final long startTime, final long interval,
final PunctuationType type, final Punctuator punctuator, final boolean
anchored) {
if (processorContext.currentNode() == null) {
throw new IllegalStateException(String.format("%sCurrent node is
null", logPrefix));
}
- final PunctuationSchedule schedule = new
PunctuationSchedule(processorContext.currentNode(), startTime, interval,
punctuator);
+ final PunctuationSchedule schedule = getInitialSchedule(startTime,
interval, type, punctuator, anchored);
switch (type) {
case STREAM_TIME:
@@ -1222,6 +1235,15 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
}
}
+ // For anchored schedule, we want to have all punctuations only fire on
times based on combinations of startTime and interval
+ // This method ensures that the first anchored punctuation is not fired
prematurely due to startTime < now
+ private PunctuationSchedule getInitialSchedule(final long startTime, final
long interval, final PunctuationType type, final Punctuator punctuator, final
boolean anchored) {
+ final PunctuationSchedule originalSchedule = new
PunctuationSchedule(processorContext.currentNode(), startTime, interval,
punctuator);
+ final long now = (type == PunctuationType.WALL_CLOCK_TIME) ?
time.milliseconds() : streamTime();
+
+ return (anchored && startTime < now) ? originalSchedule.next(now) :
originalSchedule;
+ }
+
/**
* Possibly trigger registered stream-time punctuation functions if
* current partition group timestamp has reached the defined stamp
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 26b8d36f3b0..8c0a2717d32 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -44,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.time.Instant;
import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
@@ -221,6 +222,14 @@ public class AbstractProcessorContextTest {
return null;
}
+ @Override
+ public Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) {
+ return null;
+ }
+
@Override
public <K, V> void forward(final Record<K, V> record) {}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a142ebb9d95..9b8a3dc3335 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -95,6 +95,7 @@ import org.mockito.quality.Strictness;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -190,6 +191,8 @@ public class StreamTaskTest {
private final MockProcessorNode<Integer, Integer, ?, ?>
processorStreamTime = new MockProcessorNode<>(10L);
private final MockProcessorNode<Integer, Integer, ?, ?>
processorSystemTime = new MockProcessorNode<>(10L,
PunctuationType.WALL_CLOCK_TIME);
+ private final MockProcessorNode<Integer, Integer, ?, ?>
anchoredProcessorStreamTime = new MockProcessorNode<>(Instant.ofEpochMilli(15),
10L, PunctuationType.STREAM_TIME);
+
private final String storeName = "store";
private final MockKeyValueStore stateStore = new
MockKeyValueStore(storeName, false);
private final TopicPartition changelogPartition = new
TopicPartition("store-changelog", 1);
@@ -1267,6 +1270,70 @@ public class StreamTaskTest {
processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
20L, 142L, 155L, 160L);
}
+ @Test
+ public void shouldPunctuateUsingAnchoredStreamStartTime() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ final MockProcessorNode<Integer, Integer, ?, ?>
anchoredProcessorSystemTime = new MockProcessorNode<>(Instant.ofEpochMilli(15),
10L, PunctuationType.WALL_CLOCK_TIME); // Dummy
+ task = createStatelessTaskWithAnchoredPunctuation(createConfig(),
anchoredProcessorSystemTime);
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { });
+
+ task.resumePollingForPartitionsWithAvailableSpace();
+
+ task.addRecords(partition1, asList(
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 14),
+ getConsumerRecordWithOffsetAsTimestamp(partition1, 24)
+ ));
+
+ task.addRecords(partition2, asList(
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 15),
+ getConsumerRecordWithOffsetAsTimestamp(partition2, 25)
+ ));
+
+ task.updateLags();
+
+ // st: -1
+ assertFalse(task.canPunctuateStreamTime());
+ assertFalse(task.maybePunctuateStreamTime()); // punctuate at 15
+
+ // st: 14
+ assertTrue(task.process(0L));
+ assertEquals(3, task.numBuffered());
+ assertEquals(1, source1.numReceived);
+ assertEquals(0, source2.numReceived);
+ assertFalse(task.canPunctuateStreamTime());
+ assertFalse(task.maybePunctuateStreamTime());
+
+ // st: 15
+ // punctuate at 15 due to startTime
+ assertTrue(task.process(0L));
+ assertEquals(2, task.numBuffered());
+ assertEquals(1, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+ assertTrue(task.canPunctuateStreamTime());
+ assertTrue(task.maybePunctuateStreamTime());
+
+ // st: 24
+ assertTrue(task.process(0L));
+ assertEquals(1, task.numBuffered());
+ assertEquals(2, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+ assertFalse(task.canPunctuateStreamTime());
+ assertFalse(task.maybePunctuateStreamTime());
+
+ // st: 25
+ // punctuate at 25 due to startTime + interval
+ assertTrue(task.process(0L));
+ assertEquals(0, task.numBuffered());
+ assertEquals(2, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+ assertTrue(task.canPunctuateStreamTime());
+ assertTrue(task.maybePunctuateStreamTime());
+
+
anchoredProcessorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
15L, 25L);
+ }
+
@Test
public void shouldRespectPunctuateCancellationStreamTime() {
when(stateManager.taskId()).thenReturn(taskId);
@@ -1745,6 +1812,36 @@ public class StreamTaskTest {
processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
}
+ @Test
+ public void
shouldPunctuateUsingAnchoredSystemStartTimeWithStartTimeBeforeNow() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+
+ final long now = time.milliseconds();
+ final long testStartTime = now + (10L - (now % 10L)); // Used to
make test deterministic
+ time.setCurrentTimeMs(testStartTime);
+ final MockProcessorNode<Integer, Integer, ?, ?>
anchoredProcessorSystemTime = new
MockProcessorNode<>(Instant.ofEpochMilli(testStartTime - 10), 10L,
PunctuationType.WALL_CLOCK_TIME);
+ task = createStatelessTaskWithAnchoredPunctuation(createConfig("100"),
anchoredProcessorSystemTime);
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { });
+
+ assertFalse(task.canPunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+
+ // Expects the punctuations to happen at a time satisfying startTime +
n * interval
+ // where n is a positive number or 0
+ time.sleep(9);
+ assertFalse(task.canPunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(1);
+ assertTrue(task.canPunctuateSystemTime());
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(10);
+ assertTrue(task.canPunctuateSystemTime());
+ assertTrue(task.maybePunctuateSystemTime());
+
anchoredProcessorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
testStartTime + 10, testStartTime + 20);
+ }
+
@Test
public void
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
{
when(stateManager.taskId()).thenReturn(taskId);
@@ -2016,6 +2113,15 @@ public class StreamTaskTest {
task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { });
}
+ @Test
+ public void shouldNotThrowExceptionOnAnchoredSchedule() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig("100"));
+ task.processorContext().setCurrentNode(processorStreamTime);
+ task.schedule(Instant.ofEpochMilli(1000), 1,
PunctuationType.STREAM_TIME, timestamp -> { });
+ }
+
@Test
public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
when(stateManager.taskId()).thenReturn(taskId);
@@ -3352,6 +3458,46 @@ public class StreamTaskTest {
);
}
+ private StreamTask createStatelessTaskWithAnchoredPunctuation(
+ final StreamsConfig config,
+ final MockProcessorNode<Integer, Integer, ?, ?>
anchoredProcessorSystemTime
+ ) {
+ final ProcessorTopology topology = withSources(
+ asList(source1, source2, anchoredProcessorStreamTime,
anchoredProcessorSystemTime),
+ mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
+ );
+
+ source1.addChild(anchoredProcessorStreamTime);
+ source2.addChild(anchoredProcessorStreamTime);
+ source1.addChild(anchoredProcessorSystemTime);
+ source2.addChild(anchoredProcessorSystemTime);
+
+ final InternalProcessorContext<?, ?> context = new
ProcessorContextImpl(
+ taskId,
+ config,
+ stateManager,
+ streamsMetrics,
+ null
+ );
+
+ return new StreamTask(
+ taskId,
+ partitions,
+ topology,
+ consumer,
+ new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", time),
+ stateDirectory,
+ cache,
+ time,
+ stateManager,
+ recordCollector,
+ context,
+ logContext,
+ false
+ );
+ }
+
private StreamTask createStatelessTaskWithForwardingTopology(final
SourceNode<Integer, Integer> sourceNode) {
final ProcessorTopology topology = withSources(
asList(sourceNode, processorStreamTime),
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 1361df9a09e..debe7066c2f 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -58,6 +58,7 @@ import
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -324,6 +325,14 @@ public class InternalMockProcessorContext<KOut, VOut>
throw new UnsupportedOperationException("schedule() not supported.");
}
+ @Override
+ public Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) throws
IllegalArgumentException {
+ throw new UnsupportedOperationException("schedule() not supported.");
+ }
+
@Override
public void commit() {}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index fed0d625982..573b9b9f912 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -46,6 +47,7 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VI
private Cancellable scheduleCancellable;
private final PunctuationType punctuationType;
+ private final Instant startTime;
private final long scheduleInterval;
private boolean commitRequested = false;
@@ -54,6 +56,15 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VI
public MockApiProcessor(final PunctuationType punctuationType,
final long scheduleInterval) {
this.punctuationType = punctuationType;
+ this.startTime = null; // unanchored schedule so start time is
undefined
+ this.scheduleInterval = scheduleInterval;
+ }
+
+ public MockApiProcessor(final PunctuationType punctuationType,
+ final Instant startTime,
+ final long scheduleInterval) {
+ this.punctuationType = punctuationType;
+ this.startTime = startTime;
this.scheduleInterval = scheduleInterval;
}
@@ -65,10 +76,17 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VI
public void init(final ProcessorContext<KOut, VOut> context) {
this.context = context;
if (scheduleInterval > 0L) {
- scheduleCancellable = context.schedule(
- Duration.ofMillis(scheduleInterval),
- punctuationType,
- (punctuationType == PunctuationType.STREAM_TIME ?
punctuatedStreamTime : punctuatedSystemTime)::add
+ scheduleCancellable = (startTime == null)
+ ? context.schedule(
+ Duration.ofMillis(scheduleInterval),
+ punctuationType,
+ (punctuationType == PunctuationType.STREAM_TIME ?
punctuatedStreamTime : punctuatedSystemTime)::add
+ )
+ : context.schedule(
+ startTime,
+ Duration.ofMillis(scheduleInterval),
+ punctuationType,
+ (punctuationType == PunctuationType.STREAM_TIME ?
punctuatedStreamTime : punctuatedSystemTime)::add
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 8c1812ccf14..a4832d4471f 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,10 @@ public class MockProcessor<KIn, VIn, KOut, VOut> implements
Processor<KIn, VIn,
delegate = new MockApiProcessor<>(punctuationType, scheduleInterval);
}
+ public MockProcessor(final PunctuationType punctuationType, final Instant
startTime, final long scheduleInterval) {
+ delegate = new MockApiProcessor<>(punctuationType, startTime,
scheduleInterval);
+ }
+
public MockProcessor() {
delegate = new MockApiProcessor<>();
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 28b8f26bc93..dd1187ba129 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,10 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
this(new MockProcessor<>(punctuationType, scheduleInterval));
}
+ /**
+ * Creates a mock processor node wrapping a MockProcessor built from startTime,
+ * scheduleInterval and punctuationType.
+ * NOTE(review): the argument order here (startTime, scheduleInterval, punctuationType) differs
+ * from the MockProcessor constructor it delegates to (punctuationType, startTime,
+ * scheduleInterval); consider aligning the two signatures.
+ */
+ public MockProcessorNode(final Instant startTime, final long
scheduleInterval, final PunctuationType punctuationType) {
+ this(new MockProcessor<>(punctuationType, startTime,
scheduleInterval));
+ }
+
+ /** Creates a mock processor node wrapping a default-constructed MockProcessor. */
public MockProcessorNode() {
this(new MockProcessor<>());
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 734f744f20f..38059cb93d7 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -79,6 +80,14 @@ public class NoOpProcessorContext extends AbstractProcessorContext<Object, Objec
return null;
}
+ /**
+ * Anchored schedule overload. In this no-op context nothing is registered and null is returned
+ * instead of a Cancellable.
+ */
+ @Override
+ public Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) {
+ return null;
+ }
+
@Override
public <K, V> void forward(final Record<K, V> record) {
forwardedValues.put(record.key(), record.value());
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 08cf542305c..4cd29b5bc64 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -80,17 +81,31 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
* {@link CapturedPunctuator} holds captured punctuators, along with their
scheduling information.
*/
public static class CapturedPunctuator {
+ private final Instant startTime;
private final long intervalMs;
private final PunctuationType type;
private final Punctuator punctuator;
private boolean cancelled = false;
+ /** Creates an unanchored captured punctuator; its start time is left null. */
private CapturedPunctuator(final long intervalMs, final
PunctuationType type, final Punctuator punctuator) {
+ this.startTime = null; // unanchored punctuator so start time is
undefined
+ this.intervalMs = intervalMs;
+ this.type = type;
+ this.punctuator = punctuator;
+ }
+
+ /** Creates a captured punctuator anchored at the given start time. */
+ private CapturedPunctuator(final Instant startTime, final long
intervalMs, final PunctuationType type, final Punctuator punctuator) {
+ this.startTime = startTime;
this.intervalMs = intervalMs;
this.type = type;
this.punctuator = punctuator;
}
+ /**
+ * Returns the start time given to the anchored schedule overload, or null if this punctuator
+ * was scheduled without one.
+ */
+ @SuppressWarnings({"WeakerAccess", "unused"})
+ public Instant getStartTime() {
+ return startTime;
+ }
+
@SuppressWarnings({"WeakerAccess", "unused"})
public long getIntervalMs() {
return intervalMs;
@@ -463,9 +478,23 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
throw new IllegalArgumentException("The minimum supported
scheduling interval is 1 millisecond.");
}
final CapturedPunctuator capturedPunctuator = new
CapturedPunctuator(intervalMs, type, callback);
-
punctuators.add(capturedPunctuator);
+ return capturedPunctuator::cancel; }
+ /**
+ * Anchored variant of schedule: records a CapturedPunctuator that also carries startTime and
+ * returns capturedPunctuator::cancel as the Cancellable.
+ * Throws IllegalArgumentException for intervals below 1 ms (the same check as the unanchored
+ * overload) and for start times before Instant.EPOCH.
+ * NOTE(review): startTime is dereferenced without a null check, so a null startTime surfaces as
+ * a NullPointerException rather than an IllegalArgumentException.
+ */
+ @Override
+ public Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) throws
IllegalArgumentException {
+ final long intervalMs = ApiUtils.validateMillisecondDuration(interval,
"interval");
+ if (intervalMs < 1) {
+ throw new IllegalArgumentException("The minimum supported
scheduling interval is 1 millisecond.");
+ }
+ if (startTime.isBefore(Instant.EPOCH)) {
+ throw new IllegalArgumentException("The minimum supported start
time is Instant.EPOCH.");
+ }
+ final CapturedPunctuator capturedPunctuator = new
CapturedPunctuator(startTime, intervalMs, type, callback);
+ punctuators.add(capturedPunctuator);
return capturedPunctuator::cancel;
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 25f728cd567..48fad67f0f4 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -114,17 +115,30 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
* {@link CapturedPunctuator} holds captured punctuators, along with their
scheduling information.
*/
public static final class CapturedPunctuator {
+ private final Instant startTime;
private final Duration interval;
private final PunctuationType type;
private final Punctuator punctuator;
private boolean cancelled = false;
+ /** Creates an unanchored captured punctuator; its start time is left null. */
private CapturedPunctuator(final Duration interval, final
PunctuationType type, final Punctuator punctuator) {
+ this.startTime = null; // unanchored punctuator so start time is
undefined
this.interval = interval;
this.type = type;
this.punctuator = punctuator;
}
+ /** Creates a captured punctuator anchored at the given start time. */
+ private CapturedPunctuator(final Instant startTime, final Duration
interval, final PunctuationType type, final Punctuator punctuator) {
+ this.startTime = startTime;
+ this.interval = interval;
+ this.type = type;
+ this.punctuator = punctuator;
+ }
+
+ /**
+ * Returns the start time given to the anchored schedule overload, or null if this punctuator
+ * was scheduled without one.
+ */
+ public Instant getStartTime() {
+ return startTime;
+ }
+
+ /** Returns the interval this punctuator was scheduled with. */
public Duration getInterval() {
return interval;
}
@@ -373,9 +387,16 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
final PunctuationType type,
final Punctuator callback) {
final CapturedPunctuator capturedPunctuator = new
CapturedPunctuator(interval, type, callback);
-
punctuators.add(capturedPunctuator);
+ return capturedPunctuator::cancel; }
+ /**
+ * Anchored variant of schedule: records a CapturedPunctuator carrying startTime, interval, type
+ * and callback, and returns capturedPunctuator::cancel as the Cancellable. No argument is
+ * validated here.
+ * NOTE(review): the org.apache.kafka.streams.processor.MockProcessorContext overload rejects
+ * intervals below 1 ms and start times before Instant.EPOCH; confirm whether this mock should
+ * apply the same checks.
+ */
+ @Override
+ public Cancellable schedule(final Instant startTime,
+ final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback) {
+ final CapturedPunctuator capturedPunctuator = new
CapturedPunctuator(startTime, interval, type, callback);
+ punctuators.add(capturedPunctuator);
return capturedPunctuator::cancel;
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index c760e5dd1d6..cacedfafbf7 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -445,6 +446,45 @@ public class MockProcessorContextTest {
assertTrue(context.committed());
}
+ /**
+ * Verifies that an anchored schedule() call made from Transformer.init is captured by the
+ * MockProcessorContext with its start time, interval in ms and punctuation type, and that the
+ * captured punctuator is not cancelled. Also checks that invoking the captured punctuator runs
+ * the scheduled callback (a commit).
+ */
+ @Test
+ public void shouldCaptureAnchoredPunctuator() {
+ final Transformer<String, Long, KeyValue<String, Long>> transformer =
new Transformer<>() {
+ @Override
+ public void init(final ProcessorContext context) {
+ context.schedule(
+ Instant.ofEpochMilli(1000),
+ Duration.ofSeconds(1L),
+ PunctuationType.WALL_CLOCK_TIME,
+ timestamp -> context.commit()
+ );
+ }
+
+ @Override
+ public KeyValue<String, Long> transform(final String key, final
Long value) {
+ return null;
+ }
+
+ @Override
+ public void close() { }
+ };
+
+ final MockProcessorContext context = new MockProcessorContext();
+
+ transformer.init(context);
+
+ final MockProcessorContext.CapturedPunctuator capturedPunctuator =
context.scheduledPunctuators().get(0);
+ assertEquals(Instant.ofEpochMilli(1000),
capturedPunctuator.getStartTime());
+ assertEquals(1000L, capturedPunctuator.getIntervalMs());
+ assertEquals(PunctuationType.WALL_CLOCK_TIME,
capturedPunctuator.getType());
+ assertFalse(capturedPunctuator.cancelled());
+
+ final Punctuator punctuator = capturedPunctuator.getPunctuator();
+ assertFalse(context.committed());
+
+ punctuator.punctuate(1234L);
+ assertTrue(context.committed());
+ }
+
@SuppressWarnings("resource")
@Test
public void fullConstructorShouldSetAllExpectedAttributes() {