Repository: tez
Updated Branches:
  refs/heads/branch-0.7 b1cbae024 -> 73a677a12


TEZ-2937. Can Processor.close() be called after closing inputs and outputs? 
(jeagles)

(cherry picked from commit b0ba133ffcf1339b2c505878e9622084d4290bde)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73a677a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73a677a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73a677a1

Branch: refs/heads/branch-0.7
Commit: 73a677a1265a42e3d99b61650a60757e25429104
Parents: b1cbae0
Author: Jonathan Eagles <[email protected]>
Authored: Fri Jan 15 10:58:37 2016 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Jan 15 11:07:54 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 46 ++++++++++++--------
 2 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/73a677a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 503d718..09b29cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging 
service is enabled.
   TEZ-2129. Task and Attempt views should contain links to the logs
   TEZ-3025. InputInitializer creation should use the dag ugi.

http://git-wip-us.apache.org/repos/asf/tez/blob/73a677a1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f393769..e85eaeb 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -350,10 +350,6 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
           "Can only run while in RUNNING state. Current: " + this.state);
       this.state.set(State.CLOSED);
 
-      // Close the Processor.
-      processorClosed = true;
-      processor.close();
-
       // Close the Inputs.
       for (InputSpec inputSpec : inputSpecs) {
         String srcVertexName = inputSpec.getSourceVertexName();
@@ -373,6 +369,11 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
             EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
             destVertexName, taskSpec.getTaskAttemptID());
       }
+
+      // Close the Processor.
+      processorClosed = true;
+      processor.close();
+
     } finally {
       setTaskDone();
       if (eventRouterThread != null) {
@@ -806,21 +807,6 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
       LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
       LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
     }
-    // Close processor
-    if (!processorClosed && processor != null) {
-      try {
-        processorClosed = true;
-        processor.close();
-        LOG.info("Closed processor for vertex={}, index={}",
-            processor
-                .getContext().getTaskVertexName(),
-            processor.getContext().getTaskVertexIndex());
-      } catch (Throwable e) {
-        LOG.warn(
-            "Ignoring Exception when closing processor(cleanup). Exception 
class={}, message={}",
-                e.getClass().getName(), e.getMessage());
-      }
-    }
 
     // Close the remaining inited Inputs.
     Iterator<Map.Entry<String, LogicalInput>> inputIterator = 
initializedInputs.entrySet().iterator();
@@ -858,6 +844,28 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
       }
     }
 
+    // Close processor
+    if (!processorClosed && processor != null) {
+      try {
+        processorClosed = true;
+        processor.close();
+        LOG.info("Closed processor for vertex={}, index={}, 
interruptedStatus={}",
+            processor
+            .getContext().getTaskVertexName(),
+            processor.getContext().getTaskVertexIndex(),
+            Thread.currentThread().isInterrupted());
+        maybeResetInterruptStatus();
+      } catch (InterruptedException ie) {
+        //reset the status
+        LOG.info("Resetting interrupt for processor");
+        Thread.currentThread().interrupt();
+      } catch (Throwable e) {
+        LOG.warn(
+            "Ignoring Exception when closing processor(cleanup). Exception 
class={}, message={}" +
+            e.getClass().getName(), e.getMessage());
+      }
+    }
+
     try {
       closeContexts();
       // Cleanup references which may be held by misbehaved tasks.

Reply via email to