Repository: tez
Updated Branches:
  refs/heads/master ca83804f9 -> 5f63de8ee
TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5f63de8e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5f63de8e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5f63de8e

Branch: refs/heads/master
Commit: 5f63de8eecbc8c0f487da122714f0aca1639ac4f
Parents: ca83804
Author: Rajesh Balamohan <[email protected]>
Authored: Thu Apr 30 05:48:00 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Thu Apr 30 05:48:00 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 50 +++++++++-
 .../runtime/api/impl/TezInputContextImpl.java   | 18 +++-
 .../runtime/api/impl/TezOutputContextImpl.java  | 16 +++-
 .../api/impl/TezProcessorContextImpl.java       | 18 +++-
 .../runtime/api/impl/TezTaskContextImpl.java    | 22 +++--
 .../TestLogicalIOProcessorRuntimeTask.java      | 96 ++++++++++++++------
 7 files changed, 176 insertions(+), 45 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16a7e08..3a1867e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask.
   TEZ-2084. Tez UI: Stacktrace format info is lost in diagnostics
   TEZ-2374. Fix build break against hadoop-2.2 due to TEZ-2325.
   TEZ-2314. 
Tez task attempt failures due to bad event serialization http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 80c2717..f465d3c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -37,6 +38,8 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.runtime.api.TaskContext; +import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -77,7 +80,6 @@ import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezInputContextImpl; import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl; import org.apache.tez.runtime.api.impl.TezOutputContextImpl; -import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.resources.MemoryDistributor; @@ -97,12 +99,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final String[] localDirs; /** Responsible for maintaining order of Inputs */ private final List<InputSpec> inputSpecs; - private final ConcurrentHashMap<String, LogicalInput> inputsMap; - private 
final ConcurrentHashMap<String, InputContext> inputContextMap; + private final Map<String, LogicalInput> inputsMap; + private final Map<String, InputContext> inputContextMap; /** Responsible for maintaining order of Outputs */ private final List<OutputSpec> outputSpecs; - private final ConcurrentHashMap<String, LogicalOutput> outputsMap; - private final ConcurrentHashMap<String, OutputContext> outputContextMap; + private final Map<String, LogicalOutput> outputsMap; + private final Map<String, OutputContext> outputContextMap; private final List<GroupInputSpec> groupInputSpecs; private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap; @@ -692,7 +694,45 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { eventRouterThread.start(); } + private void cleanupInputOutputs() { + if (groupInputsMap != null) { + groupInputsMap.clear(); + } + inputsMap.clear(); + outputsMap.clear(); + } + + private void closeContexts() throws IOException { + closeContext(inputContextMap); + closeContext(outputContextMap); + closeContext(processorContext); + } + + private void closeContext(Map<String, ? 
extends TaskContext> contextMap) throws IOException { + if (contextMap == null) { + return; + } + + for(TaskContext context : contextMap.values()) { + closeContext(context); + } + contextMap.clear(); + } + + private void closeContext(TaskContext context) throws IOException { + if (context != null && (context instanceof Closeable)) { + ((Closeable) context).close(); + } + } + public synchronized void cleanup() { + try { + cleanupInputOutputs(); + closeContexts(); + } catch (IOException e) { + LOG.info("Error while cleaning up contexts ", e); + } + LOG.info("Final Counters : " + getCounters().toShortString()); setTaskDone(); if (eventRouterThread != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index f6330f3..101aeb9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -46,16 +47,20 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.common.resources.MemoryDistributor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TezInputContextImpl extends TezTaskContextImpl implements InputContext { - private final UserPayload userPayload; 
+ private static final Logger LOG = LoggerFactory.getLogger(TezInputContextImpl.class); + + private UserPayload userPayload; private final String sourceVertexName; private final EventMetaData sourceInfo; private final int inputIndex; private final Map<String, LogicalInput> inputs; - private final InputReadyTracker inputReadyTracker; + private InputReadyTracker inputReadyTracker; private final InputStatisticsReporterImpl statsReporter; class InputStatisticsReporterImpl implements InputStatisticsReporter { @@ -161,4 +166,13 @@ public class TezInputContextImpl extends TezTaskContextImpl public InputStatisticsReporter getStatisticsReporter() { return statsReporter; } + + @Override + public void close() throws IOException { + super.close(); + this.userPayload = null; + this.inputReadyTracker = null; + inputs.clear(); + LOG.info("Cleared TezInputContextImpl related information"); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 4045113..b46cfd2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -44,16 +45,20 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import 
org.apache.tez.runtime.common.resources.MemoryDistributor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TezOutputContextImpl extends TezTaskContextImpl implements OutputContext { - private final UserPayload userPayload; + private static final Logger LOG = LoggerFactory.getLogger(TezOutputContextImpl.class); + + private UserPayload userPayload; private final String destinationVertexName; private final EventMetaData sourceInfo; private final int outputIndex; private final OutputStatisticsReporterImpl statsReporter; - + class OutputStatisticsReporterImpl implements OutputStatisticsReporter { @Override @@ -146,4 +151,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl public OutputStatisticsReporter getStatisticsReporter() { return statsReporter; } + + @Override + public void close() throws IOException { + super.close(); + this.userPayload = null; + LOG.info("Cleared TezOutputContextImpl related information"); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index a74ccac..d6b3ec5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -45,12 +46,16 @@ import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.ProcessorContext; 
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.common.resources.MemoryDistributor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TezProcessorContextImpl extends TezTaskContextImpl implements ProcessorContext { - private final UserPayload userPayload; + private static final Logger LOG = LoggerFactory.getLogger(TezProcessorContextImpl.class); + + private UserPayload userPayload; + private InputReadyTracker inputReadyTracker; private final EventMetaData sourceInfo; - private final InputReadyTracker inputReadyTracker; public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, TezUmbilical tezUmbilical, String dagName, String vertexName, @@ -110,4 +115,13 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException { inputReadyTracker.waitForAllInputsReady(inputs); } + + @Override + public void close() throws IOException { + super.close(); + this.userPayload = null; + this.inputReadyTracker = null; + LOG.info("Cleared TezProcessorContextImpl related information"); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index a156f54..170741a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -20,6 +20,8 @@ package org.apache.tez.runtime.api.impl; import static com.google.common.base.Preconditions.checkNotNull; +import 
java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; @@ -35,7 +37,6 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EntityDescriptor; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; -import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.ObjectRegistry; @@ -44,25 +45,24 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor; import com.google.common.base.Preconditions; -public abstract class TezTaskContextImpl implements TaskContext { +public abstract class TezTaskContextImpl implements TaskContext, Closeable { private static final AtomicInteger ID_GEN = new AtomicInteger(10000); - - private final Configuration conf; + protected final String taskVertexName; protected final TezTaskAttemptID taskAttemptID; private final TezCounters counters; private String[] workDirs; private String uniqueIdentifier; - protected final LogicalIOProcessorRuntimeTask runtimeTask; + protected LogicalIOProcessorRuntimeTask runtimeTask; protected final TezUmbilical tezUmbilical; private final Map<String, ByteBuffer> serviceConsumerMetadata; private final int appAttemptNumber; private final Map<String, String> auxServiceEnv; - protected final MemoryDistributor initialMemoryDistributor; + protected MemoryDistributor initialMemoryDistributor; protected final EntityDescriptor<?> descriptor; private final String dagName; - private final ObjectRegistry objectRegistry; + private ObjectRegistry objectRegistry; private final int vertexParallelism; private final ExecutionContext ExecutionContext; private final long memAvailable; @@ -84,7 +84,6 @@ public abstract class TezTaskContextImpl implements TaskContext { checkNotNull(auxServiceEnv, "auxServiceEnv is null"); checkNotNull(memDist, 
"memDist is null"); checkNotNull(descriptor, "descriptor is null"); - this.conf = conf; this.dagName = dagName; this.taskVertexName = taskVertexName; this.taskAttemptID = taskAttemptID; @@ -223,4 +222,11 @@ public abstract class TezTaskContextImpl implements TaskContext { private int generateId() { return ID_GEN.incrementAndGet(); } + + @Override + public void close() throws IOException { + this.runtimeTask = null; + this.objectRegistry = null; + this.initialMemoryDistributor = null; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5f63de8e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index 4d165b5..df932cf 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -19,9 +19,12 @@ package org.apache.tez.runtime; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +41,7 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.Reader; @@ -80,38 +84,78 @@ public class TestLogicalIOProcessorRuntimeTask 
{ umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory()); - lio1.initialize(); - lio1.run(); - lio1.close(); + try { + lio1.initialize(); + lio1.run(); + lio1.close(); + + // Input should've been started, Output should not have been started + assertEquals(1, TestProcessor.runCount); + assertEquals(1, TestInput.startCount); + assertEquals(0, TestOutput.startCount); + assertEquals(30, TestInput.vertexParallelism); + assertEquals(0, TestOutput.vertexParallelism); + assertEquals(30, lio1.getProcessorContext().getVertexParallelism()); + assertEquals(30, lio1.getInputContexts().iterator().next().getVertexParallelism()); + assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism()); + } catch(Exception e) { + fail(); + } finally { + cleanupAndTest(lio1); + } + + - // Input should've been started, Output should not have been started - assertEquals(1, TestProcessor.runCount); - assertEquals(1, TestInput.startCount); - assertEquals(0, TestOutput.startCount); - assertEquals(30, TestInput.vertexParallelism); - assertEquals(0, TestOutput.vertexParallelism); - assertEquals(30, lio1.getProcessorContext().getVertexParallelism()); - assertEquals(30, lio1.getInputContexts().iterator().next().getVertexParallelism()); - assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism()); LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory()); - lio2.initialize(); - lio2.run(); - lio2.close(); - - // Input should not have been started again, Output should not have been started - assertEquals(2, TestProcessor.runCount); - assertEquals(1, TestInput.startCount); - assertEquals(0, TestOutput.startCount); - assertEquals(30, 
TestInput.vertexParallelism); - assertEquals(0, TestOutput.vertexParallelism); - //Check if parallelism is available in processor/ i/p / o/p contexts - assertEquals(10, lio2.getProcessorContext().getVertexParallelism()); - assertEquals(10, lio2.getInputContexts().iterator().next().getVertexParallelism()); - assertEquals(10, lio2.getOutputContexts().iterator().next().getVertexParallelism()); + try { + lio2.initialize(); + lio2.run(); + lio2.close(); + + // Input should not have been started again, Output should not have been started + assertEquals(2, TestProcessor.runCount); + assertEquals(1, TestInput.startCount); + assertEquals(0, TestOutput.startCount); + assertEquals(30, TestInput.vertexParallelism); + assertEquals(0, TestOutput.vertexParallelism); + //Check if parallelism is available in processor/ i/p / o/p contexts + assertEquals(10, lio2.getProcessorContext().getVertexParallelism()); + assertEquals(10, lio2.getInputContexts().iterator().next().getVertexParallelism()); + assertEquals(10, lio2.getOutputContexts().iterator().next().getVertexParallelism()); + } catch(Exception e) { + fail(); + } finally { + cleanupAndTest(lio2); + } + + } + + private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) { + + lio.cleanup(); + + assertTrue(lio.getProcessorContext().getUserPayload() == null); + assertTrue(lio.getProcessorContext().getObjectRegistry() == null); + + try { + lio.getProcessorContext().waitForAnyInputReady(Collections.<Input>emptyList()); + fail("Processor context should have been already cleanup"); + } catch (Throwable t) { + assertTrue(t instanceof NullPointerException); + } + + try { + lio.getProcessorContext().requestInitialMemory(0, null); + fail("Processor context should have been already cleanup"); + } catch (Throwable t) { + assertTrue(t instanceof NullPointerException); + } + assertTrue(lio.getInputContexts().size() == 0); + assertTrue(lio.getOutputContexts().size() == 0); } private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID,
