[hotfix] [runtim] Minor code cleanups.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24771613
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24771613
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24771613

Branch: refs/heads/master
Commit: 2477161352e12e75e2f0f85b5833ad04dc6d31f2
Parents: f8cd9ba
Author: Stephan Ewen <[email protected]>
Authored: Mon Jul 11 20:36:44 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 14 21:11:48 2016 +0200

----------------------------------------------------------------------
 .../jobgraph/tasks/AbstractInvokable.java       | 26 +++++++++-----------
 .../runtime/tasks/StreamTaskState.java          | 10 ++++----
 2 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24771613/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index d7dfaf5..f63a762 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.BatchTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is the abstract base class for every task that can be executed by a
@@ -36,8 +34,6 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractInvokable {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractInvokable.class);
-
        /** The environment assigned to this invokable. */
        private Environment environment;
 
@@ -57,6 +53,17 @@ public abstract class AbstractInvokable {
        public abstract void invoke() throws Exception;
 
        /**
+        * This method is called when a task is canceled either as a result of 
a user abort or an execution failure. It can
+        * be overwritten to respond to shut down the user code properly.
+        *
+        * @throws Exception
+        *         thrown if any exception occurs during the execution of the 
user code
+        */
+       public void cancel() throws Exception {
+               // The default implementation does nothing.
+       }
+       
+       /**
         * Sets the environment of this task.
         * 
         * @param environment
@@ -126,15 +133,4 @@ public abstract class AbstractInvokable {
        public ExecutionConfig getExecutionConfig() {
                return this.environment.getExecutionConfig();
        }
-
-       /**
-        * This method is called when a task is canceled either as a result of 
a user abort or an execution failure. It can
-        * be overwritten to respond to shut down the user code properly.
-        * 
-        * @throws Exception
-        *         thrown if any exception occurs during the execution of the 
user code
-        */
-       public void cancel() throws Exception {
-               // The default implementation does nothing.
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24771613/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index b90f823..c9e29d3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -95,7 +95,11 @@ public class StreamTaskState implements Serializable {
                StateHandle<?> operatorState = this.operatorState;
                StateHandle<?> functionState = this.functionState;
                HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = 
this.kvStates;
-               
+
+               this.operatorState = null;
+               this.functionState = null;
+               this.kvStates = null;
+       
                if (operatorState != null) {
                        operatorState.discardState();
                }
@@ -117,9 +121,5 @@ public class StreamTaskState implements Serializable {
                                }
                        }
                }
-
-               this.operatorState = null;
-               this.functionState = null;
-               this.kvStates = null;
        }
 }

Reply via email to