This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b693b014ab39eb8e61db915adf9957c66774abb0
Author: gyao <[email protected]>
AuthorDate: Tue Jul 24 15:48:27 2018 +0200

    [FLINK-9159][runtime] Deprecate config key slotmanager.request-timeout
    
    - Replace config key slotmanager.request-timeout with slot.request.timeout
    because both keys have effectively the same semantics.
    - Rename key slotmanager.taskmanager-timeout to
    resourcemanager.taskmanager-timeout.
    
    This closes #6406.
---
 .../generated/resource_manager_configuration.html  |  5 ++
 .../generated/slot_manager_configuration.html      | 21 -------
 docs/ops/config.md                                 |  6 --
 .../flink/configuration/JobManagerOptions.java     |  6 ++
 .../configuration/ResourceManagerOptions.java      | 25 +++++---
 .../slotmanager/SlotManagerConfiguration.java      | 22 ++++++-
 .../slotmanager/SlotManagerConfigurationTest.java  | 67 ++++++++++++++++++++++
 7 files changed, 116 insertions(+), 36 deletions(-)

diff --git a/docs/_includes/generated/resource_manager_configuration.html 
b/docs/_includes/generated/resource_manager_configuration.html
index 1b82e51..9243fcd 100644
--- a/docs/_includes/generated/resource_manager_configuration.html
+++ b/docs/_includes/generated/resource_manager_configuration.html
@@ -32,5 +32,10 @@
             <td style="word-wrap: break-word;">0</td>
             <td>Defines the network port to connect to for communication with 
the resource manager. By default, the port of the JobManager, because the same 
ActorSystem is used. Its not possible to use this configuration key to define 
port ranges.</td>
         </tr>
+        <tr>
+            <td><h5>resourcemanager.taskmanager-timeout</h5></td>
+            <td style="word-wrap: break-word;">30000</td>
+            <td>The timeout for an idle task manager to be released.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/_includes/generated/slot_manager_configuration.html 
b/docs/_includes/generated/slot_manager_configuration.html
deleted file mode 100644
index 1517a39..0000000
--- a/docs/_includes/generated/slot_manager_configuration.html
+++ /dev/null
@@ -1,21 +0,0 @@
-<table class="table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 65%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>slotmanager.request-timeout</h5></td>
-            <td style="word-wrap: break-word;">600000</td>
-            <td>The timeout for a slot request to be discarded.</td>
-        </tr>
-        <tr>
-            <td><h5>slotmanager.taskmanager-timeout</h5></td>
-            <td style="word-wrap: break-word;">30000</td>
-            <td>The timeout for an idle task manager to be released.</td>
-        </tr>
-    </tbody>
-</table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 1e6be19..fd0df0c 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -170,12 +170,6 @@ You have to configure `jobmanager.archive.fs.dir` in order 
to archive terminated
 
 {% include generated/history_server_configuration.html %}
 
-### Slot Manager
-
-The configuration keys in this section are relevant for the SlotManager 
running in the ResourceManager
-
-{% include generated/slot_manager_configuration.html %}
-
 ## Legacy
 
 - `mode`: Execution mode of Flink. Possible values are `legacy` and `new`. In 
order to start the legacy components, you have to specify `legacy` (DEFAULT: 
`new`).
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index f78ed9d..2bb5732 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -145,11 +145,17 @@ public class JobManagerOptions {
                .defaultValue(60L * 60L)
                .withDescription("The time in seconds after which a completed 
job expires and is purged from the job store.");
 
+       /**
+        * The timeout in milliseconds for requesting a slot from Slot Pool.
+        */
        public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
                key("slot.request.timeout")
                .defaultValue(5L * 60L * 1000L)
                .withDescription("The timeout in milliseconds for requesting a 
slot from Slot Pool.");
 
+       /**
+        * The timeout in milliseconds for an idle slot in Slot Pool.
+        */
        public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
                key("slot.idle.timeout")
                        // default matches heartbeat.timeout so that sticky 
allocation is not lost on timeouts for local recovery
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 4ce4981..5a203e3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -19,16 +19,12 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.docs.ConfigGroup;
-import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.description.Description;
 
 /**
  * The set of configuration options relating to the ResourceManager.
  */
 @PublicEvolving
-@ConfigGroups(groups = {
-       @ConfigGroup(name = "SlotManager", keyPrefix = "slotmanager")
-})
 public class ResourceManagerOptions {
 
        /**
@@ -72,21 +68,36 @@ public class ResourceManagerOptions {
 
        /**
         * The timeout for a slot request to be discarded, in milliseconds.
+        * @deprecated Use {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}.
         */
+       @Deprecated
        public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = 
ConfigOptions
                .key("slotmanager.request-timeout")
-               .defaultValue(600000L)
+               .defaultValue(-1L)
                .withDescription("The timeout for a slot request to be 
discarded.");
 
        /**
         * The timeout for an idle task manager to be released, in milliseconds.
+        * @deprecated Use {@link #TASK_MANAGER_TIMEOUT}.
         */
-       public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT = 
ConfigOptions
+       @Deprecated
+       public static final ConfigOption<Long> 
SLOT_MANAGER_TASK_MANAGER_TIMEOUT = ConfigOptions
                .key("slotmanager.taskmanager-timeout")
                .defaultValue(30000L)
                .withDescription("The timeout for an idle task manager to be 
released.");
 
        /**
+        * The timeout for an idle task manager to be released, in milliseconds.
+        */
+       public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT = 
ConfigOptions
+               .key("resourcemanager.taskmanager-timeout")
+               .defaultValue(30000L)
+               .withDeprecatedKeys(SLOT_MANAGER_TASK_MANAGER_TIMEOUT.key())
+               .withDescription(Description.builder()
+                       .text("The timeout for an idle task manager to be 
released.")
+                       .build());
+
+       /**
         * Prefix for passing custom environment variables to Flink's master 
process.
         * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
         * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index 2f8751a..1f11f3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -21,10 +21,14 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.Duration;
 
 /**
@@ -32,6 +36,8 @@ import scala.concurrent.duration.Duration;
  */
 public class SlotManagerConfiguration {
 
+       private static final Logger LOGGER = 
LoggerFactory.getLogger(SlotManagerConfiguration.class);
+
        private final Time taskManagerRequestTimeout;
        private final Time slotRequestTimeout;
        private final Time taskManagerTimeout;
@@ -68,11 +74,23 @@ public class SlotManagerConfiguration {
                                "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
                }
 
-               final Time slotRequestTimeout = Time.milliseconds(
-                               
configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT));
+               final Time slotRequestTimeout = 
getSlotRequestTimeout(configuration);
                final Time taskManagerTimeout = Time.milliseconds(
                                
configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
 
                return new SlotManagerConfiguration(rpcTimeout, 
slotRequestTimeout, taskManagerTimeout);
        }
+
+       private static Time getSlotRequestTimeout(final Configuration 
configuration) {
+               final long slotRequestTimeoutMs;
+               if 
(configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
+                       LOGGER.warn("Config key {} is deprecated; use {} 
instead.",
+                               ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
+                               JobManagerOptions.SLOT_REQUEST_TIMEOUT);
+                       slotRequestTimeoutMs = 
configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
+               } else {
+                       slotRequestTimeoutMs = 
configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
+               }
+               return Time.milliseconds(slotRequestTimeoutMs);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
new file mode 100644
index 0000000..ddd1ac8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SlotManagerConfiguration}.
+ */
+public class SlotManagerConfigurationTest extends TestLogger {
+
+       /**
+        * Tests that {@link SlotManagerConfiguration#getSlotRequestTimeout()} 
returns the value
+        * configured under key {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}.
+        */
+       @Test
+       public void testSetSlotRequestTimeout() throws Exception {
+               final long slotIdleTimeout = 42;
+
+               final Configuration configuration = new Configuration();
+               configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
slotIdleTimeout);
+               final SlotManagerConfiguration slotManagerConfiguration = 
SlotManagerConfiguration.fromConfiguration(configuration);
+
+               
assertThat(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds(), 
is(equalTo(slotIdleTimeout)));
+       }
+
+       /**
+        * Tests that {@link ResourceManagerOptions#SLOT_REQUEST_TIMEOUT} is 
preferred over
+        * {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT} if set.
+        */
+       @Test
+       public void testPreferLegacySlotRequestTimeout() throws Exception {
+               final long legacySlotIdleTimeout = 42;
+
+               final Configuration configuration = new Configuration();
+               
configuration.setLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT, 
legacySlotIdleTimeout);
+               configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
300000L);
+               final SlotManagerConfiguration slotManagerConfiguration = 
SlotManagerConfiguration.fromConfiguration(configuration);
+
+               
assertThat(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds(), 
is(equalTo(legacySlotIdleTimeout)));
+       }
+}

Reply via email to