[ https://issues.apache.org/jira/browse/FLINK-10073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16572727#comment-16572727 ]
ASF GitHub Bot commented on FLINK-10073: ---------------------------------------- asfgit closed pull request #6506: [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client URL: https://github.com/apache/flink/pull/6506 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index d35aa591a4d..0e8d2d651b1 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -108,6 +108,8 @@ Greg, 1 Both result modes can be useful during the prototyping of SQL queries. +<span class="label label-danger">Attention</span> Queries that are executed in a batch environment, can only be retrieved using the `table` result mode. + After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties. {% top %} @@ -204,6 +206,8 @@ execution: max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default) min-idle-state-retention: 0 # optional: table program's minimum idle state time max-idle-state-retention: 0 # optional: table program's maximum idle state time + restart-strategy: # optional: restart strategy + type: fallback # "fallback" to global restart strategy by default # Deployment properties allow for describing the cluster to which table programs are submitted to. @@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files. 
The CLI commands > session environment file > defaults environment file {% endhighlight %} -Queries that are executed in a batch environment, can only be retrieved using the `table` result mode. +#### Restart Strategies + +Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file. + +The following strategies are supported: + +{% highlight yaml %} +execution: + # falls back to the global strategy defined in flink-conf.yaml + restart-strategy: + type: fallback + + # job fails directly and no restart is attempted + restart-strategy: + type: none + + # attempts a given number of times to restart the job + restart-strategy: + type: fixed-delay + attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE) + delay: 10000 # delay in ms between retries (default: 10 s) + + # attempts as long as the maximum number of failures per time interval is not exceeded + restart-strategy: + type: failure-rate + max-failures-per-interval: 1 # retries in interval until failing (default: 1) + failure-rate-interval: 60000 # measuring interval in ms for failure rate + delay: 10000 # delay in ms between retries (default: 10 s) +{% endhighlight %} {% top %} diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml index 51e6e95bc1f..8be4ce63ecb 100644 --- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml @@ -74,6 +74,12 @@ execution: min-idle-state-retention: 0 # maximum idle state retention in ms max-idle-state-retention: 0 + # controls how table programs are restarted in case of a failures + restart-strategy: + # strategy type + # possible values are "fixed-delay", "failure-rate", "none", or 
"fallback" (default) + type: fallback + #============================================================================== # Deployment properties diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java index 0d6e6dd59ac..b7c28938121 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java @@ -18,6 +18,8 @@ package org.apache.flink.table.client.config; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.TimeCharacteristic; import java.util.Collections; @@ -57,10 +59,10 @@ public boolean isBatchExecution() { } public TimeCharacteristic getTimeCharacteristic() { - final String s = properties.getOrDefault( + final String characteristic = properties.getOrDefault( PropertyStrings.EXECUTION_TIME_CHARACTERISTIC, PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME); - switch (s) { + switch (characteristic) { case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME: return TimeCharacteristic.EventTime; case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME: @@ -90,6 +92,45 @@ public int getMaxParallelism() { return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128))); } + public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() { + final String restartStrategy = properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE, + PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK); + switch (restartStrategy) { + case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE: + return RestartStrategies.noRestart(); + case 
PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY: + final int attempts = Integer.parseInt( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_ATTEMPTS, + Integer.toString(Integer.MAX_VALUE))); + final long fixedDelay = Long.parseLong( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY, + Long.toString(10_000))); + return RestartStrategies.fixedDelayRestart(attempts, fixedDelay); + case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE: + final int failureRate = Integer.parseInt( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, + Integer.toString(1))); + final long failureInterval = Long.parseLong( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, + Long.toString(60_000))); + final long attemptDelay = Long.parseLong( + properties.getOrDefault( + PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY, + Long.toString(10_000))); + return RestartStrategies.failureRateRestart( + failureRate, + Time.milliseconds(failureInterval), + Time.milliseconds(attemptDelay)); + default: + return RestartStrategies.fallBackRestart(); + } + } + public boolean isChangelogMode() { return Objects.equals( properties.get(PropertyStrings.EXECUTION_RESULT_MODE), diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java index 76e52defb8b..2a6b001d5f4 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java @@ -57,6 +57,24 @@ private PropertyStrings() { public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + public static final String EXECUTION_RESTART_STRATEGY_TYPE = 
"restart-strategy.type"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay"; + + public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate"; + + public static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts"; + + public static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay"; + + public static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval"; + + public static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval"; + public static final String DEPLOYMENT = "deployment"; public static final String DEPLOYMENT_TYPE = "type"; diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 4283953446e..9ff683755d2 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -361,12 +361,14 @@ private FlinkPlan createPlan(String name, Configuration flinkConfig) { private ExecutionEnvironment createExecutionEnvironment() { final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment(); + execEnv.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy()); execEnv.setParallelism(mergedEnv.getExecution().getParallelism()); return execEnv; } private StreamExecutionEnvironment createStreamExecutionEnvironment() { final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy()); env.setParallelism(mergedEnv.getExecution().getParallelism()); env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism()); env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic()); diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index bc29f6f2795..04575c696a2 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.configuration.Configuration; @@ -41,6 +42,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test for {@link ExecutionContext}. 
@@ -53,7 +55,16 @@ public void testExecutionConfig() throws Exception { final ExecutionContext<?> context = createExecutionContext(); final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig(); + assertEquals(99, config.getAutoWatermarkInterval()); + + final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy(); + assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration); + final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy = + (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig; + assertEquals(10, failureRateStrategy.getMaxFailureRate()); + assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds()); + assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds()); } @Test diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index d8452e42dbf..ed4ce7a9d8d 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -149,6 +149,10 @@ public void testGetSessionProperties() throws Exception { expectedProperties.put("execution.max-idle-state-retention", "0"); expectedProperties.put("execution.min-idle-state-retention", "0"); expectedProperties.put("execution.result-mode", "table"); + expectedProperties.put("execution.restart-strategy.type", "failure-rate"); + expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10"); + expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000"); + 
expectedProperties.put("execution.restart-strategy.delay", "1000"); expectedProperties.put("deployment.response-timeout", "5000"); assertEquals(expectedProperties, actualProperties); diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index b759874939d..22351bc3959 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -117,6 +117,11 @@ execution: min-idle-state-retention: 0 max-idle-state-retention: 0 result-mode: "$VAR_3" + restart-strategy: + type: failure-rate + max-failures-per-interval: 10 + failure-rate-interval: 99000 + delay: 1000 deployment: response-timeout: 5000 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow setting a restart strategy in SQL Client > ---------------------------------------------- > > Key: FLINK-10073 > URL: https://issues.apache.org/jira/browse/FLINK-10073 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Timo Walther > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, it is not possible to set a restart strategy per job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)