This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 25250a47e105ccdfa18a7e34fc3108245ffd83c0 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Aug 9 10:31:39 2024 +0200 [FLINK-35886][task] Expose Clock from ProcessingTimeService Operators have to be serializable and they will also need an access to Clock to construct ProgressBlockingRelativeClock. Because we also want to be able to provide for testing purposes ManualClock we have to find a way how Operators could obtain a Clock instance. Exposing Clock from ProcessingTimeService sounds like a good place as it also will provide a since source of processing time for potential users (for example ProgressBlockingRelativeClock and firing timers) --- .../runtime/NeverFireProcessingTimeService.java | 6 +- .../runtime/tasks/ProcessingTimeService.java | 10 ++++ .../runtime/tasks/ProcessingTimeServiceImpl.java | 5 +- .../runtime/tasks/SystemProcessingTimeService.java | 6 +- .../runtime/tasks/TestProcessingTimeService.java | 66 +++++++++++++++++++--- 5 files changed, 79 insertions(+), 14 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java index 3f1860549d8..2bb2f06c55c 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java @@ -19,6 +19,8 @@ package org.apache.flink.state.api.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.runtime.tasks.TimerService; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.NeverCompleteFuture; import java.util.concurrent.CompletableFuture; @@ -33,8 +35,8 @@ public final class NeverFireProcessingTimeService implements TimerService { private AtomicBoolean shutdown = new AtomicBoolean(true); @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); + public Clock getClock() { + return SystemClock.getInstance(); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index bd551e854fb..743f48b38a3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.util.clock.Clock; import org.apache.flink.util.concurrent.ScheduledExecutor; import java.util.concurrent.CompletableFuture; @@ -32,6 +33,15 @@ import java.util.concurrent.TimeUnit; */ public interface ProcessingTimeService extends org.apache.flink.api.common.operators.ProcessingTimeService { + + @Override + default long getCurrentProcessingTime() { + return getClock().absoluteTimeMillis(); + } + + /** Returns {@link Clock} associated with this timer service. */ + Clock getClock(); + /** * Registers a task to be executed repeatedly at a fixed rate. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java index 3a42ed9f8b3..e3b5b9ed475 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.clock.Clock; import org.apache.flink.util.concurrent.NeverCompleteFuture; import java.util.concurrent.CompletableFuture; @@ -53,8 +54,8 @@ class ProcessingTimeServiceImpl implements ProcessingTimeService { } @Override - public long getCurrentProcessingTime() { - return timerService.getCurrentProcessingTime(); + public Clock getClock() { + return timerService.getClock(); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index fee6a646475..73722249340 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.NeverCompleteFuture; import org.slf4j.Logger; @@ -86,8 +88,8 @@ public class SystemProcessingTimeService implements TimerService { } @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); + public Clock getClock() { + return SystemClock.getInstance(); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 8229076393a..2b54390d986 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -19,7 +19,9 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; +import java.time.Duration; import java.util.Comparator; import java.util.HashSet; import java.util.PriorityQueue; @@ -30,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** @@ -38,14 +41,14 @@ import java.util.concurrent.atomic.AtomicReference; */ public class TestProcessingTimeService implements TimerService { - private volatile long currentTime = Long.MIN_VALUE; - private volatile boolean isTerminated; private volatile boolean isQuiesced; // sorts the timers by timestamp so that they are processed in the correct order. private final PriorityQueue<Tuple2<Long, CallbackTask>> priorityQueue; + private final ManualMSClock clock = new ManualMSClock(Long.MIN_VALUE); + public TestProcessingTimeService() { this.priorityQueue = new PriorityQueue<>( @@ -60,14 +63,19 @@ public class TestProcessingTimeService implements TimerService { } public void advance(long delta) throws Exception { - setCurrentTime(this.currentTime + delta); + clock.advanceTime(Duration.ofMillis(delta)); + maybeFireTimers(); } public void setCurrentTime(long timestamp) throws Exception { - this.currentTime = timestamp; + clock.setCurrentTime(timestamp, TimeUnit.MILLISECONDS); + maybeFireTimers(); + } + private void maybeFireTimers() throws Exception { if (!isQuiesced) { - while (!priorityQueue.isEmpty() && currentTime >= priorityQueue.peek().f0) { + while (!priorityQueue.isEmpty() + && getCurrentProcessingTime() >= priorityQueue.peek().f0) { Tuple2<Long, CallbackTask> entry = priorityQueue.poll(); CallbackTask callbackTask = entry.f1; @@ -88,8 +96,8 @@ public class TestProcessingTimeService implements TimerService { } @Override - public long getCurrentProcessingTime() { - return currentTime; + public Clock getClock() { + return clock; } @Override @@ -121,7 +129,8 @@ public class TestProcessingTimeService implements TimerService { PeriodicCallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period); priorityQueue.offer( - Tuple2.<Long, CallbackTask>of(currentTime + initialDelay, periodicCallbackTask)); + Tuple2.<Long, CallbackTask>of( + getCurrentProcessingTime() + initialDelay, periodicCallbackTask)); return periodicCallbackTask; } @@ -264,4 +273,45 @@ public class TestProcessingTimeService implements TimerService { return currentTimestamp + period; } } + + /** + * Similar to {@link org.apache.flink.util.clock.ManualClock}, but with ms precision and thus + * greater range. This is needed to support registering and firing timers with {@link + * Long#MAX_VALUE}. + */ + private static class ManualMSClock extends Clock { + private final AtomicLong currentTime; + + public ManualMSClock(long startTime) { + this.currentTime = new AtomicLong(startTime); + } + + @Override + public long absoluteTimeMillis() { + return currentTime.get(); + } + + @Override + public long relativeTimeMillis() { + return currentTime.get(); + } + + @Override + public long relativeTimeNanos() { + return currentTime.get() * 1_000_000; + } + + /** + * Advances the time by the given duration. Time can also move backwards by supplying a + * negative value. This method performs no overflow check. + */ + public void advanceTime(Duration duration) { + currentTime.addAndGet(duration.toMillis()); + } + + /** Sets the time to the given value. */ + public void setCurrentTime(long time, TimeUnit timeUnit) { + currentTime.set(timeUnit.toMillis(time)); + } + } }
