Repository: tez
Updated Branches:
refs/heads/branch-0.6 7fe2dc592 -> 1d83ece47
TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in
recovery log (zjffdu)
(cherry picked from commit 3894c5ec6b707d7fff6381091fbbdf05c89f0f81)
Conflicts:
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d83ece4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d83ece4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d83ece4
Branch: refs/heads/branch-0.6
Commit: 1d83ece47a2c83bf16579923613cf61652159135
Parents: 7fe2dc5
Author: Jeff Zhang <[email protected]>
Authored: Thu Apr 30 11:15:53 2015 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Thu Apr 30 12:50:49 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 +++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 48 ++++++++++++++++++++
3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1d83ece4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af7612e..991e82a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -184,6 +184,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in
recovery log
TEZ-2348. EOF exception during UnorderedKVReader.next().
TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in
recovery.
TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job
output-path
http://git-wip-us.apache.org/repos/asf/tez/blob/1d83ece4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b01d05c..27a6eed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1249,7 +1249,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
if (!pendingTaskEvents.isEmpty()) {
LOG.info("Routing pending task events for vertex: " + logIdentifier);
try {
- handleRoutedTezEvents(this, pendingTaskEvents, false);
+ handleRoutedTezEvents(this, pendingTaskEvents, false, true);
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() +", vertex=" +
logIdentifier;
LOG.error(msg, e);
@@ -2862,7 +2862,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
vertex.recoveredEvents.clear();
if (!vertex.pendingRouteEvents.isEmpty()) {
try {
- handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false);
+ handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false,
true);
vertex.pendingRouteEvents.clear();
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" +
vertex.getLogIdentifier();
@@ -3091,7 +3091,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
vertex.getAdditionalInputs().get(liInitEvent.getInputName())
.getIODescriptor(), liInitEvent.getEvents());
if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
- VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
+ VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false,
false);
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" +
vertex.getLogIdentifier();
@@ -3679,7 +3679,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
boolean recovered = rEvent.isRecovered();
List<TezEvent> tezEvents = rEvent.getEvents();
try {
- VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered);
+ VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered, false);
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" +
vertex.getLogIdentifier();
LOG.error(msg, e);
@@ -3697,9 +3697,10 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex,
}
}
- private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent>
tezEvents, boolean recovered) throws AMUserCodeException {
+ private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent>
tezEvents, boolean recovered, boolean isPendingEvents) throws
AMUserCodeException {
if (vertex.getAppContext().isRecoveryEnabled()
&& !recovered
+ && !isPendingEvents
&& !tezEvents.isEmpty()) {
List<TezEvent> recoveryEvents =
Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/tez/blob/1d83ece4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index f3b5fed..a8d8ba5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -141,7 +141,10 @@ import
org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -177,6 +180,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.internal.util.collections.Sets;
import com.google.common.collect.Lists;
@@ -5422,6 +5426,50 @@ public class TestVertexImpl {
Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE,
v2.getTerminationCause());
}
+ @Test (timeout = 5000)
+ public void testRouteEvent_RecoveredEvent() throws IOException { // checks the VERTEX_DATA_MOVEMENT_EVENTS_GENERATED count stays at 1 before and after v3's pending task events are drained
+ doReturn(historyEventHandler).when(appContext).getHistoryHandler(); // stub: app context hands back historyEventHandler, which is verified below
+ doReturn(true).when(appContext).isRecoveryEnabled(); // stub: app context reports recovery as enabled
+
+ initAllVertices(VertexState.INITED);
+ VertexImpl v1 = (VertexImpl)vertices.get("vertex1");
+ VertexImpl v2 = (VertexImpl)vertices.get("vertex2");
+ VertexImpl v3 = (VertexImpl)vertices.get("vertex3");
+ startVertex(v1);
+ startVertex(v2);
+ TezTaskID taskId = TezTaskID.getInstance(v1.getVertexId(), 0);
+ v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
+ DataMovementEvent dmEvent = DataMovementEvent.create(0,
ByteBuffer.wrap(new byte[0]));
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(taskId, 0);
+ TezEvent tezEvent1 = new TezEvent(dmEvent, new
EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", taId));
+ v1.handle(new VertexEventRouteEvent(v1.getVertexId(),
Lists.newArrayList(tezEvent1)));
+ dispatcher.await();
+ assertTrue(v3.pendingTaskEvents.size() != 0); // v3 (started neither above) is expected to be holding the event as pending
+ ArgumentCaptor<DAGHistoryEvent> argCaptor =
ArgumentCaptor.forClass(DAGHistoryEvent.class);
+ verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+ verifyHistoryEvents(argCaptor.getAllValues(),
HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); // exactly one such history event so far
+
+ v3.scheduleTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+ dispatcher.await();
+ assertTrue(v3.pendingTaskEvents.size() == 0); // the pending task events have been drained
+ // the data movement event must still have been logged only once (not logged again for the pending events)
+ argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+ verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+ verifyHistoryEvents(argCaptor.getAllValues(),
HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+ }
+
+ private void verifyHistoryEvents(List<DAGHistoryEvent> events,
HistoryEventType eventType, int expectedTimes) { // asserts that exactly expectedTimes of the given events have type eventType
+ int actualTimes = 0; // number of events seen so far whose type equals eventType
+ for (DAGHistoryEvent event : events) {
+ LOG.info(event.getHistoryEvent().getEventType() + ""); // log every captured event type to ease debugging of a count mismatch
+ if (event.getHistoryEvent().getEventType() == eventType) {
+ actualTimes ++;
+ }
+ }
+ // JUnit's assertEquals takes (expected, actual); keep that order so a failure message reports the values correctly.
+ Assert.assertEquals(expectedTimes, actualTimes);
+ }
+
@InterfaceAudience.Private
public static class RootInputSpecUpdaterVertexManager extends
VertexManagerPlugin {