Repository: tez
Updated Branches: refs/heads/master d0348b03b -> b0ba133ff
TEZ-2937. Can Processor.close() be called after closing inputs and outputs? (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0ba133f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0ba133f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0ba133f Branch: refs/heads/master Commit: b0ba133ffcf1339b2c505878e9622084d4290bde Parents: d0348b0 Author: Jonathan Eagles <[email protected]> Authored: Fri Jan 15 10:58:37 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Fri Jan 15 10:58:37 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../runtime/LogicalIOProcessorRuntimeTask.java | 53 ++++++++++---------- 2 files changed, 29 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b0ba133f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d6a53aa..5226ea6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2937. Can Processor.close() be called after closing inputs and outputs? TEZ-3037. History URL should be set regardless of which history logging service is enabled. TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field. @@ -314,6 +315,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-2937. Can Processor.close() be called after closing inputs and outputs? TEZ-3037. History URL should be set regardless of which history logging service is enabled. TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field. TEZ-2129. 
Task and Attempt views should contain links to the logs http://git-wip-us.apache.org/repos/asf/tez/blob/b0ba133f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index df09fdb..7f546e6 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -357,10 +357,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { "Can only run while in RUNNING state. Current: " + this.state); this.state.set(State.CLOSED); - // Close the Processor. - processorClosed = true; - processor.close(); - // Close the Inputs. for (InputSpec inputSpec : inputSpecs) { String srcVertexName = inputSpec.getSourceVertexName(); @@ -380,6 +376,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), destVertexName, taskSpec.getTaskAttemptID()); } + + // Close the Processor. 
+ processorClosed = true; + processor.close(); + } finally { setTaskDone(); if (eventRouterThread != null) { @@ -843,28 +844,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { LOG.debug("Num of outputs to be closed={}", initializedOutputs.size()); } - // Close processor - if (!processorClosed && processor != null) { - try { - processorClosed = true; - processor.close(); - LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", - processor - .getContext().getTaskVertexName(), - processor.getContext().getTaskVertexIndex(), - Thread.currentThread().isInterrupted()); - maybeResetInterruptStatus(); - } catch (InterruptedException ie) { - //reset the status - LOG.info("Resetting interrupt for processor"); - Thread.currentThread().interrupt(); - } catch (Throwable e) { - LOG.warn( - "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" + - e.getClass().getName(), e.getMessage()); - } - } - // Close the remaining inited Inputs. Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator(); while (inputIterator.hasNext()) { @@ -917,6 +896,28 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { printThreads(); } + // Close processor + if (!processorClosed && processor != null) { + try { + processorClosed = true; + processor.close(); + LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", + processor + .getContext().getTaskVertexName(), + processor.getContext().getTaskVertexIndex(), + Thread.currentThread().isInterrupted()); + maybeResetInterruptStatus(); + } catch (InterruptedException ie) { + //reset the status + LOG.info("Resetting interrupt for processor"); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + LOG.warn( + "Ignoring Exception when closing processor(cleanup). 
Exception class={}, message={}" + + e.getClass().getName(), e.getMessage()); + } + } + try { closeContexts(); // Cleanup references which may be held by misbehaved tasks.
