This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b8ab2d66e046fcea225ed20a1cc50100e2566281 Author: Piotr Nowojski <[email protected]> AuthorDate: Mon Aug 5 17:36:38 2024 +0200 [FLINK-35886][task] Implement PausableRelativeClock --- .../api/operators/util/PausableRelativeClock.java | 96 ++++++++++++++++++++++ .../operators/util/PausableRelativeClockTest.java | 80 ++++++++++++++++++ 2 files changed, 176 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/PausableRelativeClock.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/PausableRelativeClock.java new file mode 100644 index 00000000000..f7cc275b86d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/util/PausableRelativeClock.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.metrics.TimerGauge; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.RelativeClock; + +import javax.annotation.concurrent.ThreadSafe; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link RelativeClock} whose time progress with respect to the wall clock can be paused and + * un-paused. It can be paused multiple times. If it is paused N times, it has to be un-paused also + * N times to resume progress. + */ +@Internal +@ThreadSafe +public class PausableRelativeClock implements RelativeClock, TimerGauge.StartStopListener { + private final Clock baseClock; + + private long accumulativeBlockedNanoTime; + private long currentBlockedNanoTimeStart; + /** How many times this clock has been paused. */ + private long pausedCounter; + + public PausableRelativeClock(Clock baseClock) { + this.baseClock = baseClock; + } + + @Override + public long relativeTimeMillis() { + return relativeTimeNanos() / 1_000_000; + } + + @Override + public synchronized long relativeTimeNanos() { + long now = baseClock.relativeTimeNanos(); + // we offset relativeTimeNanos by the time this clock has been paused. + return now - getBlockedTime(now); + } + + /** @return how long this {@link PausableRelativeClock} has been paused so far. */ + private long getBlockedTime(long now) { + long blockedTime = accumulativeBlockedNanoTime; + if (pausedCounter != 0) { + // If we are paused right now, add the time since when we were paused + blockedTime += now - currentBlockedNanoTimeStart; + } + return blockedTime; + } + + public synchronized void pause() { + if (pausedCounter == 0) { + currentBlockedNanoTimeStart = baseClock.relativeTimeNanos(); + } + pausedCounter++; + } + + public synchronized void unPause() { + checkState(pausedCounter >= 1); + pausedCounter--; + if (pausedCounter == 0) { + accumulativeBlockedNanoTime += + baseClock.relativeTimeNanos() - currentBlockedNanoTimeStart; + } + } + + @Override + public void markStart() { + pause(); + } + + @Override + public void markEnd() { + unPause(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/util/PausableRelativeClockTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/util/PausableRelativeClockTest.java new file mode 100644 index 00000000000..a7ff6fef822 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/util/PausableRelativeClockTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.util; + +import org.apache.flink.util.clock.ManualClock; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +class PausableRelativeClockTest { + private static final long TIME_STEP = 14; + + @Test + void simpleTest() { + ManualClock baseClock = new ManualClock(); + PausableRelativeClock relativeClock = new PausableRelativeClock(baseClock); + + long startMillis = relativeClock.relativeTimeMillis(); + long startNanos = relativeClock.relativeTimeNanos(); + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); + long endNanos = relativeClock.relativeTimeNanos(); + long endMillis = relativeClock.relativeTimeMillis(); + + long durationNanos = endNanos - startNanos; + long durationMillis = endMillis - startMillis; + + assertThat((durationNanos) / 1_000_000).isEqualTo(TIME_STEP); + assertThat(durationMillis).isEqualTo(TIME_STEP); + } + + @Test + void pausedTest() throws Exception { + ManualClock baseClock = new ManualClock(); + PausableRelativeClock relativeClock = new PausableRelativeClock(baseClock); + + long startMillis = relativeClock.relativeTimeMillis(); + long startNanos = relativeClock.relativeTimeNanos(); + + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); + relativeClock.pause(); + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); // doesn't count + relativeClock.pause(); + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); // doesn't count + relativeClock.unPause(); + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); // doesn't count + relativeClock.unPause(); + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); + relativeClock.pause(); + baseClock.advanceTime(Duration.ofMillis(TIME_STEP)); // doesn't count + // leave the clock blocked + + long endNanos = relativeClock.relativeTimeNanos(); + long endMillis = relativeClock.relativeTimeMillis(); + + long durationNanos = endNanos - startNanos; + long durationMillis = endMillis - startMillis; + + assertThat((durationNanos) / 1_000_000).isEqualTo(TIME_STEP * 2); + assertThat(durationMillis).isEqualTo(TIME_STEP * 2); + } +}
