This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b693b014ab39eb8e61db915adf9957c66774abb0 Author: gyao <[email protected]> AuthorDate: Tue Jul 24 15:48:27 2018 +0200 [FLINK-9159][runtime] Deprecate config key slotmanager.request-timeout - Replace config key slotmanager.request-timeout with slot.request.timeout because both keys have effectively the same semantics. - Rename key slotmanager.taskmanager-timeout to resourcemanager.taskmanager-timeout. This closes #6406. --- .../generated/resource_manager_configuration.html | 5 ++ .../generated/slot_manager_configuration.html | 21 ------- docs/ops/config.md | 6 -- .../flink/configuration/JobManagerOptions.java | 6 ++ .../configuration/ResourceManagerOptions.java | 25 +++++--- .../slotmanager/SlotManagerConfiguration.java | 22 ++++++- .../slotmanager/SlotManagerConfigurationTest.java | 67 ++++++++++++++++++++++ 7 files changed, 116 insertions(+), 36 deletions(-) diff --git a/docs/_includes/generated/resource_manager_configuration.html b/docs/_includes/generated/resource_manager_configuration.html index 1b82e51..9243fcd 100644 --- a/docs/_includes/generated/resource_manager_configuration.html +++ b/docs/_includes/generated/resource_manager_configuration.html @@ -32,5 +32,10 @@ <td style="word-wrap: break-word;">0</td> <td>Defines the network port to connect to for communication with the resource manager. By default, the port of the JobManager, because the same ActorSystem is used. 
Its not possible to use this configuration key to define port ranges.</td> </tr> + <tr> + <td><h5>resourcemanager.taskmanager-timeout</h5></td> + <td style="word-wrap: break-word;">30000</td> + <td>The timeout for an idle task manager to be released.</td> + </tr> </tbody> </table> diff --git a/docs/_includes/generated/slot_manager_configuration.html b/docs/_includes/generated/slot_manager_configuration.html deleted file mode 100644 index 1517a39..0000000 --- a/docs/_includes/generated/slot_manager_configuration.html +++ /dev/null @@ -1,21 +0,0 @@ -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Key</th> - <th class="text-left" style="width: 15%">Default</th> - <th class="text-left" style="width: 65%">Description</th> - </tr> - </thead> - <tbody> - <tr> - <td><h5>slotmanager.request-timeout</h5></td> - <td style="word-wrap: break-word;">600000</td> - <td>The timeout for a slot request to be discarded.</td> - </tr> - <tr> - <td><h5>slotmanager.taskmanager-timeout</h5></td> - <td style="word-wrap: break-word;">30000</td> - <td>The timeout for an idle task manager to be released.</td> - </tr> - </tbody> -</table> diff --git a/docs/ops/config.md b/docs/ops/config.md index 1e6be19..fd0df0c 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -170,12 +170,6 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated {% include generated/history_server_configuration.html %} -### Slot Manager - -The configuration keys in this section are relevant for the SlotManager running in the ResourceManager - -{% include generated/slot_manager_configuration.html %} - ## Legacy - `mode`: Execution mode of Flink. Possible values are `legacy` and `new`. In order to start the legacy components, you have to specify `legacy` (DEFAULT: `new`). 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index f78ed9d..2bb5732 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -145,11 +145,17 @@ public class JobManagerOptions { .defaultValue(60L * 60L) .withDescription("The time in seconds after which a completed job expires and is purged from the job store."); + /** + * The timeout in milliseconds for requesting a slot from Slot Pool. + */ public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = key("slot.request.timeout") .defaultValue(5L * 60L * 1000L) .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool."); + /** + * The timeout in milliseconds for an idle slot in Slot Pool. + */ public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT = key("slot.idle.timeout") // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java index 4ce4981..5a203e3 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -19,16 +19,12 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.annotation.docs.ConfigGroup; -import org.apache.flink.annotation.docs.ConfigGroups; +import org.apache.flink.configuration.description.Description; /** * The set of configuration options relating to the ResourceManager. 
*/ @PublicEvolving -@ConfigGroups(groups = { - @ConfigGroup(name = "SlotManager", keyPrefix = "slotmanager") -}) public class ResourceManagerOptions { /** @@ -72,21 +68,36 @@ public class ResourceManagerOptions { /** * The timeout for a slot request to be discarded, in milliseconds. + * @deprecated Use {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}. */ + @Deprecated public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = ConfigOptions .key("slotmanager.request-timeout") - .defaultValue(600000L) + .defaultValue(-1L) .withDescription("The timeout for a slot request to be discarded."); /** * The timeout for an idle task manager to be released, in milliseconds. + * @deprecated Use {@link #TASK_MANAGER_TIMEOUT}. */ - public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT = ConfigOptions + @Deprecated + public static final ConfigOption<Long> SLOT_MANAGER_TASK_MANAGER_TIMEOUT = ConfigOptions .key("slotmanager.taskmanager-timeout") .defaultValue(30000L) .withDescription("The timeout for an idle task manager to be released."); /** + * The timeout for an idle task manager to be released, in milliseconds. + */ + public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT = ConfigOptions + .key("resourcemanager.taskmanager-timeout") + .defaultValue(30000L) + .withDeprecatedKeys(SLOT_MANAGER_TASK_MANAGER_TIMEOUT.key()) + .withDescription(Description.builder() + .text("The timeout for an idle task manager to be released.") + .build()); + + /** * Prefix for passing custom environment variables to Flink's master process. 
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index 2f8751a..1f11f3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -21,10 +21,14 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import scala.concurrent.duration.Duration; /** @@ -32,6 +36,8 @@ import scala.concurrent.duration.Duration; */ public class SlotManagerConfiguration { + private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class); + private final Time taskManagerRequestTimeout; private final Time slotRequestTimeout; private final Time taskManagerTimeout; @@ -68,11 +74,23 @@ public class SlotManagerConfiguration { "value " + AkkaOptions.ASK_TIMEOUT + '.', e); } - final Time slotRequestTimeout = Time.milliseconds( - configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)); + final Time slotRequestTimeout = getSlotRequestTimeout(configuration); final Time taskManagerTimeout = Time.milliseconds( configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT)); return new 
SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout); } + + private static Time getSlotRequestTimeout(final Configuration configuration) { + final long slotRequestTimeoutMs; + if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) { + LOGGER.warn("Config key {} is deprecated; use {} instead.", + ResourceManagerOptions.SLOT_REQUEST_TIMEOUT, + JobManagerOptions.SLOT_REQUEST_TIMEOUT); + slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT); + } else { + slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT); + } + return Time.milliseconds(slotRequestTimeoutMs); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java new file mode 100644 index 0000000..ddd1ac8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link SlotManagerConfiguration}. + */ +public class SlotManagerConfigurationTest extends TestLogger { + + /** + * Tests that {@link SlotManagerConfiguration#getSlotRequestTimeout()} returns the value + * configured under key {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}. + */ + @Test + public void testSetSlotRequestTimeout() throws Exception { + final long slotIdleTimeout = 42; + + final Configuration configuration = new Configuration(); + configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotIdleTimeout); + final SlotManagerConfiguration slotManagerConfiguration = SlotManagerConfiguration.fromConfiguration(configuration); + + assertThat(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds(), is(equalTo(slotIdleTimeout))); + } + + /** + * Tests that {@link ResourceManagerOptions#SLOT_REQUEST_TIMEOUT} is preferred over + * {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT} if set. + */ + @Test + public void testPreferLegacySlotRequestTimeout() throws Exception { + final long legacySlotIdleTimeout = 42; + + final Configuration configuration = new Configuration(); + configuration.setLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT, legacySlotIdleTimeout); + configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 300000L); + final SlotManagerConfiguration slotManagerConfiguration = SlotManagerConfiguration.fromConfiguration(configuration); + + assertThat(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds(), is(equalTo(legacySlotIdleTimeout))); + } +}
