Repository: tez Updated Branches: refs/heads/master c07f284ac -> 4561b8252
TEZ-2952. NPE in TestOnFileUnorderedKVOutput (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4561b825 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4561b825 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4561b825 Branch: refs/heads/master Commit: 4561b82524ca6ee484910349a6c95703883757b6 Parents: c07f284 Author: Bikas Saha <[email protected]> Authored: Thu Nov 19 10:17:22 2015 -0800 Committer: Bikas Saha <[email protected]> Committed: Thu Nov 19 10:17:22 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/api/impl/TestProcessorContext.java | 20 +++++++++---- .../output/TestOnFileUnorderedKVOutput.java | 31 ++++++++++++-------- 3 files changed, 35 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4561b825/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ea88aff..9644a1e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators. ALL CHANGES: + TEZ-2952. NPE in TestOnFileUnorderedKVOutput TEZ-2480. Exception when closing output is ignored. TEZ-2944. NPE in TestProcessorContext. TEZ-2945. TEZ-2740 addendum to update API with currently supported parameters http://git-wip-us.apache.org/repos/asf/tez/blob/4561b825/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java index 40bc257..f0c1e66 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java @@ -16,17 +16,17 @@ package org.apache.tez.runtime.api.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -43,7 +43,7 @@ import org.junit.Test; public class TestProcessorContext { @Test (timeout = 5000) - public void testDagNumber() { + public void testDagNumber() throws IOException { String[] localDirs = new String[] {"dummyLocalDir"}; int appAttemptNumber = 1; TezUmbilical tezUmbilical = mock(TezUmbilical.class); @@ -57,8 +57,14 @@ public class TestProcessorContext { TezTaskID taskId = TezTaskID.getInstance(vertexId, 4); TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 2); - LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class); - doReturn(new TezCounters()).when(runtimeTask).addAndGetTezCounter(any(String.class)); + TaskSpec mockSpec = mock(TaskSpec.class); + when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class))); + when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class))); + LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask( + mockSpec, 1, + new Configuration(), new String[]{"/"}, + tezUmbilical, null, null, null, null, "", null, 1024, false); + LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask); Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap(); Map<String, String> auxServiceEnv = Maps.newHashMap(); MemoryDistributor memDist = mock(MemoryDistributor.class); @@ -96,5 +102,9 @@ public class TestProcessorContext { assertEquals(vertexName, procContext.getTaskVertexName()); assertEquals(vertexId.getId(), procContext.getTaskVertexIndex()); assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs())); + + // test auto call of notifyProgress + procContext.setProgress(0.1f); + verify(mockTask, times(1)).notifyProgressInvocation(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/4561b825/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 2b25daf..32a3619 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -33,6 +33,7 @@ import static org.junit.Assert.assertFalse; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,7 +50,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.tez.common.TezUtils; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezDAGID; @@ -62,7 +62,9 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.api.impl.TaskStatistics; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezOutputContextImpl; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.resources.MemoryDistributor; @@ -91,7 +93,7 @@ public class TestOnFileUnorderedKVOutput { private static Path workDir = null; private static final int shufflePort = 2112; - TaskStatistics stats; + LogicalIOProcessorRuntimeTask task; static { defaultConf.set("fs.defaultFS", "file:///"); @@ -108,7 +110,6 @@ public class TestOnFileUnorderedKVOutput { @Before public void setup() throws Exception { - stats = new TaskStatistics(); localFs.mkdirs(workDir); } @@ -139,8 +140,8 @@ public class TestOnFileUnorderedKVOutput { } events = kvOutput.close(); - assertEquals(45, stats.getIOStatistics().values().iterator().next().getDataSize()); - assertEquals(5, stats.getIOStatistics().values().iterator().next().getItemsProcessed()); + assertEquals(45, task.getTaskStatistics().getIOStatistics().values().iterator().next().getDataSize()); + assertEquals(5, task.getTaskStatistics().getIOStatistics().values().iterator().next().getItemsProcessed()); assertTrue(events != null && events.size() == 1); CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0); @@ -212,12 +213,18 @@ public class TestOnFileUnorderedKVOutput { TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1); - TezCounters counters = new TezCounters(); UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); - LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class); - when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters); - when(runtimeTask.getTaskStatistics()).thenReturn(stats); - + + TaskSpec mockSpec = mock(TaskSpec.class); + when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class))); + when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class))); + task = new LogicalIOProcessorRuntimeTask( + mockSpec, appAttemptNumber, + new Configuration(), new String[]{"/"}, + tezUmbilical, null, null, null, null, "", null, 1024, false); + + LogicalIOProcessorRuntimeTask runtimeTask = spy(task); + Map<String, String> auxEnv = new HashMap<String, String>(); ByteBuffer bb = ByteBuffer.allocate(4); bb.putInt(shufflePort); @@ -236,7 +243,7 @@ public class TestOnFileUnorderedKVOutput { verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName); verify(runtimeTask, times(1)).getTaskStatistics(); // verify output stats object got created - Assert.assertTrue(stats.getIOStatistics().containsKey(destinationVertexName)); + Assert.assertTrue(task.getTaskStatistics().getIOStatistics().containsKey(destinationVertexName)); OutputContext outputContext = spy(realOutputContext); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable {
