[ https://issues.apache.org/jira/browse/FLINK-10073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16572727#comment-16572727 ]

ASF GitHub Bot commented on FLINK-10073:
----------------------------------------

asfgit closed pull request #6506: [FLINK-10073] [sql-client] Allow setting a 
restart strategy in SQL Client
URL: https://github.com/apache/flink/pull/6506
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index d35aa591a4d..0e8d2d651b1 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -108,6 +108,8 @@ Greg, 1
 
 Both result modes can be useful during the prototyping of SQL queries.
 
+<span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` result mode.
+
 After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.
 
 {% top %}
@@ -204,6 +206,8 @@ execution:
   max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
   min-idle-state-retention: 0       # optional: table program's minimum idle state time
   max-idle-state-retention: 0       # optional: table program's maximum idle state time
+  restart-strategy:                 # optional: restart strategy
+    type: fallback                  #           "fallback" to global restart strategy by default
 
 # Deployment properties allow for describing the cluster to which table programs are submitted to.
 
@@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files. The
 CLI commands > session environment file > defaults environment file
 {% endhighlight %}
 
-Queries that are executed in a batch environment, can only be retrieved using the `table` result mode.
+#### Restart Strategies
+
+Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.
+
+The following strategies are supported:
+
+{% highlight yaml %}
+execution:
+  # falls back to the global strategy defined in flink-conf.yaml
+  restart-strategy:
+    type: fallback
+
+  # job fails directly and no restart is attempted
+  restart-strategy:
+    type: none
+
+  # attempts a given number of times to restart the job
+  restart-strategy:
+    type: fixed-delay
+    attempts: 3      # retries before job is declared as failed (default: Integer.MAX_VALUE)
+    delay: 10000     # delay in ms between retries (default: 10 s)
+
+  # attempts as long as the maximum number of failures per time interval is not exceeded
+  restart-strategy:
+    type: failure-rate
+    max-failures-per-interval: 1   # retries in interval until failing (default: 1)
+    failure-rate-interval: 60000   # measuring interval in ms for failure rate
+    delay: 10000                   # delay in ms between retries (default: 10 s)
+{% endhighlight %}
 
 {% top %}
 
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 51e6e95bc1f..8be4ce63ecb 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -74,6 +74,12 @@ execution:
   min-idle-state-retention: 0
   # maximum idle state retention in ms
   max-idle-state-retention: 0
+  # controls how table programs are restarted in case of a failure
+  restart-strategy:
+    # strategy type
+    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
+    type: fallback
+
 
 #==============================================================================
 # Deployment properties
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 0d6e6dd59ac..b7c28938121 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.config;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 
 import java.util.Collections;
@@ -57,10 +59,10 @@ public boolean isBatchExecution() {
        }
 
        public TimeCharacteristic getTimeCharacteristic() {
-               final String s = properties.getOrDefault(
+               final String characteristic = properties.getOrDefault(
                        PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
                        PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME);
-               switch (s) {
+               switch (characteristic) {
                        case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
                                return TimeCharacteristic.EventTime;
                        case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
@@ -90,6 +92,45 @@ public int getMaxParallelism() {
                return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
        }
 
+       public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+               final String restartStrategy = properties.getOrDefault(
+                       PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
+                       PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK);
+               switch (restartStrategy) {
+                       case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE:
+                               return RestartStrategies.noRestart();
+                       case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY:
+                               final int attempts = Integer.parseInt(
+                                       properties.getOrDefault(
+                                               PropertyStrings.EXECUTION_RESTART_STRATEGY_ATTEMPTS,
+                                               Integer.toString(Integer.MAX_VALUE)));
+                               final long fixedDelay = Long.parseLong(
+                                       properties.getOrDefault(
+                                               PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
+                                               Long.toString(10_000)));
+                               return RestartStrategies.fixedDelayRestart(attempts, fixedDelay);
+                       case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE:
+                               final int failureRate = Integer.parseInt(
+                                       properties.getOrDefault(
+                                               PropertyStrings.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL,
+                                               Integer.toString(1)));
+                               final long failureInterval = Long.parseLong(
+                                       properties.getOrDefault(
+                                               PropertyStrings.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL,
+                                               Long.toString(60_000)));
+                               final long attemptDelay = Long.parseLong(
+                                       properties.getOrDefault(
+                                               PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
+                                               Long.toString(10_000)));
+                               return RestartStrategies.failureRateRestart(
+                                       failureRate,
+                                       Time.milliseconds(failureInterval),
+                                       Time.milliseconds(attemptDelay));
+                       default:
+                               return RestartStrategies.fallBackRestart();
+               }
+       }
+
        public boolean isChangelogMode() {
                return Objects.equals(
                        properties.get(PropertyStrings.EXECUTION_RESULT_MODE),
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index 76e52defb8b..2a6b001d5f4 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -57,6 +57,24 @@ private PropertyStrings() {
 
        public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
 
+       public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
+
+       public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
+
+       public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none";
+
+       public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay";
+
+       public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate";
+
+       public static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts";
+
+       public static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay";
+
+       public static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval";
+
+       public static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";
+
+
        public static final String DEPLOYMENT = "deployment";
 
        public static final String DEPLOYMENT_TYPE = "type";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 4283953446e..9ff683755d2 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -361,12 +361,14 @@ private FlinkPlan createPlan(String name, Configuration flinkConfig) {
 
                private ExecutionEnvironment createExecutionEnvironment() {
                        final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+                       execEnv.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
                        execEnv.setParallelism(mergedEnv.getExecution().getParallelism());
                        return execEnv;
                }
 
                private StreamExecutionEnvironment createStreamExecutionEnvironment() {
                        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
                        env.setParallelism(mergedEnv.getExecution().getParallelism());
                        env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
                        env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index bc29f6f2795..04575c696a2 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
@@ -41,6 +42,7 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test for {@link ExecutionContext}.
@@ -53,7 +55,16 @@
        public void testExecutionConfig() throws Exception {
                final ExecutionContext<?> context = createExecutionContext();
                final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig();
+
                assertEquals(99, config.getAutoWatermarkInterval());
+
+               final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
+               assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration);
+               final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
+                       (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
+               assertEquals(10, failureRateStrategy.getMaxFailureRate());
+               assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
+               assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
        }
 
        @Test
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index d8452e42dbf..ed4ce7a9d8d 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -149,6 +149,10 @@ public void testGetSessionProperties() throws Exception {
                expectedProperties.put("execution.max-idle-state-retention", "0");
                expectedProperties.put("execution.min-idle-state-retention", "0");
                expectedProperties.put("execution.result-mode", "table");
+               expectedProperties.put("execution.restart-strategy.type", "failure-rate");
+               expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
+               expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
+               expectedProperties.put("execution.restart-strategy.delay", "1000");
                expectedProperties.put("deployment.response-timeout", "5000");
 
                assertEquals(expectedProperties, actualProperties);
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index b759874939d..22351bc3959 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -117,6 +117,11 @@ execution:
   min-idle-state-retention: 0
   max-idle-state-retention: 0
   result-mode: "$VAR_3"
+  restart-strategy:
+    type: failure-rate
+    max-failures-per-interval: 10
+    failure-rate-interval: 99000
+    delay: 1000
 
 deployment:
   response-timeout: 5000
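
In the diff above, the nested `restart-strategy:` block of the YAML environment file surfaces as dotted keys such as `execution.restart-strategy.type` (see the expected properties in `LocalExecutorITCase`), which is also why the constants in `PropertyStrings` are written as `restart-strategy.type`, `restart-strategy.delay`, and so on. The following is a minimal, dependency-free sketch of that nesting-to-dotted-key mapping, assuming the YAML has already been parsed into nested `Map`s. `FlattenProperties` and `flatten` are hypothetical names for illustration only; this is not the SQL Client's actual parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): flattens a nested map, such as one a
 * YAML parser might return for an environment file, into dotted property keys.
 */
public class FlattenProperties {

    /** Returns the leaves of {@code nested} keyed by their dotted path, in insertion order. */
    public static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested section: recurse and extend the dotted prefix.
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out);
            } else {
                // Leaf value: store its string form, e.g. 99000 becomes "99000".
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the restart-strategy block added to test-sql-client-defaults.yaml.
        Map<String, Object> restart = new LinkedHashMap<>();
        restart.put("type", "failure-rate");
        restart.put("max-failures-per-interval", 10);
        restart.put("failure-rate-interval", 99000);
        restart.put("delay", 1000);

        Map<String, Object> execution = new LinkedHashMap<>();
        execution.put("restart-strategy", restart);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("execution", execution);

        // Prints one key=value pair per line, starting with
        // execution.restart-strategy.type=failure-rate
        flatten(root).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Run against the test YAML values, the four printed keys and values match the four `execution.restart-strategy.*` entries that `LocalExecutorITCase` expects.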


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow setting a restart strategy in SQL Client
> ----------------------------------------------
>
>                 Key: FLINK-10073
>                 URL: https://issues.apache.org/jira/browse/FLINK-10073
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Currently, it is not possible to set a restart strategy per job.
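
To check the defaults that `Execution#getRestartStrategy()` in the diff above applies per job without pulling in a Flink dependency, the following is a minimal sketch that mirrors its switch statement. The property keys and default values are copied from the diff (`Integer.MAX_VALUE` attempts, 10 s delay, 1 failure per 60 s interval); the class `RestartStrategyResolver` and its `Resolved` value type are hypothetical stand-ins for Flink's `RestartStrategies` configurations, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, dependency-free mirror of the defaulting logic in Execution#getRestartStrategy().
 * It returns a plain value object instead of Flink's RestartStrategyConfiguration.
 */
public class RestartStrategyResolver {

    // Property keys as defined in PropertyStrings in the diff.
    public static final String TYPE = "restart-strategy.type";
    public static final String ATTEMPTS = "restart-strategy.attempts";
    public static final String DELAY = "restart-strategy.delay";
    public static final String FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval";
    public static final String MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";

    /** Hypothetical stand-in for Flink's restart strategy configuration classes. */
    public static final class Resolved {
        public final String type;
        public final int attempts;               // used by fixed-delay only
        public final int maxFailuresPerInterval; // used by failure-rate only
        public final long failureRateIntervalMs; // used by failure-rate only
        public final long delayMs;               // used by fixed-delay and failure-rate

        Resolved(String type, int attempts, int maxFailuresPerInterval,
                 long failureRateIntervalMs, long delayMs) {
            this.type = type;
            this.attempts = attempts;
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.failureRateIntervalMs = failureRateIntervalMs;
            this.delayMs = delayMs;
        }

        @Override
        public String toString() {
            return type + " attempts=" + attempts
                    + " maxFailuresPerInterval=" + maxFailuresPerInterval
                    + " failureRateIntervalMs=" + failureRateIntervalMs
                    + " delayMs=" + delayMs;
        }
    }

    public static Resolved resolve(Map<String, String> props) {
        switch (props.getOrDefault(TYPE, "fallback")) {
            case "none":
                return new Resolved("none", 0, 0, 0L, 0L);
            case "fixed-delay":
                return new Resolved("fixed-delay",
                        Integer.parseInt(props.getOrDefault(ATTEMPTS, Integer.toString(Integer.MAX_VALUE))),
                        0, 0L,
                        Long.parseLong(props.getOrDefault(DELAY, "10000")));
            case "failure-rate":
                return new Resolved("failure-rate", 0,
                        Integer.parseInt(props.getOrDefault(MAX_FAILURES_PER_INTERVAL, "1")),
                        Long.parseLong(props.getOrDefault(FAILURE_RATE_INTERVAL, "60000")),
                        Long.parseLong(props.getOrDefault(DELAY, "10000")));
            default:
                // As in the original switch, "fallback" and any unrecognized value land here.
                return new Resolved("fallback", 0, 0, 0L, 0L);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(TYPE, "fixed-delay");
        props.put(ATTEMPTS, "3");
        // No delay is set, so the 10 s default applies.
        System.out.println(resolve(props));
        // prints: fixed-delay attempts=3 maxFailuresPerInterval=0 failureRateIntervalMs=0 delayMs=10000
    }
}
```

One consequence of the `default` branch, in the original method as in this sketch: a misspelled type such as `fixed_delay` is not rejected but silently treated as `fallback`, so the job inherits the global strategy from `flink-conf.yaml`.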



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
