This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 13b7b3e [FLINK-19773][runtime] Implement ExponentialDelayRestartBackoffTimeStrategy
13b7b3e is described below
commit 13b7b3e6050ee716d01c9d3d74d08702f93953a8
Author: Marek Sabo <[email protected]>
AuthorDate: Mon Dec 7 16:50:15 2020 +0100
[FLINK-19773][runtime] Implement ExponentialDelayRestartBackoffTimeStrategy
Apply spotless formatting
This closes #14425.
---
...ntial_delay_restart_strategy_configuration.html | 42 ++++
.../generated/restart_strategy_configuration.html | 2 +-
docs/dev/task_failure_recovery.md | 54 +++++
.../common/restartstrategy/RestartStrategies.java | 131 ++++++++++++
.../configuration/RestartStrategyOptions.java | 75 +++++++
...ExponentialDelayRestartBackoffTimeStrategy.java | 230 +++++++++++++++++++++
.../RestartBackoffTimeStrategyFactoryLoader.java | 18 ++
...nentialDelayRestartBackoffTimeStrategyTest.java | 228 ++++++++++++++++++++
...estartBackoffTimeStrategyFactoryLoaderTest.java | 39 ++++
...overyExponentialDelayRestartStrategyITBase.java | 61 ++++++
10 files changed, 879 insertions(+), 1 deletion(-)
diff --git a/docs/_includes/generated/exponential_delay_restart_strategy_configuration.html b/docs/_includes/generated/exponential_delay_restart_strategy_configuration.html
new file mode 100644
index 0000000..a0b51c7
--- /dev/null
+++ b/docs/_includes/generated/exponential_delay_restart_strategy_configuration.html
@@ -0,0 +1,42 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>restart-strategy.exponential-delay.backoff-multiplier</h5></td>
+ <td style="word-wrap: break-word;">2.0</td>
+ <td>Double</td>
+ <td>Backoff value is multiplied by this value after every failure, until max backoff is reached if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>.</td>
+ </tr>
+ <tr>
+ <td><h5>restart-strategy.exponential-delay.initial-backoff</h5></td>
+ <td style="word-wrap: break-word;">1 s</td>
+ <td>Duration</td>
+ <td>Starting duration between restarts if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It can be specified using notation: "1 min", "20 s"</td>
+ </tr>
+ <tr>
+ <td><h5>restart-strategy.exponential-delay.jitter-factor</h5></td>
+ <td style="word-wrap: break-word;">0.1</td>
+ <td>Double</td>
+ <td>Jitter specified as a portion of the backoff if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It determines the largest random value that may be added to or subtracted from the backoff. Useful when you want to avoid restarting multiple jobs at the same time.</td>
+ </tr>
+ <tr>
+ <td><h5>restart-strategy.exponential-delay.max-backoff</h5></td>
+ <td style="word-wrap: break-word;">5 min</td>
+ <td>Duration</td>
+ <td>The highest possible duration between restarts if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It can be specified using notation: "1 min", "20 s"</td>
+ </tr>
+ <tr>
+ <td><h5>restart-strategy.exponential-delay.reset-backoff-threshold</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>Threshold when the backoff is reset to its initial value if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It specifies how long the job must be running without failure to reset the exponentially increasing backoff to its initial value. It can be specified using notation: "1 min", "20 s"</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/_includes/generated/restart_strategy_configuration.html b/docs/_includes/generated/restart_strategy_configuration.html
index f8a0748..f690cba 100644
--- a/docs/_includes/generated/restart_strategy_configuration.html
+++ b/docs/_includes/generated/restart_strategy_configuration.html
@@ -12,7 +12,7 @@
<td><h5>restart-strategy</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Defines the restart strategy to use in case of job
failures.<br />Accepted values are:<ul><li><span markdown="span">`none`</span>,
<span markdown="span">`off`</span>, <span markdown="span">`disable`</span>: No
restart strategy.</li><li><span markdown="span">`fixeddelay`</span>, <span
markdown="span">`fixed-delay`</span>: Fixed delay restart strategy. More
details can be found <a
href="../dev/task_failure_recovery.html#fixed-delay-restart-strategy">here</a>.</li><li><span
[...]
+ <td>Defines the restart strategy to use in case of job
failures.<br />Accepted values are:<ul><li><span markdown="span">`none`</span>,
<span markdown="span">`off`</span>, <span markdown="span">`disable`</span>: No
restart strategy.</li><li><span markdown="span">`fixeddelay`</span>, <span
markdown="span">`fixed-delay`</span>: Fixed delay restart strategy. More
details can be found <a
href="../dev/task_failure_recovery.html#fixed-delay-restart-strategy">here</a>.</li><li><span
[...]
</tr>
</tbody>
</table>
diff --git a/docs/dev/task_failure_recovery.md b/docs/dev/task_failure_recovery.md
index 7cb88df..42c2903 100644
--- a/docs/dev/task_failure_recovery.md
+++ b/docs/dev/task_failure_recovery.md
@@ -125,6 +125,60 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
</div>
+### Exponential Delay Restart Strategy
+
+The exponential delay restart strategy attempts to restart the job an unlimited number of times, with an increasing delay up to the maximum delay.
+Because the number of attempts is unlimited, this strategy never causes the job to fail.
+Between two consecutive restart attempts, the delay keeps growing exponentially (it is multiplied by the backoff multiplier after each failure) until the maximum delay is reached.
+From then on, the delay stays at the maximum.
+
+When the job runs without failure for a configurable threshold duration, the delay is reset to its initial value.
+
+{% highlight yaml %}
+restart-strategy: exponential-delay
+{% endhighlight %}
+
+{% include generated/exponential_delay_restart_strategy_configuration.html %}
+
+For example:
+
+{% highlight yaml %}
+restart-strategy.exponential-delay.initial-backoff: 10 s
+restart-strategy.exponential-delay.max-backoff: 2 min
+restart-strategy.exponential-delay.backoff-multiplier: 2.0
+restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
+restart-strategy.exponential-delay.jitter-factor: 0.1
+{% endhighlight %}
+
+The exponential delay restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
+ Time.milliseconds(1),    // initial delay between restarts
+ Time.milliseconds(1000), // maximum delay between restarts
+ 1.1,                     // exponential multiplier
+ Time.milliseconds(2000), // threshold duration to reset delay to its initial value
+ 0.1                      // jitter
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
+ Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
+ Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
+ 1.1, // exponential multiplier
+ Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
+ 0.1 // jitter
+))
+{% endhighlight %}
+</div>
+</div>
+
### Failure Rate Restart Strategy
The failure rate restart strategy restarts job after failure, but when
`failure rate` (failures per time interval) is exceeded, the job eventually
fails.
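To make the documented exponential delay behavior concrete, the following Java sketch computes the restart delays (jitter ignored) for consecutive failures that each occur before the reset threshold has elapsed, using the values from the YAML example above (10 s initial backoff, 2 min maximum, multiplier 2.0). It assumes the multiply, truncate and clamp rule of `increaseBackoff()` in `ExponentialDelayRestartBackoffTimeStrategy` from this commit; the class `ExponentialDelaySequenceSketch` and its `delays` method are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: restart delays in milliseconds, jitter ignored, for consecutive failures that all
 * happen before the reset threshold elapses. Follows the multiply, truncate and clamp rule of
 * increaseBackoff() in this commit. Hypothetical class, not part of Flink.
 */
public class ExponentialDelaySequenceSketch {

    static List<Long> delays(long initialMs, long maxMs, double multiplier, int failures) {
        List<Long> result = new ArrayList<>();
        long current = initialMs;
        for (int i = 0; i < failures; i++) {
            if (i > 0) {
                // Grow only while below the cap; the cast truncates to whole milliseconds.
                if (current < maxMs) {
                    current = (long) (current * multiplier);
                }
                // Keep the delay within [initialMs, maxMs].
                current = Math.max(initialMs, Math.min(current, maxMs));
            }
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the YAML example: 10 s initial backoff, 2 min max backoff, multiplier 2.0.
        System.out.println(delays(10_000L, 120_000L, 2.0, 6));
    }
}
```

Running `main` prints `[10000, 20000, 40000, 80000, 120000, 120000]`: the delay doubles from 10 s until it reaches the 2 min cap. Because the product is truncated to a `long`, a multiplier of 2.3 starting from 4 ms yields 4, 9 and 20 ms, which matches the expectations in `testBackoffMultiplier`.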
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index 5bd1663..54a9c21 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -92,6 +92,25 @@ public class RestartStrategies {
failureRate, failureInterval, delayInterval);
}
+ /**
+ * Generates an ExponentialDelayRestartStrategyConfiguration.
+ *
+ * @param initialBackoff Starting duration between restarts
+ * @param maxBackoff The highest possible duration between restarts
+ * @param backoffMultiplier Factor by which the delay is multiplied after each failure
+ * @param resetBackoffThreshold How long the job must run without failure before the delay is
+ *     reset to its initial value
+ * @param jitterFactor Fraction of the current delay by which the actual delay may randomly
+ *     vary, between 0 and 1
+ */
+ public static ExponentialDelayRestartStrategyConfiguration exponentialDelayRestart(
+ Time initialBackoff,
+ Time maxBackoff,
+ double backoffMultiplier,
+ Time resetBackoffThreshold,
+ double jitterFactor) {
+ return new ExponentialDelayRestartStrategyConfiguration(
+ initialBackoff, maxBackoff, backoffMultiplier, resetBackoffThreshold, jitterFactor);
+ }
+
/** Abstract configuration for restart strategies. */
public abstract static class RestartStrategyConfiguration implements Serializable {
private static final long serialVersionUID = 6285853591578313960L;
@@ -188,6 +207,90 @@ public class RestartStrategies {
}
}
+ /** Configuration representing an exponential delay restart strategy. */
+ public static final class ExponentialDelayRestartStrategyConfiguration
+ extends RestartStrategyConfiguration {
+ private static final long serialVersionUID = 1467941615941965194L;
+
+ private final Time initialBackoff;
+ private final Time maxBackoff;
+ private final double backoffMultiplier;
+ private final Time resetBackoffThreshold;
+ private final double jitterFactor;
+
+ public ExponentialDelayRestartStrategyConfiguration(
+ Time initialBackoff,
+ Time maxBackoff,
+ double backoffMultiplier,
+ Time resetBackoffThreshold,
+ double jitterFactor) {
+ this.initialBackoff = initialBackoff;
+ this.maxBackoff = maxBackoff;
+ this.backoffMultiplier = backoffMultiplier;
+ this.resetBackoffThreshold = resetBackoffThreshold;
+ this.jitterFactor = jitterFactor;
+ }
+
+ public Time getInitialBackoff() {
+ return initialBackoff;
+ }
+
+ public Time getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ public double getBackoffMultiplier() {
+ return backoffMultiplier;
+ }
+
+ public Time getResetBackoffThreshold() {
+ return resetBackoffThreshold;
+ }
+
+ public double getJitterFactor() {
+ return jitterFactor;
+ }
+
+ @Override
+ public String getDescription() {
+ return String.format(
+ "Restart with exponential delay: starting at %s, increasing by %f, with maximum %s. "
+ + "Delay resets after %s with jitter %f",
+ initialBackoff,
+ backoffMultiplier,
+ maxBackoff,
+ resetBackoffThreshold,
+ jitterFactor);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExponentialDelayRestartStrategyConfiguration that =
+ (ExponentialDelayRestartStrategyConfiguration) o;
+ return Double.compare(that.backoffMultiplier, backoffMultiplier) == 0
+ && Double.compare(that.jitterFactor, jitterFactor) == 0
+ && Objects.equals(initialBackoff, that.initialBackoff)
+ && Objects.equals(maxBackoff, that.maxBackoff)
+ && Objects.equals(resetBackoffThreshold, that.resetBackoffThreshold);
+ }
+
+ @Override
+ public int hashCode() {
+ // Objects.hash includes the double fields via Double.hashCode instead of truncating them
+ // to int, and tolerates null fields just like equals() does.
+ return Objects.hash(
+ initialBackoff, maxBackoff, backoffMultiplier, resetBackoffThreshold, jitterFactor);
+ }
+ }
+
/** Configuration representing a failure rate restart strategy. */
public static final class FailureRateRestartStrategyConfiguration
extends RestartStrategyConfiguration {
@@ -304,6 +407,34 @@ public class RestartStrategies {
configuration.get(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY);
return fixedDelayRestart(attempts, delay.toMillis());
+ case "exponentialdelay":
+ case "exponential-delay":
+ Duration initialBackoff =
+ configuration.get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF);
+ Duration maxBackoff =
+ configuration.get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF);
+ double backoffMultiplier =
+ configuration.get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER);
+ Duration resetBackoffThreshold =
+ configuration.get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD);
+ double jitter =
+ configuration.get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR);
+ return exponentialDelayRestart(
+ Time.milliseconds(initialBackoff.toMillis()),
+ Time.milliseconds(maxBackoff.toMillis()),
+ backoffMultiplier,
+ Time.milliseconds(resetBackoffThreshold.toMillis()),
+ jitter);
case "failurerate":
case "failure-rate":
int maxFailures =
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java
index 198fba2..c78511c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java
@@ -34,6 +34,9 @@ import static org.apache.flink.configuration.description.TextElement.text;
@ConfigGroups(
groups = {
@ConfigGroup(
+ name = "ExponentialDelayRestartStrategy",
+ keyPrefix = "restart-strategy.exponential-delay"),
+ @ConfigGroup(
name = "FixedDelayRestartStrategy",
keyPrefix = "restart-strategy.fixed-delay"),
@ConfigGroup(
@@ -68,6 +71,13 @@ public class RestartStrategyOptions {
code("failure-rate"),
link(
"../dev/task_failure_recovery.html#failure-rate-restart-strategy",
+ "here")),
+ text(
+ "%s, %s: Exponential delay restart strategy. More details can be found %s.",
+ code("exponentialdelay"),
+ code("exponential-delay"),
+ link(
+ "../dev/task_failure_recovery.html#exponential-delay-restart-strategy",
"here")))
.text(
"If checkpointing is disabled, the default value is %s. "
@@ -139,6 +149,71 @@ public class RestartStrategyOptions {
code(RESTART_STRATEGY.key()),
code("failure-rate"))
.build());
+ public static final ConfigOption<Duration> RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF =
+ ConfigOptions.key("restart-strategy.exponential-delay.initial-backoff")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription(
+ Description.builder()
+ .text(
+ "Starting duration between restarts if %s has been set to %s. "
+ + "It can be specified using notation: \"1 min\", \"20 s\"",
+ code(RESTART_STRATEGY.key()), code("exponential-delay"))
+ .build());
+
+ public static final ConfigOption<Duration> RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF =
+ ConfigOptions.key("restart-strategy.exponential-delay.max-backoff")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(5))
+ .withDescription(
+ Description.builder()
+ .text(
+ "The highest possible duration between restarts if %s has been set to %s. "
+ + "It can be specified using notation: \"1 min\", \"20 s\"",
+ code(RESTART_STRATEGY.key()), code("exponential-delay"))
+ .build());
+
+ public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
+ ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
+ .doubleType()
+ .defaultValue(2.0)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Backoff value is multiplied by this value after every failure, "
+ + "until max backoff is reached if %s has been set to %s.",
+ code(RESTART_STRATEGY.key()), code("exponential-delay"))
+ .build());
+
+ public static final ConfigOption<Duration>
+ RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD =
+ ConfigOptions.key("restart-strategy.exponential-delay.reset-backoff-threshold")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ Description.builder()
+ .text(
+ "Threshold when the backoff is reset to its initial value if %s has been set to %s. "
+ + "It specifies how long the job must be running without failure "
+ + "to reset the exponentially increasing backoff to its initial value. "
+ + "It can be specified using notation: \"1 min\", \"20 s\"",
+ code(RESTART_STRATEGY.key()),
+ code("exponential-delay"))
+ .build());
+
+ public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR =
+ ConfigOptions.key("restart-strategy.exponential-delay.jitter-factor")
+ .doubleType()
+ .defaultValue(0.1)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Jitter specified as a portion of the backoff if %s has been set to %s. "
+ + "It determines the largest random value that may be added to or subtracted from the backoff. "
+ + "Useful when you want to avoid restarting multiple jobs at the same time.",
+ code(RESTART_STRATEGY.key()), code("exponential-delay"))
+ .build());
+
private RestartStrategyOptions() {
throw new UnsupportedOperationException("This class should never be instantiated.");
}
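The `jitter-factor` option defined above is easiest to read as a range of possible delays. The sketch below assumes the rule implemented by `calculateJitterBackoffMS()` and `getBackoffTime()` in this commit: an offset of `(long) (currentBackoff * jitterFactor)` bounds the random value added to or subtracted from the backoff, and the result is capped at the maximum backoff. `JitterRangeSketch` is a hypothetical helper written for illustration, not a Flink API.

```java
import java.util.Arrays;

/**
 * Sketch only: the range of delays returned for a given current backoff, assuming the rule in
 * calculateJitterBackoffMS() and getBackoffTime() from this commit. Hypothetical class.
 */
public class JitterRangeSketch {

    /** Returns {lowest, highest} possible delay in milliseconds. */
    static long[] delayRange(long currentBackoffMs, double jitterFactor, long maxBackoffMs) {
        // The offset is truncated to whole milliseconds before the random draw.
        long offset = (long) (currentBackoffMs * jitterFactor);
        // The random jitter lies in [-offset, offset]; the sum is capped at the maximum backoff.
        long low = Math.min(currentBackoffMs - offset, maxBackoffMs);
        long high = Math.min(currentBackoffMs + offset, maxBackoffMs);
        return new long[] {low, high};
    }

    public static void main(String[] args) {
        // Defaults: 1 s initial backoff, jitter factor 0.1, 5 min max backoff.
        System.out.println(Arrays.toString(delayRange(1_000L, 0.1, 300_000L)));
        // Javadoc example: backoff 8 ms, jitter 0.25, so the offset is at most 2 ms.
        System.out.println(Arrays.toString(delayRange(8L, 0.25, 100L)));
    }
}
```

With the default settings the first restart delay therefore falls between 900 ms and 1100 ms. When the backoff is already at the maximum (for example 7 ms with jitter 0.25 and a 7 ms cap), only the lower half of the jitter range survives, giving 6 or 7 ms, which is what `testJitter` asserts.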
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExponentialDelayRestartBackoffTimeStrategy.java
new file mode 100644
index 0000000..a519453
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExponentialDelayRestartBackoffTimeStrategy.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Restart strategy which restarts an unlimited number of times, with an exponentially increasing
+ * backoff time in between. The delay starts at the initial value and keeps increasing (multiplied
+ * by the backoff multiplier) until the maximum delay is reached.
+ *
+ * <p>If the job runs without failure for long enough (the reset threshold), the backoff is reset
+ * to its initial value.
+ */
+public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
+
+ private final long initialBackoffMS;
+
+ private final long maxBackoffMS;
+
+ private final double backoffMultiplier;
+
+ private final long resetBackoffThresholdMS;
+
+ private final double jitterFactor;
+
+ private final String strategyString;
+
+ private final Clock clock;
+
+ private long currentBackoffMS;
+
+ private long lastFailureTimestamp;
+
+ ExponentialDelayRestartBackoffTimeStrategy(
+ Clock clock,
+ long initialBackoffMS,
+ long maxBackoffMS,
+ double backoffMultiplier,
+ long resetBackoffThresholdMS,
+ double jitterFactor) {
+
+ checkArgument(initialBackoffMS >= 1, "Initial backoff must be at least 1.");
+ checkArgument(maxBackoffMS >= 1, "Maximum backoff must be at least 1.");
+ checkArgument(
+ initialBackoffMS <= maxBackoffMS,
+ "Initial backoff cannot be higher than maximum backoff.");
+ checkArgument(backoffMultiplier > 1, "Backoff multiplier must be greater than 1.");
+ checkArgument(
+ resetBackoffThresholdMS >= 1,
+ "Threshold duration for exponential backoff reset must be at least 1.");
+ checkArgument(
+ 0 <= jitterFactor && jitterFactor <= 1, "Jitter factor must be >= 0 and <= 1.");
+
+ this.initialBackoffMS = initialBackoffMS;
+ setInitialBackoff();
+ this.maxBackoffMS = maxBackoffMS;
+ this.backoffMultiplier = backoffMultiplier;
+ this.resetBackoffThresholdMS = resetBackoffThresholdMS;
+ this.jitterFactor = jitterFactor;
+
+ this.clock = checkNotNull(clock);
+ this.lastFailureTimestamp = 0;
+ this.strategyString = generateStrategyString();
+ }
+
+ @Override
+ public boolean canRestart() {
+ return true;
+ }
+
+ @Override
+ public long getBackoffTime() {
+ long backoffWithJitter = currentBackoffMS + calculateJitterBackoffMS();
+ return Math.min(backoffWithJitter, maxBackoffMS);
+ }
+
+ @Override
+ public void notifyFailure(Throwable cause) {
+ long now = clock.absoluteTimeMillis();
+ if ((now - lastFailureTimestamp) >= (resetBackoffThresholdMS + currentBackoffMS)) {
+ setInitialBackoff();
+ } else {
+ increaseBackoff();
+ }
+ lastFailureTimestamp = now;
+ }
+
+ @Override
+ public String toString() {
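+ // Regenerated on each call so that currentBackoffMS and lastFailureTimestamp are current.
+ return generateStrategyString();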
+ }
+
+ private void setInitialBackoff() {
+ currentBackoffMS = initialBackoffMS;
+ }
+
+ private void increaseBackoff() {
+ if (currentBackoffMS < maxBackoffMS) {
+ currentBackoffMS *= backoffMultiplier;
+ }
+ currentBackoffMS = Math.max(initialBackoffMS, Math.min(currentBackoffMS, maxBackoffMS));
+ }
+
+ /**
+ * Calculate jitter offset to avoid a thundering herd scenario. The offset range grows with the
+ * current backoff, and therefore with the number of consecutive restarts.
+ *
+ * <p>For example, for a backoff time of 8 with jitter 0.25, it generates a random number in the
+ * range [-2, 2].
+ *
+ * @return random value in interval [-n, n], where n represents jitter * current backoff
+ */
+ private long calculateJitterBackoffMS() {
+ if (jitterFactor == 0) {
+ return 0;
+ } else {
+ long offset = (long) (currentBackoffMS * jitterFactor);
+ return ThreadLocalRandom.current().nextLong(-offset, offset + 1);
+ }
+ }
+
+ private String generateStrategyString() {
+ return "ExponentialDelayRestartBackoffTimeStrategy(initialBackoffMS="
+ + initialBackoffMS
+ + ", maxBackoffMS="
+ + maxBackoffMS
+ + ", backoffMultiplier="
+ + backoffMultiplier
+ + ", resetBackoffThresholdMS="
+ + resetBackoffThresholdMS
+ + ", jitterFactor="
+ + jitterFactor
+ + ", currentBackoffMS="
+ + currentBackoffMS
+ + ", lastFailureTimestamp="
+ + lastFailureTimestamp
+ + ")";
+ }
+
+ public static ExponentialDelayRestartBackoffTimeStrategyFactory createFactory(
+ final Configuration configuration) {
+ long initialBackoffMS =
+ configuration
+ .get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF)
+ .toMillis();
+ long maxBackoffMS =
+ configuration
+ .get(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF)
+ .toMillis();
+ double backoffMultiplier =
+ configuration.get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER);
+ long resetBackoffThresholdMS =
+ configuration
+ .get(
+ RestartStrategyOptions
+ .RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD)
+ .toMillis();
+ double jitterFactor =
+ configuration.get(
+ RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR);
+
+ return new ExponentialDelayRestartBackoffTimeStrategyFactory(
+ initialBackoffMS,
+ maxBackoffMS,
+ backoffMultiplier,
+ resetBackoffThresholdMS,
+ jitterFactor);
+ }
+
+ /** The factory for creating {@link ExponentialDelayRestartBackoffTimeStrategy}. */
+ public static class ExponentialDelayRestartBackoffTimeStrategyFactory implements Factory {
+
+ private final long initialBackoffMS;
+ private final long maxBackoffMS;
+ private final double backoffMultiplier;
+ private final long resetBackoffThresholdMS;
+ private final double jitterFactor;
+
+ public ExponentialDelayRestartBackoffTimeStrategyFactory(
+ long initialBackoffMS,
+ long maxBackoffMS,
+ double backoffMultiplier,
+ long resetBackoffThresholdMS,
+ double jitterFactor) {
+ this.initialBackoffMS = initialBackoffMS;
+ this.maxBackoffMS = maxBackoffMS;
+ this.backoffMultiplier = backoffMultiplier;
+ this.resetBackoffThresholdMS = resetBackoffThresholdMS;
+ this.jitterFactor = jitterFactor;
+ }
+
+ @Override
+ public RestartBackoffTimeStrategy create() {
+ return new ExponentialDelayRestartBackoffTimeStrategy(
+ SystemClock.getInstance(),
+ initialBackoffMS,
+ maxBackoffMS,
+ backoffMultiplier,
+ resetBackoffThresholdMS,
+ jitterFactor);
+ }
+ }
+}
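The reset rule in `notifyFailure()` above compares the time since the previous failure with `resetBackoffThresholdMS + currentBackoffMS`, so the quiet period has to cover the restart delay as well as the threshold. The sketch below models that bookkeeping without jitter, with timestamps passed in explicitly instead of read from a `Clock`; `BackoffModel` is a hypothetical stand-in written for illustration, not a Flink class.

```java
/**
 * Sketch of the backoff bookkeeping in ExponentialDelayRestartBackoffTimeStrategy, with jitter
 * left out and time passed in explicitly. BackoffModel is hypothetical, not a Flink class.
 */
public class BackoffModel {
    private final long initialMs;
    private final long maxMs;
    private final double multiplier;
    private final long resetThresholdMs;
    private long currentMs;
    private long lastFailureMs;

    BackoffModel(long initialMs, long maxMs, double multiplier, long resetThresholdMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
        this.resetThresholdMs = resetThresholdMs;
        this.currentMs = initialMs;
        this.lastFailureMs = 0;
    }

    /** Records a failure at time nowMs, using the same reset condition as the diff. */
    void notifyFailure(long nowMs) {
        // The quiet period must cover the threshold plus the current restart delay.
        if (nowMs - lastFailureMs >= resetThresholdMs + currentMs) {
            currentMs = initialMs;
        } else {
            if (currentMs < maxMs) {
                currentMs = (long) (currentMs * multiplier);
            }
            currentMs = Math.max(initialMs, Math.min(currentMs, maxMs));
        }
        lastFailureMs = nowMs;
    }

    long backoffMs() {
        return currentMs;
    }

    public static void main(String[] args) {
        // Same numbers as testResetBackoff: initial 1 ms, max 5 ms, multiplier 2, threshold 8 ms.
        BackoffModel model = new BackoffModel(1L, 5L, 2.0, 8L);
        model.notifyFailure(8L); // 8 - 0 < 8 + 1, so the backoff grows
        System.out.println(model.backoffMs());
        model.notifyFailure(18L); // 18 - 8 >= 8 + 2, so the backoff resets
        System.out.println(model.backoffMs());
    }
}
```

Running `main` prints 2 and then 1, the same values `testResetBackoff` expects from the real strategy.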
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
index 8333eea..bdaf710 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph.failover.flip1;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies.ExponentialDelayRestartStrategyConfiguration;
import org.apache.flink.api.common.restartstrategy.RestartStrategies.FailureRateRestartStrategyConfiguration;
import org.apache.flink.api.common.restartstrategy.RestartStrategies.FallbackRestartStrategyConfiguration;
import org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration;
@@ -104,6 +105,18 @@ public final class RestartBackoffTimeStrategyFactoryLoader {
failureRateConfig.getDelayBetweenAttemptsInterval().toMilliseconds()));
} else if (restartStrategyConfiguration instanceof FallbackRestartStrategyConfiguration) {
return Optional.empty();
+ } else if (restartStrategyConfiguration
+ instanceof ExponentialDelayRestartStrategyConfiguration) {
+ final ExponentialDelayRestartStrategyConfiguration exponentialDelayConfig =
+ (ExponentialDelayRestartStrategyConfiguration) restartStrategyConfiguration;
+ return Optional.of(
+ new ExponentialDelayRestartBackoffTimeStrategy
+ .ExponentialDelayRestartBackoffTimeStrategyFactory(
+ exponentialDelayConfig.getInitialBackoff().toMilliseconds(),
+ exponentialDelayConfig.getMaxBackoff().toMilliseconds(),
+ exponentialDelayConfig.getBackoffMultiplier(),
+ exponentialDelayConfig.getResetBackoffThreshold().toMilliseconds(),
+ exponentialDelayConfig.getJitterFactor()));
} else {
throw new IllegalArgumentException(
"Unknown restart strategy configuration " + restartStrategyConfiguration + ".");
@@ -133,6 +146,11 @@ public final class RestartBackoffTimeStrategyFactoryLoader {
case "failure-rate":
return Optional.of(
FailureRateRestartBackoffTimeStrategy.createFactory(clusterConfiguration));
+ case "exponentialdelay":
+ case "exponential-delay":
+ return Optional.of(
+ ExponentialDelayRestartBackoffTimeStrategy.createFactory(
+ clusterConfiguration));
default:
throw new IllegalArgumentException(
"Unknown restart strategy " + restartStrategyName + ".");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExponentialDelayRestartBackoffTimeStrategyTest.java
new file mode 100644
index 0000000..c519f88
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExponentialDelayRestartBackoffTimeStrategyTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link ExponentialDelayRestartBackoffTimeStrategy}. */
+public class ExponentialDelayRestartBackoffTimeStrategyTest extends TestLogger {
+
+ private final Exception failure = new Exception();
+
+ @Test
+ public void testAlwaysRestart() throws Exception {
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ new ManualClock(), 1L, 3L, 2.0, 4L, 0.25);
+
+ for (int i = 0; i < 13; i++) {
+ assertTrue(restartStrategy.canRestart());
+ restartStrategy.notifyFailure(failure);
+ }
+ }
+
+ @Test
+ public void testInitialBackoff() throws Exception {
+ long initialBackoffMS = 42L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ new ManualClock(), initialBackoffMS, 45L, 2.0, 8L, 0);
+
+ assertThat(restartStrategy.getBackoffTime(), is(initialBackoffMS));
+ }
+
+ @Test
+ public void testMaxBackoff() throws Exception {
+ final long maxBackoffMS = 6L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ new ManualClock(), 1L, maxBackoffMS, 2.0, 8L, 0.25);
+
+ for (int i = 0; i < 10; i++) {
+ restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.getBackoffTime(), lessThanOrEqualTo(maxBackoffMS));
+ }
+ }
+
+ @Test
+ public void testResetBackoff() throws Exception {
+ final long initialBackoffMS = 1L;
+ final long resetBackoffThresholdMS = 8L;
+ final ManualClock clock = new ManualClock();
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ clock, initialBackoffMS, 5L, 2.0, resetBackoffThresholdMS, 0.25);
+
+ clock.advanceTime(
+ resetBackoffThresholdMS + restartStrategy.getBackoffTime() - 1,
+ TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertThat("Backoff should be increased", restartStrategy.getBackoffTime(), is(2L));
+
+ clock.advanceTime(
+ resetBackoffThresholdMS + restartStrategy.getBackoffTime(), TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertThat(
+ "Backoff should be reset", restartStrategy.getBackoffTime(), is(initialBackoffMS));
+ }
+
+ @Test
+ public void testBackoffMultiplier() throws Exception {
+ long initialBackoffMS = 4L;
+ double jitterFactor = 0;
+ double backoffMultiplier = 2.3;
+ long maxBackoffMS = 300L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ new ManualClock(),
+ initialBackoffMS,
+ maxBackoffMS,
+ backoffMultiplier,
+ 8L,
+ jitterFactor);
+
+ restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.getBackoffTime(), is(9L)); // 4 * 2.3
+
+ restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.getBackoffTime(), is(20L)); // 9 * 2.3
+ }
+
+ @Test
+ public void testJitter() throws Exception {
+ final long initialBackoffMS = 2L;
+ final long maxBackoffMS = 7L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ new ManualClock(), initialBackoffMS, maxBackoffMS, 2.0, 1L, 0.25);
+
+ restartStrategy.notifyFailure(failure);
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L);
+
+ restartStrategy.notifyFailure(failure);
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L);
+
+ restartStrategy.notifyFailure(failure);
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L);
+ }
+
+ @Test
+ public void testJitterNoHigherThanMax() throws Exception {
+ double jitterFactor = 1;
+ long maxBackoffMS = 7L;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ new ManualClock(), 1L, maxBackoffMS, 2.0, 8L, jitterFactor);
+
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L);
+
+ restartStrategy.notifyFailure(failure);
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L, 3L, 4L);
+
+ restartStrategy.notifyFailure(failure);
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L);
+ }
+
+ @Test
+ public void testMultipleSettings() throws Exception {
+ ManualClock clock = new ManualClock();
+ final long initialBackoffMS = 1L;
+ final long maxBackoffMS = 9L;
+ double backoffMultiplier = 2.0;
+ final long resetBackoffThresholdMS = 8L;
+ double jitterFactor = 0.25;
+
+ final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+ new ExponentialDelayRestartBackoffTimeStrategy(
+ clock,
+ initialBackoffMS,
+ maxBackoffMS,
+ backoffMultiplier,
+ resetBackoffThresholdMS,
+ jitterFactor);
+
+ // ensure initial data
+ assertTrue(restartStrategy.canRestart());
+ assertThat(restartStrategy.getBackoffTime(), is(initialBackoffMS));
+
+ // ensure backoff time is initial after the first failure
+ clock.advanceTime(50, TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertTrue(restartStrategy.canRestart());
+ assertThat(restartStrategy.getBackoffTime(), is(initialBackoffMS));
+
+ // ensure backoff increases until threshold is reached
+ clock.advanceTime(4, TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertTrue(restartStrategy.canRestart());
+ assertThat(restartStrategy.getBackoffTime(), is(2L));
+
+ clock.advanceTime(3, TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertTrue(restartStrategy.canRestart());
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L);
+
+ clock.advanceTime(7, TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertTrue(restartStrategy.canRestart());
+ assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L, 8L, 9L);
+
+ // ensure backoff is reset after threshold is reached
+ clock.advanceTime(resetBackoffThresholdMS + 9 + 1, TimeUnit.MILLISECONDS);
+ restartStrategy.notifyFailure(failure);
+ assertTrue(restartStrategy.canRestart());
+ assertThat(restartStrategy.getBackoffTime(), is(1L));
+
+ // ensure backoff still increases
+ restartStrategy.notifyFailure(failure);
+ assertTrue(restartStrategy.canRestart());
+ assertThat(restartStrategy.getBackoffTime(), is(2L));
+ }
+
+ private void assertCorrectRandomRange(Callable<Long> numberGenerator, Long... expectedNumbers)
+ throws Exception {
+ Set<Long> generatedNumbers = new HashSet<>();
+ for (int i = 0; i < 1000; i++) {
+ long generatedNumber = numberGenerator.call();
+ generatedNumbers.add(generatedNumber);
+ }
+ assertThat(generatedNumbers, is(equalTo(new HashSet<>(Arrays.asList(expectedNumbers)))));
+ }
+}
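The assertions in ExponentialDelayRestartBackoffTimeStrategyTest pin down the backoff arithmetic: each failure multiplies the backoff (truncated to whole milliseconds) and caps it at the maximum; the backoff resets to the initial value once the gap since the previous failure reaches the reset threshold plus the current backoff; and every read adds a random jitter of up to ±(backoff × jitter factor), still capped at the maximum. The following is a minimal standalone sketch of that rule, reconstructed from the test expectations rather than taken from Flink's implementation. The class `ExponentialBackoffSketch`, its methods, and the explicit `nowMs` argument used in place of a clock are all hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical, simplified model of exponential-delay backoff, reconstructed from the
 * assertions in ExponentialDelayRestartBackoffTimeStrategyTest. Not Flink's code.
 */
final class ExponentialBackoffSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final double multiplier;
    private final long resetThresholdMs;
    private final double jitterFactor;

    private long currentBackoffMs;
    private long lastFailureMs; // previous failure time; starts at 0 like the tests' ManualClock

    ExponentialBackoffSketch(
            long initialBackoffMs, long maxBackoffMs, double multiplier,
            long resetThresholdMs, double jitterFactor) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.multiplier = multiplier;
        this.resetThresholdMs = resetThresholdMs;
        this.jitterFactor = jitterFactor;
        this.currentBackoffMs = initialBackoffMs;
    }

    /** Records a failure observed at nowMs (the caller supplies the time). */
    void notifyFailure(long nowMs) {
        if (nowMs - lastFailureMs >= resetThresholdMs + currentBackoffMs) {
            // Quiet for at least threshold + current backoff: start over.
            currentBackoffMs = initialBackoffMs;
        } else if (currentBackoffMs < maxBackoffMs) {
            // Grow geometrically; the long cast truncates (4 * 2.3 -> 9), then cap at max.
            currentBackoffMs = Math.min((long) (currentBackoffMs * multiplier), maxBackoffMs);
        }
        lastFailureMs = nowMs;
    }

    /** Current backoff without jitter. */
    long baseBackoffMs() {
        return currentBackoffMs;
    }

    /** Current backoff plus uniform integer jitter in [-offset, +offset], capped at max. */
    long backoffWithJitterMs() {
        long offset = (long) (currentBackoffMs * jitterFactor);
        long jitter = offset == 0 ? 0 : ThreadLocalRandom.current().nextLong(-offset, offset + 1);
        return Math.min(currentBackoffMs + jitter, maxBackoffMs);
    }

    public static void main(String[] args) {
        // Parameters from testBackoffMultiplier: 4 ms initial, multiplier 2.3, no jitter.
        ExponentialBackoffSketch s = new ExponentialBackoffSketch(4L, 300L, 2.3, 8L, 0.0);
        s.notifyFailure(0L);
        System.out.println(s.baseBackoffMs()); // 9
        s.notifyFailure(0L);
        System.out.println(s.baseBackoffMs()); // 20
    }
}
```

Running `main` prints 9 and then 20, which matches testBackoffMultiplier. The sketch leaves out `canRestart()`, which these tests only ever expect to return true.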
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java
index d096b86..9492557 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java
@@ -68,6 +68,29 @@ public class RestartBackoffTimeStrategyFactoryLoaderTest extends TestLogger {
}
@Test
+ public void testExponentialDelayRestartStrategySpecifiedInJobConfig() {
+ final Configuration conf = new Configuration();
+ conf.setString(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
+
+ final RestartBackoffTimeStrategy.Factory factory =
+ RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+ RestartStrategies.exponentialDelayRestart(
+ Time.milliseconds(1),
+ Time.milliseconds(1000),
+ 1.1,
+ Time.milliseconds(2000),
+ 0),
+ conf,
+ false);
+
+ assertThat(
+ factory,
+ instanceOf(
+ ExponentialDelayRestartBackoffTimeStrategy
+ .ExponentialDelayRestartBackoffTimeStrategyFactory.class));
+ }
+
+ @Test
public void testFailureRateRestartStrategySpecifiedInJobConfig() {
final Configuration conf = new Configuration();
conf.setString(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
@@ -116,6 +139,22 @@ public class RestartBackoffTimeStrategyFactoryLoaderTest extends TestLogger {
}
@Test
+ public void testExponentialDelayStrategySpecifiedInClusterConfig() {
+ final Configuration conf = new Configuration();
+ conf.setString(RestartStrategyOptions.RESTART_STRATEGY, "exponential-delay");
+
+ final RestartBackoffTimeStrategy.Factory factory =
+ RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
+ DEFAULT_JOB_LEVEL_RESTART_CONFIGURATION, conf, false);
+
+ assertThat(
+ factory,
+ instanceOf(
+ ExponentialDelayRestartBackoffTimeStrategy
+ .ExponentialDelayRestartBackoffTimeStrategyFactory.class));
+ }
+
+ @Test
public void testFailureRateStrategySpecifiedInClusterConfig() {
final Configuration conf = new Configuration();
conf.setString(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
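The two new loader tests check precedence. A restart strategy set on the job wins even when the cluster configuration names a different one (`failure-rate`). The cluster-level `exponential-delay` setting applies only when the job keeps `DEFAULT_JOB_LEVEL_RESTART_CONFIGURATION`. A minimal sketch of that precedence rule, assuming strategies are reduced to plain names, follows. The class `RestartStrategyPrecedenceSketch` and `resolve` are hypothetical, this is not Flink's `RestartBackoffTimeStrategyFactoryLoader`, and the case in which neither level configures a strategy is ignored.

```java
import java.util.Optional;

/** Hypothetical illustration of job-over-cluster precedence; not Flink's loader. */
final class RestartStrategyPrecedenceSketch {
    /**
     * Returns the job-level strategy name if the job set one; otherwise falls back to
     * the cluster-level value of the restart-strategy option.
     */
    static String resolve(Optional<String> jobLevelStrategy, String clusterLevelStrategy) {
        return jobLevelStrategy.orElse(clusterLevelStrategy);
    }

    public static void main(String[] args) {
        // Mirrors testExponentialDelayRestartStrategySpecifiedInJobConfig.
        System.out.println(resolve(Optional.of("exponential-delay"), "failure-rate"));
        // Mirrors testExponentialDelayStrategySpecifiedInClusterConfig.
        System.out.println(resolve(Optional.empty(), "exponential-delay"));
    }
}
```

Both calls print `exponential-delay`, which corresponds to the factory type the two tests expect.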
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryExponentialDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryExponentialDelayRestartStrategyITBase.java
new file mode 100644
index 0000000..cbdebff
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryExponentialDelayRestartStrategyITBase.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.ClassRule;
+
+import java.time.Duration;
+
+/** Test cluster configuration with exponential-delay recovery. */
+public class SimpleRecoveryExponentialDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(2)
+ .build());
+
+ private static Configuration getConfiguration() {
+ Configuration config = new Configuration();
+ config.setString(RestartStrategyOptions.RESTART_STRATEGY, "exponential-delay");
+ config.set(
+ RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF,
+ Duration.ofMillis(5));
+ config.set(
+ RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF,
+ Duration.ofMillis(100));
+ config.setDouble(
+ RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER, 2);
+ config.set(
+ RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD,
+ Duration.ofMillis(1000));
+ config.setDouble(
+ RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR, 0.2);
+
+ return config;
+ }
+}
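To see what the integration-test settings in `getConfiguration()` imply, here is a sketch that computes the per-restart delay bounds for that configuration: 5 ms initial backoff, 100 ms maximum, multiplier 2, jitter factor 0.2. It assumes the same truncating growth and ±(backoff × jitter) rule that the unit tests exercise. It also assumes that the first restart uses the initial backoff and that later failures arrive within the 1000 ms reset threshold (plus the current backoff), so no reset occurs. `BackoffScheduleSketch` and `schedule` are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: expected delay bounds for consecutive restarts without a reset. */
final class BackoffScheduleSketch {
    /** Returns one {base, lower, upper} triple in milliseconds per restart. */
    static List<long[]> schedule(
            long initialMs, long maxMs, double multiplier, double jitterFactor, int restarts) {
        List<long[]> rows = new ArrayList<>();
        long base = initialMs;
        for (int i = 0; i < restarts; i++) {
            long offset = (long) (base * jitterFactor); // jitter half-width, truncated
            rows.add(new long[] {base, base - offset, Math.min(base + offset, maxMs)});
            base = Math.min((long) (base * multiplier), maxMs); // grow, capped at max
        }
        return rows;
    }

    public static void main(String[] args) {
        // 5 ms initial, 100 ms max, multiplier 2, jitter 0.2, as in getConfiguration().
        for (long[] r : schedule(5L, 100L, 2.0, 0.2, 7)) {
            System.out.printf("base=%d ms, range=[%d, %d] ms%n", r[0], r[1], r[2]);
        }
    }
}
```

Under these assumptions the base delay runs 5, 10, 20, 40, 80 ms and then stays at 100 ms. The first restart waits between 4 and 6 ms, and once the cap is reached every restart waits between 80 and 100 ms.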