TEZ-2265. All inputs/outputs in a task share the same counter object (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/09a96088 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/09a96088 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/09a96088 Branch: refs/heads/TEZ-2003 Commit: 09a96088c94008e68fdbbffe0045d548fa2c57f5 Parents: 212de07 Author: Bikas Saha <[email protected]> Authored: Thu Apr 2 18:15:12 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Apr 2 18:16:39 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/LogicalIOProcessorRuntimeTask.java | 10 ++++++---- .../java/org/apache/tez/runtime/RuntimeTask.java | 18 +++++++++++++++++- .../tez/runtime/api/impl/TezInputContextImpl.java | 7 ++++--- .../runtime/api/impl/TezOutputContextImpl.java | 7 ++++--- .../runtime/api/impl/TezProcessorContextImpl.java | 4 ++-- .../output/TestOnFileUnorderedKVOutput.java | 5 ++++- .../java/org/apache/tez/test/TestAMRecovery.java | 6 ++++++ .../test/java/org/apache/tez/test/TestInput.java | 3 +++ .../test/java/org/apache/tez/test/TestOutput.java | 2 ++ 10 files changed, 49 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e844f60..a669147 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar) ALL CHANGES: + TEZ-2265. All inputs/outputs in a task share the same counter object TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang. TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late. TEZ-2149. Optimizations for the timed version of DAGClient.getStatus. http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 2d27c8c..56b2627 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.RunnableWithNdc; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -85,6 +86,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -488,7 +490,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputSpec.getSourceVertexName(), taskSpec.getVertexParallelism(), taskSpec.getTaskAttemptID(), - tezCounters, inputIndex, + inputIndex, inputSpec.getInputDescriptor().getUserPayload(), this, serviceConsumerMetadata, envMap, initialMemoryDistributor, inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry, @@ -503,7 +505,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputSpec.getDestinationVertexName(), taskSpec.getVertexParallelism(), taskSpec.getTaskAttemptID(), - tezCounters, outputIndex, + outputIndex, outputSpec.getOutputDescriptor().getUserPayload(), this, serviceConsumerMetadata, envMap, initialMemoryDistributor, outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext, @@ -517,7 +519,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { taskSpec.getDAGName(), taskSpec.getVertexName(), taskSpec.getVertexParallelism(), taskSpec.getTaskAttemptID(), - tezCounters, processorDescriptor.getUserPayload(), this, + processorDescriptor.getUserPayload(), this, serviceConsumerMetadata, envMap, initialMemoryDistributor, processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable); return processorContext; @@ -694,7 +696,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } public synchronized void cleanup() { - LOG.info("Final Counters : " + tezCounters.toShortString()); + LOG.info("Final Counters : " + getCounters().toShortString()); setTaskDone(); if (eventRouterThread != null) { eventRouterThread.interrupt(); http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 4dfa936..4777b71 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime; import java.util.Collection; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +32,8 @@ import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.metrics.TaskCounterUpdater; +import com.google.common.collect.Maps; + public abstract class RuntimeTask { protected AtomicBoolean hasFatalError = new AtomicBoolean(false); @@ -38,6 +41,8 @@ public abstract class RuntimeTask { protected String fatalErrorMessage = null; protected float progress; protected final TezCounters tezCounters; + private final Map<String, TezCounters> counterMap = Maps.newConcurrentMap(); + protected final TaskSpec taskSpec; protected final Configuration tezConf; protected final TezUmbilical tezUmbilical; @@ -63,6 +68,12 @@ public abstract class RuntimeTask { protected final AtomicReference<State> state = new AtomicReference<State>(); + public TezCounters addAndGetTezCounter(String name) { + TezCounters counter = new TezCounters(); + counterMap.put(name, counter); + return counter; + } + public String getVertexName() { return taskSpec.getVertexName(); } @@ -90,7 +101,12 @@ public abstract class RuntimeTask { } public TezCounters getCounters() { - return this.tezCounters; + TezCounters fullCounters = new TezCounters(); + fullCounters.incrAllCounters(tezCounters); + for (TezCounters counter : counterMap.values()) { + fullCounters.incrAllCounters(counter); + } + return fullCounters; } public TezTaskAttemptID getTaskAttemptID() { http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index a15e072..bd41aed 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -62,7 +62,7 @@ public class TezInputContextImpl extends TezTaskContextImpl TezUmbilical tezUmbilical, String dagName, String taskVertexName, String sourceVertexName, int vertexParallelism, TezTaskAttemptID taskAttemptID, - TezCounters counters, int inputIndex, @Nullable UserPayload userPayload, + int inputIndex, @Nullable UserPayload userPayload, RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, @@ -70,7 +70,7 @@ public class TezInputContextImpl extends TezTaskContextImpl InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry, ExecutionContext ExecutionContext, long memAvailable) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, - vertexParallelism, taskAttemptID, wrapCounters(counters, + vertexParallelism, taskAttemptID, wrapCounters(runtimeTask, taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor, objectRegistry, ExecutionContext, memAvailable); @@ -88,8 +88,9 @@ public class TezInputContextImpl extends TezTaskContextImpl this.inputReadyTracker = inputReadyTracker; } - private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName, + private static TezCounters wrapCounters(RuntimeTask task, String taskVertexName, String edgeVertexName, Configuration conf) { + TezCounters tezCounters = task.addAndGetTezCounter(edgeVertexName); if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) { return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "INPUT"); http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index d376b88..8d758f0 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -58,7 +58,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl String taskVertexName, String destinationVertexName, int vertexParallelism, - TezTaskAttemptID taskAttemptID, TezCounters counters, int outputIndex, + TezTaskAttemptID taskAttemptID, int outputIndex, @Nullable UserPayload userPayload, RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, @@ -66,7 +66,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl ExecutionContext ExecutionContext, long memAvailable) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, vertexParallelism, taskAttemptID, - wrapCounters(counters, taskVertexName, destinationVertexName, conf), + wrapCounters(runtimeTask, taskVertexName, destinationVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable); checkNotNull(outputIndex, "outputIndex is null"); @@ -78,8 +78,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl taskVertexName, destinationVertexName, taskAttemptID); } - private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName, + private static TezCounters wrapCounters(RuntimeTask runtimeTask, String taskVertexName, String edgeVertexName, Configuration conf) { + TezCounters tezCounters = runtimeTask.addAndGetTezCounter(edgeVertexName); if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) { return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "OUTPUT"); http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 16f9a45..edfd8c9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -54,14 +54,14 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, TezUmbilical tezUmbilical, String dagName, String vertexName, - int vertexParallelism, TezTaskAttemptID taskAttemptID, TezCounters counters, + int vertexParallelism, TezTaskAttemptID taskAttemptID, @Nullable UserPayload userPayload, RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry, ExecutionContext ExecutionContext, long memAvailable) { super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID, - counters, runtimeTask, tezUmbilical, serviceConsumerMetadata, + runtimeTask.addAndGetTezCounter(vertexName), runtimeTask, tezUmbilical, serviceConsumerMetadata, auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); this.userPayload = userPayload; http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 509c23d..3e0f6ea 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.junit.Assert.assertEquals; @@ -205,6 +206,7 @@ public class TestOnFileUnorderedKVOutput { TezCounters counters = new TezCounters(); UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); RuntimeTask runtimeTask = mock(RuntimeTask.class); + when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters); Map<String, String> auxEnv = new HashMap<String, String>(); @@ -219,9 +221,10 @@ public class TestOnFileUnorderedKVOutput { OutputContext realOutputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()}, appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName, - -1, taskAttemptID, counters, 0, userPayload, runtimeTask, + -1, taskAttemptID, 0, userPayload, runtimeTask, null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null, new ExecutionContextImpl("localhost"), 2048); + verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName); OutputContext outputContext = spy(realOutputContext); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 66d8373..9e0c02f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -40,6 +40,7 @@ import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; @@ -363,6 +364,11 @@ public class TestAMRecovery { TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue()); + TezCounter outputCounter = counters.findCounter(TestOutput.COUNTER_NAME, TestOutput.COUNTER_NAME); + TezCounter inputCounter = counters.findCounter(TestInput.COUNTER_NAME, TestInput.COUNTER_NAME); + // verify that processor, input and output counters, are all being collected + Assert.assertTrue(outputCounter.getValue() > 0); + Assert.assertTrue(inputCounter.getValue() > 0); List<HistoryEvent> historyEvents1 = readRecoveryLog(1); List<HistoryEvent> historyEvents2 = readRecoveryLog(2); http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index 68ac8e0..eeb565c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -57,6 +57,8 @@ public class TestInput extends AbstractLogicalInput { private static final Logger LOG = LoggerFactory .getLogger(TestInput.class); + public static final String COUNTER_NAME = "TestInput"; + Configuration conf; int numCompletedInputs = 0; int[] completedInputVersion; @@ -369,6 +371,7 @@ public class TestInput extends AbstractLogicalInput { @Override public List<Event> close() throws Exception { + getContext().getCounters().findCounter(COUNTER_NAME, COUNTER_NAME).increment(1);; return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java index 737f8e7..69dea7e 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java @@ -37,6 +37,7 @@ import com.google.common.collect.Lists; public class TestOutput extends AbstractLogicalOutput { private static final Logger LOG = LoggerFactory.getLogger(TestOutput.class); + public static final String COUNTER_NAME = "TestOutput"; public TestOutput(OutputContext outputContext, int numPhysicalOutputs) { super(outputContext, numPhysicalOutputs); } @@ -77,6 +78,7 @@ public class TestOutput extends AbstractLogicalOutput { @Override public List<Event> close() throws Exception { LOG.info("Sending data movement event with value: " + output); + getContext().getCounters().findCounter(COUNTER_NAME, COUNTER_NAME).increment(1);; ByteBuffer result = ByteBuffer.allocate(4).putInt(output); result.flip(); List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs());
