Repository: tez
Updated Branches:
  refs/heads/master c8ef2442d -> bd9b8d951
http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 3e0f6ea..41eb9a4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import com.google.protobuf.ByteString; + import org.apache.commons.lang.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,12 +56,13 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezOutputContextImpl; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.resources.MemoryDistributor; @@ -72,6 +74,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem import org.apache.tez.runtime.library.testutils.KVDataGen; import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair; import org.junit.After; 
+import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -87,6 +90,8 @@ public class TestOnFileUnorderedKVOutput { private static Path workDir = null; private static final int shufflePort = 2112; + TaskStatistics stats; + static { defaultConf.set("fs.defaultFS", "file:///"); try { @@ -102,6 +107,7 @@ public class TestOnFileUnorderedKVOutput { @Before public void setup() throws Exception { + stats = new TaskStatistics(); localFs.mkdirs(workDir); } @@ -126,12 +132,13 @@ public class TestOnFileUnorderedKVOutput { assertTrue(events != null && events.size() == 0); KeyValueWriter kvWriter = kvOutput.getWriter(); - List<KVPair> data = KVDataGen.generateTestData(true); + List<KVPair> data = KVDataGen.generateTestData(true, 0); for (KVPair kvp : data) { kvWriter.write(kvp.getKey(), kvp.getvalue()); } events = kvOutput.close(); + assertEquals(45, stats.getIOStatistics().values().iterator().next().getDataSize()); assertTrue(events != null && events.size() == 1); CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0); @@ -205,9 +212,9 @@ public class TestOnFileUnorderedKVOutput { TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); TezCounters counters = new TezCounters(); UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - RuntimeTask runtimeTask = mock(RuntimeTask.class); + LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class); when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters); - + when(runtimeTask.getTaskStatistics()).thenReturn(stats); Map<String, String> auxEnv = new HashMap<String, String>(); ByteBuffer bb = ByteBuffer.allocate(4); @@ -225,6 +232,9 @@ public class TestOnFileUnorderedKVOutput { null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null, new ExecutionContextImpl("localhost"), 2048); verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName); + 
verify(runtimeTask, times(1)).getTaskStatistics(); + // verify output stats object got created + Assert.assertTrue(stats.getIOStatistics().containsKey(destinationVertexName)); OutputContext outputContext = spy(realOutputContext); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable {
