[FLINK-8549] [config] Move TimerServiceOptions into TaskManagerOptions

This closes #5402


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85bfc073
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85bfc073
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85bfc073

Branch: refs/heads/master
Commit: 85bfc073680667e9daa16439d33fa86c1c0be133
Parents: 0644f4f
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 2 09:38:37 2018 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Sun Feb 18 22:24:34 2018 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |  9 +++++
 .../configuration/TimerServiceOptions.java      | 38 --------------------
 .../streaming/runtime/tasks/StreamTask.java     |  4 +--
 3 files changed, 11 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85bfc073/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 2f009a3..e01cf0f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -343,6 +343,15 @@ public class TaskManagerOptions {
                                " leads to a fatal TaskManager error. A value 
of 0 deactivates" +
                                " the watch dog.");
        /**
+        * This configures how long we wait for the timers to finish all 
pending timer threads
+        * when the stream task is cancelled .
+        */
+       public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS 
= ConfigOptions
+                       .key("task.cancellation.timers.timeout")
+                       .defaultValue(7500L)
+                       
.withDeprecatedKeys("timerservice.exceptional.shutdown.timeout");
+
+       /**
         * The maximum number of bytes that a checkpoint alignment may buffer.
         * If the checkpoint alignment buffers more than the configured amount 
of
         * data, the checkpoint is aborted (skipped).

http://git-wip-us.apache.org/repos/asf/flink/blob/85bfc073/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
deleted file mode 100644
index 835adce..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.configuration;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Timer service configuration options.
- */
-@PublicEvolving
-public class TimerServiceOptions {
-
-       /**
-        * This configures how long we wait for the {@link 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService}
-        * to finish all pending timer threads when the stream task performs a 
failover shutdown. See FLINK-5465.
-        */
-       public static final ConfigOption<Long> 
TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions
-               .key("timerservice.exceptional.shutdown.timeout")
-               .defaultValue(7500L);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bfc073/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4a85fb9..d6fb6c0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -48,7 +49,6 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
-import org.apache.flink.streaming.configuration.TimerServiceOptions;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
@@ -343,7 +343,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                try {
 
                                        final long timeoutMs = 
getEnvironment().getTaskManagerInfo().getConfiguration().
-                                               
getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
+                                               
getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
 
                                        // wait for a reasonable time for all 
pending timer threads to finish
                                        boolean timerShutdownComplete =

Reply via email to