TEZ-2952. NPE in TestOnFileUnorderedKVOutput (bikas)
(cherry picked from commit 4561b82524ca6ee484910349a6c95703883757b6)
Conflicts:
CHANGES.txt
tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a625b9f0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a625b9f0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a625b9f0
Branch: refs/heads/branch-0.7
Commit: a625b9f02723669678f30e76e5fcd6e700a14158
Parents: 95865b2
Author: Bikas Saha <[email protected]>
Authored: Thu Nov 19 10:17:22 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Mon Dec 14 14:54:28 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 1 -
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 6 ++--
.../runtime/api/impl/TestProcessorContext.java | 24 +++++++++------
.../output/TestOnFileUnorderedKVOutput.java | 31 ++++++++++++--------
5 files changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ffa0c5f..9f9868c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES
TEZ-2979. FlakyTest: org.apache.tez.history.TestHistoryParser.
+ TEZ-2952. NPE in TestOnFileUnorderedKVOutput
TEZ-808. Handle task attempts that are not making progress
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws
IllegalStateException: Stats should be initialized.
TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the
partition sizes from the source.
http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 782bbe0..a001735 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -651,7 +651,6 @@ public class TezConfiguration extends Configuration {
* A config value <=0 disables this.
*/
@ConfigurationScope(Scope.VERTEX)
- @ConfigurationProperty
public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS =
TEZ_TASK_PREFIX +
"progress.stuck.interval-ms";
public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1;
http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 9d935f6..4370f50 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -787,7 +787,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskCommunicatorManagerInterface taListener =
createMockTaskAttemptListener();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -807,9 +807,9 @@ public class TestTaskAttempt {
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class),
mock(TaskCommunicatorManagerInterface.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container, 0, 0, 0);
+ containers.addContainerIfNew(container);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index ff94e7f..f2f9281 100644
---
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -16,17 +16,17 @@ package org.apache.tez.runtime.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,7 +43,7 @@ import org.junit.Test;
public class TestProcessorContext {
@Test (timeout = 5000)
- public void testDagNumber() {
+ public void testDagNumber() throws IOException {
String[] localDirs = new String[] {"dummyLocalDir"};
int appAttemptNumber = 1;
TezUmbilical tezUmbilical = mock(TezUmbilical.class);
@@ -57,8 +57,14 @@ public class TestProcessorContext {
TezTaskID taskId = TezTaskID.getInstance(vertexId, 4);
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
- LogicalIOProcessorRuntimeTask runtimeTask =
mock(LogicalIOProcessorRuntimeTask.class);
- doReturn(new
TezCounters()).when(runtimeTask).addAndGetTezCounter(any(String.class));
+ TaskSpec mockSpec = mock(TaskSpec.class);
+
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+ LogicalIOProcessorRuntimeTask runtimeTask = new
LogicalIOProcessorRuntimeTask(
+ mockSpec, 1,
+ new Configuration(), new String[]{"/"},
+ tezUmbilical, null, null, null, null, "", null, 1024);
+ LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask);
Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap();
Map<String, String> auxServiceEnv = Maps.newHashMap();
MemoryDistributor memDist = mock(MemoryDistributor.class);
@@ -96,9 +102,9 @@ public class TestProcessorContext {
assertEquals(vertexName, procContext.getTaskVertexName());
assertEquals(vertexId.getId(), procContext.getTaskVertexIndex());
assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs()));
-
- // test auto call of notifyProgress
- procContext.setProgress(0.1f);
- verify(runtimeTask, times(1)).notifyProgressInvocation();
+
+ // test auto call of notifyProgress
+ procContext.setProgress(0.1f);
+ verify(mockTask, times(1)).notifyProgressInvocation();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a625b9f0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 2b25daf..58be59e 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,7 +50,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
@@ -62,7 +62,9 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskStatistics;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
@@ -91,7 +93,7 @@ public class TestOnFileUnorderedKVOutput {
private static Path workDir = null;
private static final int shufflePort = 2112;
- TaskStatistics stats;
+ LogicalIOProcessorRuntimeTask task;
static {
defaultConf.set("fs.defaultFS", "file:///");
@@ -108,7 +110,6 @@ public class TestOnFileUnorderedKVOutput {
@Before
public void setup() throws Exception {
- stats = new TaskStatistics();
localFs.mkdirs(workDir);
}
@@ -139,8 +140,8 @@ public class TestOnFileUnorderedKVOutput {
}
events = kvOutput.close();
- assertEquals(45,
stats.getIOStatistics().values().iterator().next().getDataSize());
- assertEquals(5,
stats.getIOStatistics().values().iterator().next().getItemsProcessed());
+ assertEquals(45,
task.getTaskStatistics().getIOStatistics().values().iterator().next().getDataSize());
+ assertEquals(5,
task.getTaskStatistics().getIOStatistics().values().iterator().next().getItemsProcessed());
assertTrue(events != null && events.size() == 1);
CompositeDataMovementEvent dmEvent =
(CompositeDataMovementEvent)events.get(0);
@@ -212,12 +213,18 @@ public class TestOnFileUnorderedKVOutput {
TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
- TezCounters counters = new TezCounters();
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
- LogicalIOProcessorRuntimeTask runtimeTask =
mock(LogicalIOProcessorRuntimeTask.class);
-
when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters);
- when(runtimeTask.getTaskStatistics()).thenReturn(stats);
-
+
+ TaskSpec mockSpec = mock(TaskSpec.class);
+
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+ task = new LogicalIOProcessorRuntimeTask(
+ mockSpec, appAttemptNumber,
+ new Configuration(), new String[]{"/"},
+ tezUmbilical, null, null, null, null, "", null, 1024);
+
+ LogicalIOProcessorRuntimeTask runtimeTask = spy(task);
+
Map<String, String> auxEnv = new HashMap<String, String>();
ByteBuffer bb = ByteBuffer.allocate(4);
bb.putInt(shufflePort);
@@ -236,7 +243,7 @@ public class TestOnFileUnorderedKVOutput {
verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
verify(runtimeTask, times(1)).getTaskStatistics();
// verify output stats object got created
-
Assert.assertTrue(stats.getIOStatistics().containsKey(destinationVertexName));
+
Assert.assertTrue(task.getTaskStatistics().getIOStatistics().containsKey(destinationVertexName));
OutputContext outputContext = spy(realOutputContext);
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws
Throwable {