TEZ-2952. NPE in TestOnFileUnorderedKVOutput (bikas)
(cherry picked from commit 4561b82524ca6ee484910349a6c95703883757b6)

Conflicts:
        CHANGES.txt
        
tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a625b9f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a625b9f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a625b9f0

Branch: refs/heads/branch-0.7
Commit: a625b9f02723669678f30e76e5fcd6e700a14158
Parents: 95865b2
Author: Bikas Saha <[email protected]>
Authored: Thu Nov 19 10:17:22 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Mon Dec 14 14:54:28 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  1 -
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  6 ++--
 .../runtime/api/impl/TestProcessorContext.java  | 24 +++++++++------
 .../output/TestOnFileUnorderedKVOutput.java     | 31 ++++++++++++--------
 5 files changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ffa0c5f..9f9868c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES
   TEZ-2979. FlakyTest: org.apache.tez.history.TestHistoryParser.
+  TEZ-2952. NPE in TestOnFileUnorderedKVOutput
   TEZ-808. Handle task attempts that are not making progress
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws 
IllegalStateException: Stats should be initialized.
   TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the 
partition sizes from the source.

http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 782bbe0..a001735 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -651,7 +651,6 @@ public class TezConfiguration extends Configuration {
    * A config value <=0 disables this.
    */
   @ConfigurationScope(Scope.VERTEX)
-  @ConfigurationProperty
   public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = 
TEZ_TASK_PREFIX + 
     "progress.stuck.interval-ms";
   public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1;

http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 9d935f6..4370f50 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -787,7 +787,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskCommunicatorManagerInterface taListener = 
createMockTaskAttemptListener();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -807,9 +807,9 @@ public class TestTaskAttempt {
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), 
mock(TaskCommunicatorManagerInterface.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
-    containers.addContainerIfNew(container, 0, 0, 0);
+    containers.addContainerIfNew(container);
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();

http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index ff94e7f..f2f9281 100644
--- 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -16,17 +16,17 @@ package org.apache.tez.runtime.api.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,7 +43,7 @@ import org.junit.Test;
 public class TestProcessorContext {
 
   @Test (timeout = 5000)
-  public void testDagNumber() {
+  public void testDagNumber() throws IOException {
     String[] localDirs = new String[] {"dummyLocalDir"};
     int appAttemptNumber = 1;
     TezUmbilical tezUmbilical = mock(TezUmbilical.class);
@@ -57,8 +57,14 @@ public class TestProcessorContext {
     TezTaskID taskId = TezTaskID.getInstance(vertexId, 4);
     TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
 
-    LogicalIOProcessorRuntimeTask runtimeTask = 
mock(LogicalIOProcessorRuntimeTask.class);
-    doReturn(new 
TezCounters()).when(runtimeTask).addAndGetTezCounter(any(String.class));
+    TaskSpec mockSpec = mock(TaskSpec.class);
+    
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+    
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+    LogicalIOProcessorRuntimeTask runtimeTask = new 
LogicalIOProcessorRuntimeTask(
+        mockSpec, 1, 
+        new Configuration(), new String[]{"/"}, 
+        tezUmbilical, null, null, null, null, "", null, 1024);
+    LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask);
     Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
     Map<String, String> auxServiceEnv = Maps.newHashMap();
     MemoryDistributor memDist = mock(MemoryDistributor.class);
@@ -96,9 +102,9 @@ public class TestProcessorContext {
     assertEquals(vertexName, procContext.getTaskVertexName());
     assertEquals(vertexId.getId(), procContext.getTaskVertexIndex());
     assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs()));
-    
-    // test auto call of notifyProgress
-    procContext.setProgress(0.1f);
-    verify(runtimeTask, times(1)).notifyProgressInvocation();
+
+     // test auto call of notifyProgress
+     procContext.setProgress(0.1f);
+     verify(mockTask, times(1)).notifyProgressInvocation();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 2b25daf..58be59e 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +50,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
@@ -62,7 +62,9 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskStatistics;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
@@ -91,7 +93,7 @@ public class TestOnFileUnorderedKVOutput {
   private static Path workDir = null;
   private static final int shufflePort = 2112;
 
-  TaskStatistics stats;
+  LogicalIOProcessorRuntimeTask task;
 
   static {
     defaultConf.set("fs.defaultFS", "file:///");
@@ -108,7 +110,6 @@ public class TestOnFileUnorderedKVOutput {
 
   @Before
   public void setup() throws Exception {
-    stats = new TaskStatistics();
     localFs.mkdirs(workDir);
   }
 
@@ -139,8 +140,8 @@ public class TestOnFileUnorderedKVOutput {
     }
 
     events = kvOutput.close();
-    assertEquals(45, 
stats.getIOStatistics().values().iterator().next().getDataSize());
-    assertEquals(5, 
stats.getIOStatistics().values().iterator().next().getItemsProcessed());
+    assertEquals(45, 
task.getTaskStatistics().getIOStatistics().values().iterator().next().getDataSize());
+    assertEquals(5, 
task.getTaskStatistics().getIOStatistics().values().iterator().next().getItemsProcessed());
     assertTrue(events != null && events.size() == 1);
     CompositeDataMovementEvent dmEvent = 
(CompositeDataMovementEvent)events.get(0);
 
@@ -212,12 +213,18 @@ public class TestOnFileUnorderedKVOutput {
     TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
     TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
-    TezCounters counters = new TezCounters();
     UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-    LogicalIOProcessorRuntimeTask runtimeTask = 
mock(LogicalIOProcessorRuntimeTask.class);
-    
when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters);
-    when(runtimeTask.getTaskStatistics()).thenReturn(stats);
-
+    
+    TaskSpec mockSpec = mock(TaskSpec.class);
+    
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+    
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+    task = new LogicalIOProcessorRuntimeTask(
+        mockSpec, appAttemptNumber, 
+        new Configuration(), new String[]{"/"}, 
+        tezUmbilical, null, null, null, null, "", null, 1024);
+    
+    LogicalIOProcessorRuntimeTask runtimeTask = spy(task);
+    
     Map<String, String> auxEnv = new HashMap<String, String>();
     ByteBuffer bb = ByteBuffer.allocate(4);
     bb.putInt(shufflePort);
@@ -236,7 +243,7 @@ public class TestOnFileUnorderedKVOutput {
     verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
     verify(runtimeTask, times(1)).getTaskStatistics();
     // verify output stats object got created
-    
Assert.assertTrue(stats.getIOStatistics().containsKey(destinationVertexName));
+    
Assert.assertTrue(task.getTaskStatistics().getIOStatistics().containsKey(destinationVertexName));
     OutputContext outputContext = spy(realOutputContext);
     doAnswer(new Answer() {
       @Override public Object answer(InvocationOnMock invocation) throws 
Throwable {

Reply via email to