[hotfix] [runtim] Minor code cleanups.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24771613 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24771613 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24771613 Branch: refs/heads/master Commit: 2477161352e12e75e2f0f85b5833ad04dc6d31f2 Parents: f8cd9ba Author: Stephan Ewen <[email protected]> Authored: Mon Jul 11 20:36:44 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 14 21:11:48 2016 +0200 ---------------------------------------------------------------------- .../jobgraph/tasks/AbstractInvokable.java | 26 +++++++++----------- .../runtime/tasks/StreamTaskState.java | 10 ++++---- 2 files changed, 16 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/24771613/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index d7dfaf5..f63a762 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.BatchTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is the abstract base class for every task that can be executed by a @@ -36,8 +34,6 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractInvokable { - private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokable.class); - /** The environment assigned to this invokable. */ private Environment environment; @@ -57,6 +53,17 @@ public abstract class AbstractInvokable { public abstract void invoke() throws Exception; /** + * This method is called when a task is canceled either as a result of a user abort or an execution failure. It can + * be overwritten to respond to shut down the user code properly. + * + * @throws Exception + * thrown if any exception occurs during the execution of the user code + */ + public void cancel() throws Exception { + // The default implementation does nothing. + } + + /** * Sets the environment of this task. * * @param environment @@ -126,15 +133,4 @@ public abstract class AbstractInvokable { public ExecutionConfig getExecutionConfig() { return this.environment.getExecutionConfig(); } - - /** - * This method is called when a task is canceled either as a result of a user abort or an execution failure. It can - * be overwritten to respond to shut down the user code properly. - * - * @throws Exception - * thrown if any exception occurs during the execution of the user code - */ - public void cancel() throws Exception { - // The default implementation does nothing. - } } http://git-wip-us.apache.org/repos/asf/flink/blob/24771613/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java index b90f823..c9e29d3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java @@ -95,7 +95,11 @@ public class StreamTaskState implements Serializable { StateHandle<?> operatorState = this.operatorState; StateHandle<?> functionState = this.functionState; HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = this.kvStates; - + + this.operatorState = null; + this.functionState = null; + this.kvStates = null; + if (operatorState != null) { operatorState.discardState(); } @@ -117,9 +121,5 @@ public class StreamTaskState implements Serializable { } } } - - this.operatorState = null; - this.functionState = null; - this.kvStates = null; } }
