Repository: tez Updated Branches: refs/heads/branch-0.7 b1cbae024 -> 73a677a12
TEZ-2937. Can Processor.close() be called after closing inputs and outputs? (jeagles) (cherry picked from commit b0ba133ffcf1339b2c505878e9622084d4290bde) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73a677a1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73a677a1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73a677a1 Branch: refs/heads/branch-0.7 Commit: 73a677a1265a42e3d99b61650a60757e25429104 Parents: b1cbae0 Author: Jonathan Eagles <[email protected]> Authored: Fri Jan 15 10:58:37 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Jan 15 11:07:54 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/LogicalIOProcessorRuntimeTask.java | 46 ++++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/73a677a1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 503d718..09b29cc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-2937. Can Processor.close() be called after closing inputs and outputs? TEZ-3037. History URL should be set regardless of which history logging service is enabled. TEZ-2129. Task and Attempt views should contain links to the logs TEZ-3025. InputInitializer creation should use the dag ugi. http://git-wip-us.apache.org/repos/asf/tez/blob/73a677a1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index f393769..e85eaeb 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -350,10 +350,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { "Can only run while in RUNNING state. Current: " + this.state); this.state.set(State.CLOSED); - // Close the Processor. - processorClosed = true; - processor.close(); - // Close the Inputs. for (InputSpec inputSpec : inputSpecs) { String srcVertexName = inputSpec.getSourceVertexName(); @@ -373,6 +369,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), destVertexName, taskSpec.getTaskAttemptID()); } + + // Close the Processor. + processorClosed = true; + processor.close(); + } finally { setTaskDone(); if (eventRouterThread != null) { @@ -806,21 +807,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { LOG.debug("Num of inputs to be closed={}", initializedInputs.size()); LOG.debug("Num of outputs to be closed={}", initializedOutputs.size()); } - // Close processor - if (!processorClosed && processor != null) { - try { - processorClosed = true; - processor.close(); - LOG.info("Closed processor for vertex={}, index={}", - processor - .getContext().getTaskVertexName(), - processor.getContext().getTaskVertexIndex()); - } catch (Throwable e) { - LOG.warn( - "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}", - e.getClass().getName(), e.getMessage()); - } - } // Close the remaining inited Inputs. Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator(); @@ -858,6 +844,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } } + // Close processor + if (!processorClosed && processor != null) { + try { + processorClosed = true; + processor.close(); + LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", + processor + .getContext().getTaskVertexName(), + processor.getContext().getTaskVertexIndex(), + Thread.currentThread().isInterrupted()); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt for processor"); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + LOG.warn( + "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" + + e.getClass().getName(), e.getMessage()); + } + } + try { closeContexts(); // Cleanup references which may be held by misbehaved tasks.
