[FLINK-8549] [config] Move TimerServiceOptions into TaskManagerOptions. This closes #5402
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85bfc073 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85bfc073 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85bfc073 Branch: refs/heads/master Commit: 85bfc073680667e9daa16439d33fa86c1c0be133 Parents: 0644f4f Author: Stephan Ewen <[email protected]> Authored: Fri Feb 2 09:38:37 2018 +0100 Committer: Stephan Ewen <[email protected]> Committed: Sun Feb 18 22:24:34 2018 +0100 ---------------------------------------------------------------------- .../flink/configuration/TaskManagerOptions.java | 9 +++++ .../configuration/TimerServiceOptions.java | 38 -------------------- .../streaming/runtime/tasks/StreamTask.java | 4 +-- 3 files changed, 11 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/85bfc073/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 2f009a3..e01cf0f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -343,6 +343,15 @@ public class TaskManagerOptions { " leads to a fatal TaskManager error. A value of 0 deactivates" + " the watch dog."); /** + * This configures how long we wait for the timers to finish all pending timer threads + * when the stream task is cancelled . 
+ */ + public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS = ConfigOptions + .key("task.cancellation.timers.timeout") + .defaultValue(7500L) + .withDeprecatedKeys("timerservice.exceptional.shutdown.timeout"); + + /** * The maximum number of bytes that a checkpoint alignment may buffer. * If the checkpoint alignment buffers more than the configured amount of * data, the checkpoint is aborted (skipped). http://git-wip-us.apache.org/repos/asf/flink/blob/85bfc073/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java deleted file mode 100644 index 835adce..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.configuration; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -/** - * Timer service configuration options. - */ -@PublicEvolving -public class TimerServiceOptions { - - /** - * This configures how long we wait for the {@link org.apache.flink.streaming.runtime.tasks.ProcessingTimeService} - * to finish all pending timer threads when the stream task performs a failover shutdown. See FLINK-5465. - */ - public static final ConfigOption<Long> TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions - .key("timerservice.exceptional.shutdown.timeout") - .defaultValue(7500L); -} http://git-wip-us.apache.org/repos/asf/flink/blob/85bfc073/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 4a85fb9..d6fb6c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -48,7 +49,6 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; -import org.apache.flink.streaming.configuration.TimerServiceOptions; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; @@ -343,7 +343,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> try { final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration(). - getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS); + getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS); // wait for a reasonable time for all pending timer threads to finish boolean timerShutdownComplete =
