This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 85f3be9dc0848d4f0b9e5337583d40f4fed2df85
Author: Igal Shilman <igalshil...@gmail.com>
AuthorDate: Thu Feb 13 14:16:09 2020 +0100

    [FLINK-15956] Add an exponential backoff
    
    This commit adds an exponential backoff mechanism with
    a total upper limit for the execution time.
---
 .../backpressure/BoundedExponentialBackoff.java    | 66 ++++++++++++++++++++++
 .../flink/core/backpressure/SystemNanoTimer.java   | 50 ++++++++++++++++
 .../statefun/flink/core/backpressure/Timer.java    | 26 +++++++++
 .../BoundedExponentialBackoffTest.java             | 65 +++++++++++++++++++++
 4 files changed, 207 insertions(+)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java
new file mode 100644
index 0000000..52622eb
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.annotation.VisibleForTesting;
+
+public final class BoundedExponentialBackoff {
+  private final Timer timer;
+  private final long requestStartTimeInNanos;
+  private final long maxRequestDurationInNanos;
+
+  private long nextSleepTimeNanos;
+
+  public BoundedExponentialBackoff(Duration initialBackoffDuration, Duration 
maxRequestDuration) {
+    this(SystemNanoTimer.instance(), initialBackoffDuration, 
maxRequestDuration);
+  }
+
+  @VisibleForTesting
+  BoundedExponentialBackoff(
+      Timer timer, Duration initialBackoffDuration, Duration 
maxRequestDuration) {
+    this.timer = Objects.requireNonNull(timer);
+    this.requestStartTimeInNanos = timer.now();
+    this.maxRequestDurationInNanos = maxRequestDuration.toNanos();
+    this.nextSleepTimeNanos = initialBackoffDuration.toNanos();
+  }
+
+  public boolean applyNow() {
+    final long remainingNanos = remainingNanosUntilDeadLine();
+    final long nextAmountOfNanosToSleep = nextAmountOfNanosToSleep();
+    final long actualSleep = Math.min(remainingNanos, 
nextAmountOfNanosToSleep);
+    if (actualSleep <= 0) {
+      return false;
+    }
+    timer.sleep(actualSleep);
+    return true;
+  }
+
+  private long remainingNanosUntilDeadLine() {
+    final long totalElapsedTime = timer.now() - requestStartTimeInNanos;
+    return maxRequestDurationInNanos - totalElapsedTime;
+  }
+
+  private long nextAmountOfNanosToSleep() {
+    final long current = nextSleepTimeNanos;
+    nextSleepTimeNanos *= 2;
+    return current;
+  }
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/SystemNanoTimer.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/SystemNanoTimer.java
new file mode 100644
index 0000000..3eaefe3
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/SystemNanoTimer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/** A {@code Timer} backed by {@link System#nanoTime()}. */
+final class SystemNanoTimer implements Timer {
+  private static final SystemNanoTimer INSTANCE = new SystemNanoTimer();
+
+  public static SystemNanoTimer instance() {
+    return INSTANCE;
+  }
+
+  private SystemNanoTimer() {}
+
+  @Override
+  public long now() {
+    return System.nanoTime();
+  }
+
+  @Override
+  public void sleep(long sleepTimeNanos) {
+    try {
+      final long sleepTimeMs = NANOSECONDS.toMillis(sleepTimeNanos);
+      Thread.sleep(sleepTimeMs);
+    } catch (InterruptedException ex) {
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("interrupted while sleeping", ex);
+      }
+    }
+  }
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/Timer.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/Timer.java
new file mode 100644
index 0000000..2651a52
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/Timer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.backpressure;
+
+interface Timer {
+
+  long now();
+
+  void sleep(long durationNanos);
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java
 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java
new file mode 100644
index 0000000..6b0eed5
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.backpressure;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.time.Duration;
+import org.junit.Test;
+
+public class BoundedExponentialBackoffTest {
+  private final FakeNanoClock fakeTime = new FakeNanoClock();
+  private final BoundedExponentialBackoff backoffUnderTest =
+      new BoundedExponentialBackoff(fakeTime, Duration.ofSeconds(1), 
Duration.ofMinutes(1));
+
+  @Test
+  public void simpleUsage() {
+    assertThat(backoffUnderTest.applyNow(), is(true));
+    assertThat(fakeTime.now(), greaterThan(0L));
+  }
+
+  @Test
+  public void timeoutExpired() {
+    fakeTime.now = Duration.ofMinutes(1).toNanos();
+    assertThat(backoffUnderTest.applyNow(), is(false));
+  }
+
+  @Test
+  @SuppressWarnings("StatementWithEmptyBody")
+  public void totalNumberOfBackoffsIsEqualToTimeout() {
+    while (backoffUnderTest.applyNow()) {}
+
+    assertThat(fakeTime.now(), is(Duration.ofMinutes(1).toNanos()));
+  }
+
+  private static final class FakeNanoClock implements Timer {
+    long now;
+
+    @Override
+    public long now() {
+      return now;
+    }
+
+    @Override
+    public void sleep(long durationNano) {
+      now += durationNano;
+    }
+  }
+}

Reply via email to