Repository: tez
Updated Branches:
refs/heads/branch-0.7 f5c87c1ef -> c033a7b0a
TEZ-2987. TestVertexImpl.testTez2684 fails (bikas)
(cherry picked from commit c9741492056758e3cac1a3d34cd1c7cbd7d9ddd4)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c033a7b0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c033a7b0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c033a7b0
Branch: refs/heads/branch-0.7
Commit: c033a7b0af166e885893713df935a3677c7cddc5
Parents: f5c87c1
Author: Bikas Saha <[email protected]>
Authored: Sun Dec 13 19:48:17 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Sun Dec 13 19:56:20 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TestVertexImpl.java | 4 ++--
.../vertexmanager/ShuffleVertexManager.java | 17 ++++++++++++++++
.../vertexmanager/TestShuffleVertexManager.java | 21 +++++++++++++++++++-
4 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 50d0b67..09e4666 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws
IllegalStateException: Stats should be initialized.
TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the
partition sizes from the source.
+ TEZ-2987. TestVertexImpl.testTez2684 fails
TEZ-2995. Timeline primary filter should only be on callerId and not type.
TEZ-2599. Dont send obsoleted data movement events to tasks
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fe540e8..f0a8625 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -5930,11 +5930,11 @@ public class TestVertexImpl {
//Send VertexManagerEvent
long[] sizes = new long[]{(100 * 1000l * 1000l)};
- Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C");
+ Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "B");
TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
TezTaskID.getInstance(vC.getVertexId(), 1), 1);
- EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "C", "C", taId);
+ EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "B", "C", taId);
TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo);
dispatcher.getEventHandler().handle(new
VertexEventRouteEvent(vC.getVertexId(),
Lists.newArrayList(tezEvent)));
http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index c88c7a2..410ad73 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -145,6 +145,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
int numBipartiteSourceTasksCompleted = 0;
int numVertexManagerEventsReceived = 0;
List<PendingTaskInfo> pendingTasks = Lists.newLinkedList();
+ List<VertexManagerEvent> pendingVMEvents = Lists.newLinkedList();
int totalTasksToSchedule = 0;
private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
@@ -501,10 +502,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
if(bipartiteSources == 0) {
throw new TezUncheckedException("Atleast 1 bipartite source should exist");
}
+
for (VertexStateUpdate stateUpdate : pendingStateUpdates) {
handleVertexStateUpdate(stateUpdate);
}
pendingStateUpdates.clear();
+
+ for (VertexManagerEvent vmEvent : pendingVMEvents) {
+ handleVertexManagerEvent(vmEvent);
+ }
+ pendingVMEvents.clear();
// track the tasks in this vertex
updatePendingTasks();
@@ -567,6 +574,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
@Override
public synchronized void onVertexManagerEventReceived(VertexManagerEvent
vmEvent) {
+ if (onVertexStartedDone.get()) {
+ // internal data structures have been initialized - so handle the events directly
+ handleVertexManagerEvent(vmEvent);
+ } else {
+ // save this event for processing after vertex starts
+ pendingVMEvents.add(vmEvent);
+ }
+ }
+
+ private void handleVertexManagerEvent(VertexManagerEvent vmEvent) {
// currently events from multiple attempts of the same task can be ignored because
// their output will be the same. However, with pipelined events that may not hold.
TaskIdentifier producerTask =
vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9d53ebc..9c21aed 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -307,6 +307,25 @@ public class TestShuffleVertexManager {
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
/**
+ * Test vmEvent and vertexStatusUpdate before started
+ */
+ scheduledTasks.clear();
+ //{5,9,12,18} in bitmap
+ vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+
+ manager = createManager(conf, mockContext, 0.01f, 0.75f);
+ Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+ TezTaskAttemptID taId1 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+ vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId1));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
+ manager.onVertexManagerEventReceived(vmEvent);
+ Assert.assertEquals(0, manager.numVertexManagerEventsReceived); // nothing happens
+ manager.onVertexStarted(emptyCompletions); // now the processing happens
+ Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+ /**
* Test partition stats
*/
scheduledTasks.clear();
@@ -320,7 +339,7 @@ public class TestShuffleVertexManager {
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- TezTaskAttemptID taId1 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+ taId1 =
TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag",
mockSrcVertexId1, taId1));
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);