This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 85f3be9dc0848d4f0b9e5337583d40f4fed2df85 Author: Igal Shilman <igalshil...@gmail.com> AuthorDate: Thu Feb 13 14:16:09 2020 +0100 [FLINK-15956] Add an exponential backoff This commit adds an exponential backoff mechanism with a total upper limit for the execution time. --- .../backpressure/BoundedExponentialBackoff.java | 66 ++++++++++++++++++++++ .../flink/core/backpressure/SystemNanoTimer.java | 50 ++++++++++++++++ .../statefun/flink/core/backpressure/Timer.java | 26 +++++++++ .../BoundedExponentialBackoffTest.java | 65 +++++++++++++++++++++ 4 files changed, 207 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java new file mode 100644 index 0000000..52622eb --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.flink.core.backpressure; + +import java.time.Duration; +import java.util.Objects; +import org.apache.flink.annotation.VisibleForTesting; + +public final class BoundedExponentialBackoff { + private final Timer timer; + private final long requestStartTimeInNanos; + private final long maxRequestDurationInNanos; + + private long nextSleepTimeNanos; + + public BoundedExponentialBackoff(Duration initialBackoffDuration, Duration maxRequestDuration) { + this(SystemNanoTimer.instance(), initialBackoffDuration, maxRequestDuration); + } + + @VisibleForTesting + BoundedExponentialBackoff( + Timer timer, Duration initialBackoffDuration, Duration maxRequestDuration) { + this.timer = Objects.requireNonNull(timer); + this.requestStartTimeInNanos = timer.now(); + this.maxRequestDurationInNanos = maxRequestDuration.toNanos(); + this.nextSleepTimeNanos = initialBackoffDuration.toNanos(); + } + + public boolean applyNow() { + final long remainingNanos = remainingNanosUntilDeadLine(); + final long nextAmountOfNanosToSleep = nextAmountOfNanosToSleep(); + final long actualSleep = Math.min(remainingNanos, nextAmountOfNanosToSleep); + if (actualSleep <= 0) { + return false; + } + timer.sleep(actualSleep); + return true; + } + + private long remainingNanosUntilDeadLine() { + final long totalElapsedTime = timer.now() - requestStartTimeInNanos; + return maxRequestDurationInNanos - totalElapsedTime; + } + + private long nextAmountOfNanosToSleep() { + final long current = nextSleepTimeNanos; + nextSleepTimeNanos *= 2; + return current; + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/SystemNanoTimer.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/SystemNanoTimer.java new file mode 100644 index 0000000..3eaefe3 --- /dev/null +++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/SystemNanoTimer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.backpressure; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** A {@code Timer} backed by {@link System#nanoTime()}. 
*/ +final class SystemNanoTimer implements Timer { + private static final SystemNanoTimer INSTANCE = new SystemNanoTimer(); + + public static SystemNanoTimer instance() { + return INSTANCE; + } + + private SystemNanoTimer() {} + + @Override + public long now() { + return System.nanoTime(); + } + + @Override + public void sleep(long sleepTimeNanos) { + try { + final long sleepTimeMs = NANOSECONDS.toMillis(sleepTimeNanos); + Thread.sleep(sleepTimeMs); + } catch (InterruptedException ex) { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + throw new RuntimeException("interrupted while sleeping", ex); + } + } + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/Timer.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/Timer.java new file mode 100644 index 0000000..2651a52 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/Timer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A minimal clock abstraction: something that can report a time reading and let time pass.
 *
 * <p>Both methods work with nanosecond values.
 */
interface Timer {

  /**
   * Reports the current reading of this timer.
   *
   * @return the current time reading, in nanoseconds.
   */
  long now();

  /**
   * Lets the given amount of time pass on this timer.
   *
   * @param durationNanos how long to sleep, in nanoseconds.
   */
  void sleep(long durationNanos);
}
+ */ +package org.apache.flink.statefun.flink.core.backpressure; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.time.Duration; +import org.junit.Test; + +public class BoundedExponentialBackoffTest { + private final FakeNanoClock fakeTime = new FakeNanoClock(); + private final BoundedExponentialBackoff backoffUnderTest = + new BoundedExponentialBackoff(fakeTime, Duration.ofSeconds(1), Duration.ofMinutes(1)); + + @Test + public void simpleUsage() { + assertThat(backoffUnderTest.applyNow(), is(true)); + assertThat(fakeTime.now(), greaterThan(0L)); + } + + @Test + public void timeoutExpired() { + fakeTime.now = Duration.ofMinutes(1).toNanos(); + assertThat(backoffUnderTest.applyNow(), is(false)); + } + + @Test + @SuppressWarnings("StatementWithEmptyBody") + public void totalNumberOfBackoffsIsEqualToTimeout() { + while (backoffUnderTest.applyNow()) {} + + assertThat(fakeTime.now(), is(Duration.ofMinutes(1).toNanos())); + } + + private static final class FakeNanoClock implements Timer { + long now; + + @Override + public long now() { + return now; + } + + @Override + public void sleep(long durationNano) { + now += durationNano; + } + } +}