This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 8b03294 [FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout. 8b03294 is described below commit 8b032941888079d5015fb40ad755429aa5297d90 Author: wangtong <wtgee...@163.com> AuthorDate: Fri May 15 21:56:42 2020 +0800 [FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout. This closes #12179. --- docs/_includes/generated/akka_configuration.html | 6 --- docs/_includes/generated/client_configuration.html | 24 ++++++++++ .../generated/execution_configuration.html | 12 ----- docs/ops/config.md | 4 ++ docs/ops/config.zh.md | 4 ++ .../org/apache/flink/client/cli/CliFrontend.java | 3 +- .../org/apache/flink/client/cli/ClientOptions.java | 45 ++++++++++++++++++ .../ApplicationDispatcherBootstrap.java | 6 +-- .../application/executors/EmbeddedExecutor.java | 4 +- .../executors/EmbeddedExecutorFactory.java | 4 +- .../apache/flink/client/cli/ClientOptionsTest.java | 53 ++++++++++++++++++++++ .../apache/flink/configuration/AkkaOptions.java | 6 ++- .../flink/configuration/ConfigConstants.java | 4 +- .../flink/configuration/ExecutionOptions.java | 15 ------ .../configuration/ConfigOptionsDocGenerator.java | 3 +- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 4 -- .../apache/flink/runtime/akka/AkkaUtilsTest.scala | 2 +- 17 files changed, 148 insertions(+), 51 deletions(-) diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html index 2baeb6f..3e59c13 100644 --- a/docs/_includes/generated/akka_configuration.html +++ b/docs/_includes/generated/akka_configuration.html @@ -39,12 +39,6 @@ <td>Min number of threads to cap factor-based number to.</td> </tr> <tr> - <td><h5>akka.client.timeout</h5></td> - <td style="word-wrap: break-word;">"60 s"</td> - <td>String</td> - <td>Timeout for all blocking calls on the client side.</td> - </tr> - <tr> <td><h5>akka.fork-join-executor.parallelism-factor</h5></td> <td style="word-wrap: break-word;">2.0</td> <td>Double</td> diff --git a/docs/_includes/generated/client_configuration.html b/docs/_includes/generated/client_configuration.html new file mode 100644 index 0000000..c623dde --- /dev/null +++ b/docs/_includes/generated/client_configuration.html @@ -0,0 +1,24 @@ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>client.retry-period</h5></td> + <td style="word-wrap: break-word;">2 s</td> + <td>Duration</td> + <td>The interval (in ms) between consecutive retries of failed attempts to execute commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).</td> + </tr> + <tr> + <td><h5>client.timeout</h5></td> + <td style="word-wrap: break-word;">1 min</td> + <td>Duration</td> + <td>Timeout on the client side.</td> + </tr> + </tbody> +</table> diff --git a/docs/_includes/generated/execution_configuration.html b/docs/_includes/generated/execution_configuration.html index f893a75..037960c 100644 --- a/docs/_includes/generated/execution_configuration.html +++ b/docs/_includes/generated/execution_configuration.html @@ -20,17 +20,5 @@ <td>Boolean</td> <td>Tells if we should use compression for the state snapshot data or not</td> </tr> - <tr> - <td><h5>execution.embedded-rpc-retry-period</h5></td> - <td style="word-wrap: break-word;">2 s</td> - <td>Duration</td> - <td>The retry period (in ms) between consecutive attempts to get the job status when executing applications in "Application Mode".</td> - </tr> - <tr> - <td><h5>execution.embedded-rpc-timeout</h5></td> - <td style="word-wrap: break-word;">1 h</td> - <td>Duration</td> - <td>The rpc timeout (in ms) when executing applications in "Application Mode". This affects all rpc's available through the Job Client and job submission.</td> - </tr> </tbody> </table> diff --git a/docs/ops/config.md b/docs/ops/config.md index f33d03f..9432f5d 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -423,6 +423,10 @@ These options may be removed in a future release. # Backup +#### Client + +{% include generated/client_configuration.html %} + #### Execution {% include generated/deployment_configuration.html %} diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md index 80244fd..a4dd3d7 100644 --- a/docs/ops/config.zh.md +++ b/docs/ops/config.zh.md @@ -423,6 +423,10 @@ These options may be removed in a future release. # Backup +#### Client + +{% include generated/client_configuration.html %} + #### Execution {% include generated/deployment_configuration.html %} diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 7218977..1e2bae0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -46,7 +46,6 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginUtils; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.security.SecurityConfiguration; @@ -141,7 +140,7 @@ public class CliFrontend { customCommandLine.addRunOptions(customCommandLineOptions); } - this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration); + this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java new file mode 100644 index 0000000..063421e --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.time.Duration; + +/** + * Describes a client configuration parameter. + */ +@PublicEvolving +public class ClientOptions { + + public static final ConfigOption<Duration> CLIENT_TIMEOUT = + ConfigOptions.key("client.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(60)) + .withDeprecatedKeys("akka.client.timeout") // the deprecated AkkaOptions.CLIENT_TIMEOUT + .withDescription("Timeout on the client side."); + + public static final ConfigOption<Duration> CLIENT_RETRY_PERIOD = + ConfigOptions.key("client.retry-period") + .durationType() + .defaultValue(Duration.ofMillis(2000)) + .withDescription("The interval (in ms) between consecutive retries of failed attempts to execute " + + "commands through the CLI or Flink's clients, wherever retry is supported (default 2sec)."); +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 9ff1f54..2b4ddf3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -23,11 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.client.JobCancellationException; @@ -263,8 +263,8 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap final JobID jobId, final ScheduledExecutor scheduledExecutor) { - final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis()); - final Time retryPeriod = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_RETRY_PERIOD).toMillis()); + final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); + final Time retryPeriod = Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis()); return JobStatusPollingUtils.getJobResult( dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index febf3ae..aa2db40 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -22,9 +22,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.executors.PipelineExecutorUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.PipelineExecutor; @@ -103,7 +103,7 @@ public class EmbeddedExecutor implements PipelineExecutor { } private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException { - final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis()); + final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); final JobID actualJobId = jobGraph.getJobID(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java index 4c11786..51d8bbc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java @@ -21,9 +21,9 @@ package org.apache.flink.client.deployment.application.executors; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.application.EmbeddedJobClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -80,7 +80,7 @@ public class EmbeddedExecutorFactory implements PipelineExecutorFactory { submittedJobIds, dispatcherGateway, jobId -> { - final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis()); + final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); return new EmbeddedJobClient(jobId, dispatcherGateway, retryExecutor, timeout); }); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java new file mode 100644 index 0000000..9db564f --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.time.Duration; + +import static org.junit.Assert.assertEquals; + +/** + * unit test for ClientOptions. + */ +@RunWith(JUnit4.class) +public class ClientOptionsTest { + + @Test + public void testGetClientTimeout() { + Configuration configuration = new Configuration(); + configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(10)); + + assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), Duration.ofSeconds(10)); + + configuration = new Configuration(); + configuration.set(AkkaOptions.CLIENT_TIMEOUT, "20 s"); + assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), Duration.ofSeconds(20)); + + configuration = new Configuration(); + assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), ClientOptions.CLIENT_TIMEOUT.defaultValue()); + } + +} diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index e9b3002..c553e0b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -156,12 +156,16 @@ public class AkkaOptions { /** * Timeout for all blocking calls on the client side. + * + * @deprecated Use the {@code ClientOptions.CLIENT_TIMEOUT} instead. */ + @Deprecated public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions .key("akka.client.timeout") .stringType() .defaultValue("60 s") - .withDescription("Timeout for all blocking calls on the client side."); + .withDescription("DEPRECATED: Use the \"client.timeout\" instead." + + " Timeout for all blocking calls on the client side."); /** * Exit JVM on fatal Akka errors. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index f1e2de8..4566211 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -939,7 +939,7 @@ public final class ConfigConstants { /** * Timeout for all blocking calls on the client side. * - * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead. + * @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead. */ @Deprecated public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout"; @@ -1787,7 +1787,7 @@ public final class ConfigConstants { public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s"; /** - * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead. + * @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead. */ @Deprecated public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java index cb34583..60f6cd8 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java @@ -55,19 +55,4 @@ public class ExecutionOptions { "throughput") ) .build()); - - public static final ConfigOption<Duration> EMBEDDED_RPC_TIMEOUT = - ConfigOptions.key("execution.embedded-rpc-timeout") - .durationType() - .defaultValue(Duration.ofMillis(60 * 60 * 1000)) - .withDescription("The rpc timeout (in ms) when executing applications in \"Application Mode\". " + - "This affects all rpc's available through the Job Client and job submission."); - - public static final ConfigOption<Duration> EMBEDDED_RPC_RETRY_PERIOD = - ConfigOptions.key("execution.embedded-rpc-retry-period") - .durationType() - .defaultValue(Duration.ofMillis(2000)) - .withDescription("The retry period (in ms) between consecutive attempts to get the job status " + - "when executing applications in \"Application Mode\"."); - } diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java index b98b848..ac6b933 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java @@ -77,7 +77,8 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"), new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config"), new OptionsClassLocation("flink-python", "org.apache.flink.python"), - new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration") + new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration"), + new OptionsClassLocation("flink-clients", "org.apache.flink.client.cli") }; static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList( diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index f046a19..ea3b4bf 100755 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -793,10 +793,6 @@ object AkkaUtils { TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT)) } - def getClientTimeout(config: Configuration): time.Duration = { - TimeUtils.parseDuration(config.getString(AkkaOptions.CLIENT_TIMEOUT)) - } - /** Returns the address of the given [[ActorSystem]]. The [[Address]] object contains * the port and the host under which the actor system is reachable * diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index 4cb7557..0aa6a5e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -105,7 +105,7 @@ class AkkaUtilsTest val IPv4AddressString = "192.168.0.1" val port = 1234 val address = new InetSocketAddress(IPv4AddressString, port) - + val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager" val result = AkkaUtils.getInetSocketAddressFromAkkaURL(url)