Repository: tez
Updated Branches:
  refs/heads/master c4487f966 -> 28f30b0ef
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 0d6cbcb..7082ca7 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -50,7 +50,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; @@ -113,15 +113,14 @@ public class HistoryEventTimelineConversion { case TASK_ATTEMPT_FINISHED: timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); break; - case VERTEX_PARALLELISM_UPDATED: - timelineEntity = convertVertexParallelismUpdatedEvent( - (VertexParallelismUpdatedEvent) historyEvent); + case VERTEX_CONFIGURE_DONE: + timelineEntity = convertVertexReconfigureDoneEvent( + (VertexConfigurationDoneEvent) historyEvent); break; case DAG_RECOVERED: timelineEntity = convertDAGRecoveredEvent( (DAGRecoveredEvent) historyEvent); break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: case VERTEX_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_FINISHED: @@ -657,8 +656,8 @@ public class HistoryEventTimelineConversion { return atsEntity; } - private static TimelineEntity convertVertexParallelismUpdatedEvent( - VertexParallelismUpdatedEvent event) { + private static TimelineEntity convertVertexReconfigureDoneEvent( + VertexConfigurationDoneEvent event) { TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId(event.getVertexID().toString()); atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name()); @@ -669,8 +668,8 @@ public class HistoryEventTimelineConversion { event.getVertexID().getDAGId().toString()); TimelineEvent updateEvt = new TimelineEvent(); - updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name()); - updateEvt.setTimestamp(event.getUpdateTime()); + updateEvt.setEventType(HistoryEventType.VERTEX_CONFIGURE_DONE.name()); + updateEvt.setTimestamp(event.getReconfigureDoneTime()); Map<String,Object> eventInfo = new HashMap<String, Object>(); if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) { @@ -683,7 +682,6 @@ public class HistoryEventTimelineConversion { eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); } eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); - eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); updateEvt.setEventInfo(eventInfo); atsEntity.addEvent(updateEvt); 
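For context on the ATS side of this rename: the conversion above now emits a VERTEX_CONFIGURE_DONE timeline event whose event info carries NUM_TASKS (plus UPDATED_EDGE_MANAGERS when edge managers were reconfigured) and no longer writes OLD_NUM_TASKS. A minimal consumer-side sketch follows; it is illustrative only and not part of this commit, it assumes ATSConstants is the tez-common constants class referenced in the diff, and it assumes the caller has already fetched the vertex TimelineEntity from the timeline server.

import java.util.Map;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.history.HistoryEventType;

public class VertexConfigureDoneReader {

  /**
   * Returns the parallelism recorded by the VERTEX_CONFIGURE_DONE timeline event,
   * or -1 if the vertex entity carries no such event.
   */
  public static int readConfiguredNumTasks(TimelineEntity vertexEntity) {
    for (TimelineEvent evt : vertexEntity.getEvents()) {
      if (HistoryEventType.VERTEX_CONFIGURE_DONE.name().equals(evt.getEventType())) {
        Map<String, Object> info = evt.getEventInfo();
        // OLD_NUM_TASKS is no longer written after this change; only NUM_TASKS and,
        // when edge managers were updated, UPDATED_EDGE_MANAGERS are present.
        Object numTasks = info.get(ATSConstants.NUM_TASKS);
        return (numTasks instanceof Number) ? ((Number) numTasks).intValue() : -1;
      }
    }
    return -1;
  }
}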
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 8e589d2..7792c62 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -70,12 +70,11 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; @@ -158,13 +157,13 @@ public class TestHistoryEventTimelineConversion { break; case VERTEX_INITIALIZED: event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), - random.nextInt(), "proc", null); + random.nextInt(), "proc", null, null); break; case VERTEX_STARTED: event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); break; - case VERTEX_PARALLELISM_UPDATED: - event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1); + case VERTEX_CONFIGURE_DONE: + event = new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, null, null, true); break; case VERTEX_FINISHED: event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), @@ -184,7 +183,8 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null, 0, null, 0); + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, + null, null, null, null, 0, null, 0); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -193,9 +193,6 @@ public class TestHistoryEventTimelineConversion { case CONTAINER_STOPPED: event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId); break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - event = new VertexRecoverableEventsGeneratedEvent(); - break; case DAG_COMMIT_STARTED: event = new DAGCommitStartedEvent(); break; @@ -524,7 +521,7 @@ public class TestHistoryEventTimelineConversion { events.add(new DataEventDependencyInfo(lastDataEventTime, 
tezTaskAttemptID)); TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, error, diagnostics, counters, events, creationTime, + startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime, tezTaskAttemptID, allocationTime); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); @@ -668,7 +665,7 @@ public class TestHistoryEventTimelineConversion { long initedTime = random.nextLong(); int numTasks = random.nextInt(); VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime, - initedTime, numTasks, "proc", null); + initedTime, numTasks, "proc", null, null); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); @@ -935,7 +932,7 @@ public class TestHistoryEventTimelineConversion { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testConvertVertexParallelismUpdatedEvent() { + public void testConvertVertexReconfigreDoneEvent() { TezVertexID vId = tezVertexID; Map<String, EdgeProperty> edgeMgrs = new HashMap<String, EdgeProperty>(); @@ -943,8 +940,8 @@ public class TestHistoryEventTimelineConversion { edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class") .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))); - VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, - edgeMgrs, null, 10); + VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null, + edgeMgrs, null, true); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType()); @@ -959,9 +956,8 @@ public class TestHistoryEventTimelineConversion { .contains(tezDAGID.toString())); TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType()); + Assert.assertEquals(HistoryEventType.VERTEX_CONFIGURE_DONE.name(), evt.getEventType()); Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS)); - Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS)); Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS)); Map<String, Object> updatedEdgeMgrs = (Map<String, Object>) @@ -976,7 +972,6 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY)); Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)); - } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index b44b7d4..63e2b86 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -278,7 +278,7 @@ 
public class TezEvent implements Writable { } else { out.writeBoolean(false); } - } + } @Override public void readFields(DataInput in) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java b/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java new file mode 100644 index 0000000..4baf6de --- /dev/null +++ b/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.test; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.recovery.RecoveryService; + +public abstract class AMShutdownController { + + private List<DAGHistoryEvent> historyEvents = new ArrayList<DAGHistoryEvent>(); + + protected AppContext appContext; + protected RecoveryService recoveryService; + + public AMShutdownController(AppContext appContext, RecoveryService recoveryService) { + this.appContext = appContext; + this.recoveryService = recoveryService; + } + + public void preHandleHistoryEvent(DAGHistoryEvent event) { + historyEvents.add(event); + if (shouldShutdownPreEvent(event, historyEvents)) { + System.exit(1); + } + } + + public void postHandleHistoryEvent(DAGHistoryEvent event) { + if (shouldShutdownPostEvent(event, historyEvents)) { + System.exit(1); + } + } + + protected abstract boolean shouldShutdownPreEvent(DAGHistoryEvent curEvent, + List<DAGHistoryEvent> historyEvents); + + protected abstract boolean shouldShutdownPostEvent(DAGHistoryEvent curEvent, + List<DAGHistoryEvent> historyEvents); +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java new file mode 100644 index 0000000..cec8fbd --- /dev/null +++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezReflectionException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.SummaryEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.recovery.RecoveryService; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Add hook before/after processing RecoveryEvent & SummaryEvent + * + */ +public class RecoveryServiceWithEventHandlingHook extends RecoveryService { + + public static final String AM_RECOVERY_SERVICE_HOOK_CLASS = "tez.test.am.recovery_service.hook"; + private static final Logger LOG = LoggerFactory.getLogger(RecoveryServiceWithEventHandlingHook.class); + private RecoveryServiceHook hook; + private boolean shutdownInvoked = false; + public RecoveryServiceWithEventHandlingHook(AppContext appContext) { + super(appContext); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + String clazz = conf.get(AM_RECOVERY_SERVICE_HOOK_CLASS); + Preconditions.checkArgument(clazz != null, "RecoveryServiceHook class is not specified"); + this.hook = ReflectionUtils.createClazzInstance(clazz, + new Class[]{RecoveryServiceWithEventHandlingHook.class, AppContext.class}, + new Object[]{this, super.appContext}); + } + + @Override + protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException { + hook.preHandleRecoveryEvent(event); + if (shutdownInvoked) { + return; + } + super.handleRecoveryEvent(event); + hook.postHandleRecoveryEvent(event); + } + + @Override + protected void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, + SummaryEvent summaryEvent) throws IOException { + hook.preHandleSummaryEvent(eventType, summaryEvent); + if 
(shutdownInvoked) { + return; + } + super.handleSummaryEvent(dagID, eventType, summaryEvent); + hook.postHandleSummaryEvent(eventType, summaryEvent); + } + + private void shutdown() { + // start a new thread to shutdown AM otherwise will cause dead lock + // (JVM exit will DAGAppMasterShutdownHook called and RecoveryService's stop will be called + // which will drain all the events) + Thread shutdownThread = new Thread("AMShutdown Thread") { + @Override + public void run() { + LOG.info("Try to kill AM"); + System.exit(1); + } + }; + // stop process recovery events + super.setStopped(true); + shutdownInvoked = true; + shutdownThread.start(); + } + + /** + * Abstract class to allow do something before/after processing recovery events + * + */ + public static abstract class RecoveryServiceHook { + + protected RecoveryServiceWithEventHandlingHook recoveryService; + protected AppContext appContext; + + public RecoveryServiceHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) { + this.recoveryService = recoveryService; + this.appContext = appContext; + } + + public abstract void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException; + + public abstract void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException; + + public abstract void preHandleSummaryEvent(HistoryEventType eventType, + SummaryEvent summaryEvent) throws IOException; + + public abstract void postHandleSummaryEvent(HistoryEventType eventType, + SummaryEvent summaryEvent) throws IOException; + + } + + /** + * Shutdown AM before/after a specified recovery event is processed. + * Only do it in the first AM attempt + * + */ + public static class SimpleRecoveryEventHook extends RecoveryServiceHook { + + public static final String SIMPLE_SHUTDOWN_CONDITION = "tez.test.recovery.simple_shutdown_condition"; + private SimpleShutdownCondition shutdownCondition; + + public SimpleRecoveryEventHook( + RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) { + super(recoveryService, appContext); + this.shutdownCondition = new SimpleShutdownCondition(); + try { + Preconditions.checkArgument(recoveryService.getConfig().get(SIMPLE_SHUTDOWN_CONDITION) != null, + SIMPLE_SHUTDOWN_CONDITION + " is not set in TezConfiguration"); + this.shutdownCondition.deserialize(recoveryService.getConfig().get(SIMPLE_SHUTDOWN_CONDITION)); + } catch (IOException e) { + throw new TezUncheckedException("Can not initialize SimpleShutdownCondition", e); + } + } + + @Override + public void preHandleRecoveryEvent(DAGHistoryEvent event) + throws IOException { + if (shutdownCondition.timing.equals(TIMING.PRE) + && appContext.getApplicationAttemptId().getAttemptId() == 1 + && shouldShutdown(event)) { + recoveryService.shutdown(); + } + } + + @Override + public void postHandleRecoveryEvent(DAGHistoryEvent event) + throws IOException { + if (shutdownCondition.timing.equals(TIMING.POST) + && appContext.getApplicationAttemptId().getAttemptId() == 1 + && shouldShutdown(event)) { + recoveryService.shutdown(); + } + } + + private boolean shouldShutdown(DAGHistoryEvent event) { + // only check whether to shutdown when it is the first AM attempt + if (appContext.getApplicationAttemptId().getAttemptId() >= 2) { + return false; + } + return shutdownCondition.match(event.getHistoryEvent()); + } + + @Override + public void preHandleSummaryEvent(HistoryEventType eventType, + SummaryEvent summaryEvent) throws IOException { + } + + @Override + public void postHandleSummaryEvent(HistoryEventType eventType, + 
SummaryEvent summaryEvent) throws IOException { + } + + } + + /** + * + * Shutdown AM based on one recovery event if it is matched. + * This would be serialized as property of TezConfiguration and deserialized at runtime. + */ + public static class SimpleShutdownCondition { + + public static enum TIMING { + PRE, // before the event + POST, // after the event + } + + private TIMING timing; + private HistoryEvent event; + + public SimpleShutdownCondition(TIMING timing, HistoryEvent event) { + this.timing = timing; + this.event = event; + } + + public SimpleShutdownCondition() { + } + + private String encodeHistoryEvent(HistoryEvent event) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.toProtoStream(out); + return event.getClass().getName() + "," + + Base64.encodeBase64String(out.toByteArray()); + } + + private HistoryEvent decodeHistoryEvent(String eventClass, String base64) + throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream( + Base64.decodeBase64(base64)); + try { + HistoryEvent event = ReflectionUtils.createClazzInstance(eventClass); + event.fromProtoStream(in); + return event; + } catch (TezReflectionException e) { + throw new IOException(e); + } + } + + public String serialize() throws IOException { + StringBuilder builder = new StringBuilder(); + builder.append(timing.name() + ","); + builder.append(encodeHistoryEvent(event)); + return builder.toString(); + } + + public SimpleShutdownCondition deserialize(String str) throws IOException { + String[] tokens = str.split(","); + timing = TIMING.valueOf(tokens[0]); + this.event = decodeHistoryEvent(tokens[1], tokens[2]); + return this; + } + + public HistoryEvent getEvent() { + return event; + } + + public TIMING getTiming() { + return timing; + } + + public boolean match(HistoryEvent incomingEvent) { + switch (event.getEventType()) { + case DAG_SUBMITTED: + if (incomingEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) { + // only compare eventType + return true; + } + break; + + case DAG_INITIALIZED: + if (incomingEvent.getEventType() == HistoryEventType.DAG_INITIALIZED) { + // only compare eventType + return true; + } + break; + + case DAG_STARTED: + if (incomingEvent.getEventType() == HistoryEventType.DAG_STARTED) { + // only compare eventType + return true; + } + break; + + case DAG_FINISHED: + if (incomingEvent.getEventType() == HistoryEventType.DAG_FINISHED) { + // only compare eventType + return true; + } + break; + + case VERTEX_INITIALIZED: + if (incomingEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) { + VertexInitializedEvent otherEvent = (VertexInitializedEvent) incomingEvent; + VertexInitializedEvent conditionEvent = (VertexInitializedEvent) event; + // compare vertexId; + return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId(); + } + break; + + case VERTEX_STARTED: + if (incomingEvent.getEventType() == HistoryEventType.VERTEX_STARTED) { + VertexStartedEvent otherEvent = (VertexStartedEvent) incomingEvent; + VertexStartedEvent conditionEvent = (VertexStartedEvent) event; + // compare vertexId + return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId(); + } + break; + + case VERTEX_FINISHED: + if (incomingEvent.getEventType() == HistoryEventType.VERTEX_FINISHED) { + VertexFinishedEvent otherEvent = (VertexFinishedEvent) incomingEvent; + VertexFinishedEvent conditionEvent = (VertexFinishedEvent) event; + // compare vertexId + return otherEvent.getVertexID().getId() == 
conditionEvent.getVertexID().getId(); + } + break; + case VERTEX_CONFIGURE_DONE: + if (incomingEvent.getEventType() == HistoryEventType.VERTEX_CONFIGURE_DONE) { + VertexConfigurationDoneEvent otherEvent = (VertexConfigurationDoneEvent) incomingEvent; + VertexConfigurationDoneEvent conditionEvent = (VertexConfigurationDoneEvent) event; + // compare vertexId + return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId(); + } + break; + case TASK_STARTED: + if (incomingEvent.getEventType() == HistoryEventType.TASK_STARTED) { + TaskStartedEvent otherEvent = (TaskStartedEvent) incomingEvent; + TaskStartedEvent conditionEvent = (TaskStartedEvent) event; + // compare vertexId and taskId + return otherEvent.getTaskID().getVertexID().getId() == conditionEvent.getTaskID().getVertexID().getId() + && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId(); + } + break; + + case TASK_FINISHED: + if (incomingEvent.getEventType() == HistoryEventType.TASK_FINISHED) { + TaskFinishedEvent otherEvent = (TaskFinishedEvent) incomingEvent; + TaskFinishedEvent conditionEvent = (TaskFinishedEvent) event; + // compare vertexId and taskId + return otherEvent.getTaskID().getVertexID().getId() == conditionEvent.getTaskID().getVertexID().getId() + && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId(); + } + break; + + case TASK_ATTEMPT_STARTED: + if (incomingEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_STARTED) { + TaskAttemptStartedEvent otherEvent = (TaskAttemptStartedEvent) incomingEvent; + TaskAttemptStartedEvent conditionEvent = (TaskAttemptStartedEvent) event; + // compare vertexId, taskId & taskAttemptId + return otherEvent.getTaskAttemptID().getTaskID().getVertexID().getId() + == conditionEvent.getTaskAttemptID().getTaskID().getVertexID().getId() + && otherEvent.getTaskAttemptID().getTaskID().getId() == conditionEvent.getTaskAttemptID().getTaskID().getId() + && otherEvent.getTaskAttemptID().getId() == conditionEvent.getTaskAttemptID().getId(); + } + break; + + case TASK_ATTEMPT_FINISHED: + if (incomingEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) { + TaskAttemptFinishedEvent otherEvent = (TaskAttemptFinishedEvent) incomingEvent; + TaskAttemptFinishedEvent conditionEvent = (TaskAttemptFinishedEvent) event; + // compare vertexId, taskId & taskAttemptId + return otherEvent.getTaskAttemptID().getTaskID().getVertexID().getId() + == conditionEvent.getTaskAttemptID().getTaskID().getVertexID().getId() + && otherEvent.getTaskAttemptID().getTaskID().getId() == conditionEvent.getTaskAttemptID().getTaskID().getId() + && otherEvent.getTaskAttemptID().getId() == conditionEvent.getTaskAttemptID().getId(); + } + break; + default: + LOG.info("do nothing with event:" + + event.getEventType()); + } + + return false; + } + + public HistoryEventType getEventType() { + return event.getEventType(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index 778825b..8e41b7e 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -24,26 +24,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; -import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DAGStatus.State; -import org.apache.tez.dag.app.RecoveryParser; -import org.apache.tez.dag.history.HistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.test.dag.MultiAttemptDAG; import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer; import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput; @@ -57,8 +49,6 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Random; public class TestDAGRecovery { @@ -176,56 +166,6 @@ public class TestDAGRecovery { Assert.assertEquals(finalState, dagStatus.getState()); } - private void verifyRecoveryLog() throws IOException{ - ApplicationId appId = tezSession.getAppMasterApplicationId(); - Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString()); - Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf); - - FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf); - // verify recovery logs in each attempt - for (int attemptNum=1; attemptNum<=3; ++attemptNum) { - List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>(); - // read the recovery logs for current attempt - // since dag recovery logs is dispersed in each attempt's recovery directory, - // so need to read recovery logs from the first attempt to current attempt - for (int i=1 ;i<=attemptNum;++i) { - Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i); - Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir, - appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); - historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile( - fs.open(recoveryFilePath))); - } - - int inputInfoEventIndex = -1; - int vertexInitedEventIndex = -1; - for (int j=0;j<historyEvents.size(); ++j) { - HistoryEvent historyEvent = historyEvents.get(j); - LOG.info("Parsed event from recovery stream" - + ", eventType=" + historyEvent.getEventType() - + ", event=" + historyEvent); - if (historyEvent.getEventType() == HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) { - VertexRecoverableEventsGeneratedEvent dmEvent = - (VertexRecoverableEventsGeneratedEvent) historyEvent; - // TODO do not need to check whether it is -1 after Tez-1521 is resolved - if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) { - inputInfoEventIndex = j; - } - } - if (historyEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) { - VertexInitializedEvent vInitedEvent = (VertexInitializedEvent) historyEvent; - if 
(vInitedEvent.getVertexID().getId() == 0) { - vertexInitedEventIndex = j; - } - } - } - // v1's init events must be logged before its VertexInitializedEvent (Tez-1345) - Assert.assertTrue("can not find VERTEX_DATA_MOVEMENT_EVENTS_GENERATED for v1", inputInfoEventIndex != -1); - Assert.assertTrue("can not find VERTEX_INITIALIZED for v1", vertexInitedEventIndex != -1); - Assert.assertTrue("VERTEX_DATA_MOVEMENT_EVENTS_GENERATED is logged before VERTEX_INITIALIZED for v1", - inputInfoEventIndex < vertexInitedEventIndex); - } - } - @Test(timeout=120000) public void testBasicRecovery() throws Exception { DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null); @@ -236,8 +176,6 @@ public class TestDAGRecovery { dag.getVertex("v1").addDataSource("Input", dataSource); runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); - - verifyRecoveryLog(); } @Test(timeout=120000) http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java new file mode 100644 index 0000000..45582a1 --- /dev/null +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.RecoveryParser; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.VertexStats; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.recovery.RecoveryService; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.examples.HashJoinExample; +import org.apache.tez.examples.OrderedWordCount; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook; +import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition; +import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class TestRecovery { + + private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class); + + private static Configuration conf = new Configuration(); + private static MiniTezCluster miniTezCluster = null; + private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + + TestRecovery.class.getName() + "-tmpDir"; + private static MiniDFSCluster dfsCluster = null; + private static FileSystem remoteFs = null; 
+ + @BeforeClass + public static void beforeClass() throws Exception { + LOG.info("Starting mini clusters"); + try { + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .format(true).racks(null).build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + if (miniTezCluster == null) { + miniTezCluster = new MiniTezCluster(TestRecovery.class.getName(), 1, 1, 1); + Configuration miniTezconf = new Configuration(conf); + miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4); + miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + miniTezCluster.init(miniTezconf); + miniTezCluster.start(); + } + } + + @AfterClass + public static void afterClass() throws InterruptedException { + if (miniTezCluster != null) { + try { + LOG.info("Stopping MiniTezCluster"); + miniTezCluster.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + if (dfsCluster != null) { + try { + LOG.info("Stopping DFSCluster"); + dfsCluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + @Test(timeout=1800000) + public void testRecovery_OrderedWordCount() throws Exception { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), + 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0); + TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1); + TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2); + ContainerId containerId = ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 1), 1); + NodeId nodeId = NodeId.newInstance("localhost", 10); + + List<TezEvent> initGeneratedEvents = Lists.newArrayList( + new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null)); + + List<SimpleShutdownCondition> shutdownConditions = Lists + .newArrayList( + new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent( + dagId, 0L, "username", "dagName", null)), + new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId, + 0L, "username", "dagName")), + new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent( + dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), + "username", "dagName", new HashMap<String, Integer>(), + ApplicationAttemptId.newInstance(appId, 1))), + new SimpleShutdownCondition(TIMING.POST, + new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0, + "", null, initGeneratedEvents)), + new SimpleShutdownCondition(TIMING.POST, + new VertexInitializedEvent(vertexId1, "Summation", 0L, 0L, 0, + "", null, null)), + new SimpleShutdownCondition(TIMING.POST, + new VertexInitializedEvent(vertexId2, "Sorter", 0L, 0L, 0, "", + null, null)), + + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null, + null, true)), + + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId1, 0L, 2, null, null, + null, true)), + + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId2, 0L, 2, null, null, + null, true)), + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId0, 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId1, 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId2, 0L, 0L)), + + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( 
+ vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), + new VertexStats(), new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), + new VertexStats(), new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), + new VertexStats(), new HashMap<String, Integer>())), + + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent( + TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent( + TezTaskID.getInstance(vertexId1, 0), "vertexName", 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent( + TezTaskID.getInstance(vertexId2, 0), "vertexName", 0L, 0L)), + + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L, + null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId1, 0), "vertexName", 0L, 0L, + null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId2, 0), "vertexName", 0L, 0L, + null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), + + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")), + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId1, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")), + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId2, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")) + + ); + + Random rand = new Random(); + for (int i = 0; i < shutdownConditions.size(); i++) { + // randomly choose half of the test scenario to avoid + // timeout. 
+ if (rand.nextDouble() < 0.5) { + testOrderedWordCount(shutdownConditions.get(i), true); + } + } + } + + private void testOrderedWordCount(SimpleShutdownCondition shutdownCondition, + boolean enableAutoParallelism) throws Exception { + LOG.info("shutdownCondition:" + shutdownCondition.getEventType() + + ", event=" + shutdownCondition.getEvent()); + String inputDirStr = "/tmp/owc-input/"; + Path inputDir = new Path(inputDirStr); + Path stagingDirPath = new Path("/tmp/owc-staging-dir"); + remoteFs.mkdirs(inputDir); + remoteFs.mkdirs(stagingDirPath); + TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs); + + String outputDirStr = "/tmp/owc-output/"; + Path outputDir = new Path(outputDirStr); + + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4); + tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS, + RecoveryServiceWithEventHandlingHook.class.getName()); + tezConf.set( + RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS, + SimpleRecoveryEventHook.class.getName()); + tezConf.set(SimpleRecoveryEventHook.SIMPLE_SHUTDOWN_CONDITION, + shutdownCondition.serialize()); + tezConf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + enableAutoParallelism); + tezConf.setBoolean( + RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + tezConf.setBoolean( + TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); + tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG"); + + OrderedWordCount job = new OrderedWordCount(); + Assert + .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[] { + inputDirStr, outputDirStr, "5" }, null) == 0); + TestTezJobs.verifyOutput(outputDir, remoteFs); + List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser + .readRecoveryEvents(tezConf, job.getAppId(), 1); + HistoryEvent lastEvent = historyEventsOfAttempt1 + .get(historyEventsOfAttempt1.size() - 1); + assertEquals(shutdownCondition.getEvent().getEventType(), + lastEvent.getEventType()); + assertTrue(shutdownCondition.match(lastEvent)); + + } + + @Test(timeout = 1800000) + public void testRecovery_HashJoin() throws Exception { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), + 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0); + TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1); + TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2); + ContainerId containerId = ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 1), 1); + NodeId nodeId = NodeId.newInstance("localhost", 10); + List<TezEvent> initGeneratedEvents = Lists.newArrayList( + new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null)); + + List<SimpleShutdownCondition> shutdownConditions = Lists.newArrayList( + + new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent(dagId, + 0L, "username", "dagName", null)), + new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId, 0L, + "username", "dagName")), + new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(dagId, + 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), "username", + "dagName", new HashMap<String, Integer>(), ApplicationAttemptId + .newInstance(appId, 1))), + new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent( + 
vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents)), + new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent( + vertexId1, "streamingSide", 0L, 0L, 0, "", null, null)), + new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent( + vertexId2, "joiner", 0L, 0L, 0, "", null, null)), + + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId0, 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId1, 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId2, 0L, 0L)), + + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null, + null, true)), + + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId1, 0L, 2, null, null, + null, true)), + + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId2, 0L, 2, null, null, + null, true)), + + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), + new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), + new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), + new HashMap<String, Integer>())), + + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID + .getInstance(vertexId0, 0), "vertexName", 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID + .getInstance(vertexId1, 0), "vertexName", 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID + .getInstance(vertexId2, 0), "vertexName", 0L, 0L)), + + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L, null, + TaskState.SUCCEEDED, "", new TezCounters(), 0)), + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId1, 0), "vertexName", 0L, 0L, null, + TaskState.SUCCEEDED, "", new TezCounters(), 0)), + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId2, 0), "vertexName", 0L, 0L, null, + TaskState.SUCCEEDED, "", new TezCounters(), 0)), + + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")), + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId1, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")), + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId2, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")) + + ); + + Random rand = new Random(); + for (int i = 0; i < shutdownConditions.size(); i++) { + // randomly choose half of the test scenario to avoid + // timeout. 
+ if (rand.nextDouble() < 0.5) { + testHashJoinExample(shutdownConditions.get(i), true); + } + } + } + + private void testHashJoinExample(SimpleShutdownCondition shutdownCondition, + boolean enableAutoParallelism) throws Exception { + HashJoinExample hashJoinExample = new HashJoinExample(); + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4); + tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS, + RecoveryServiceWithEventHandlingHook.class.getName()); + tezConf.set( + RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS, + SimpleRecoveryEventHook.class.getName()); + tezConf.set(SimpleRecoveryEventHook.SIMPLE_SHUTDOWN_CONDITION, + shutdownCondition.serialize()); + tezConf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + enableAutoParallelism); + tezConf.setBoolean( + RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false); + tezConf.setBoolean( + TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); + tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG"); + + hashJoinExample.setConf(tezConf); + Path stagingDirPath = new Path("/tmp/tez-staging-dir"); + Path inPath1 = new Path("/tmp/hashJoin/inPath1"); + Path inPath2 = new Path("/tmp/hashJoin/inPath2"); + Path outPath = new Path("/tmp/hashJoin/outPath"); + remoteFs.delete(outPath, true); + remoteFs.mkdirs(inPath1); + remoteFs.mkdirs(inPath2); + remoteFs.mkdirs(stagingDirPath); + + Set<String> expectedResult = new HashSet<String>(); + + FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file")); + FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file")); + BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1)); + BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2)); + for (int i = 0; i < 20; i++) { + String term = "term" + i; + writer1.write(term); + writer1.newLine(); + if (i % 2 == 0) { + writer2.write(term); + writer2.newLine(); + expectedResult.add(term); + } + } + writer1.close(); + writer2.close(); + out1.close(); + out2.close(); + + String[] args = new String[] { + "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + + stagingDirPath.toString(), inPath1.toString(), + inPath2.toString(), "1", outPath.toString() }; + assertEquals(0, hashJoinExample.run(args)); + + FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + assertEquals(1, statuses.length); + FSDataInputStream inStream = remoteFs.open(statuses[0].getPath()); + BufferedReader reader = new BufferedReader(new InputStreamReader(inStream)); + String line; + while ((line = reader.readLine()) != null) { + assertTrue(expectedResult.remove(line)); + } + reader.close(); + inStream.close(); + assertEquals(0, expectedResult.size()); + + List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser + .readRecoveryEvents(tezConf, hashJoinExample.getAppId(), 1); + HistoryEvent lastEvent = historyEventsOfAttempt1 + .get(historyEventsOfAttempt1.size() - 1); + assertEquals(shutdownCondition.getEvent().getEventType(), + lastEvent.getEventType()); + assertTrue(shutdownCondition.match(lastEvent)); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git 
a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index ab89ddb..c3e8487 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -573,7 +573,7 @@ public class TestTezJobs { } } - private void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException { + public static void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException { Path dataPath1 = new Path(inputDir, "inPath1"); Path dataPath2 = new Path(inputDir, "inPath2"); @@ -606,7 +606,7 @@ public class TestTezJobs { } } - private void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException { + public static void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException { FSDataInputStream inputStream = fs.open(resultFile); final String prefix = "a"; int currentCounter = 10; @@ -631,7 +631,7 @@ public class TestTezJobs { Assert.assertEquals(0, currentCounter); } - private void verifyOutput(Path outputDir, FileSystem fs) throws IOException { + public static void verifyOutput(Path outputDir, FileSystem fs) throws IOException { FileStatus[] fileStatuses = fs.listStatus(outputDir); Path resultFile = null; boolean foundResult = false; http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java index 5c6f855..cdf69e6 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java +++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java @@ -121,7 +121,7 @@ public class MultiAttemptDAG { + ", currentAttempt=" + getContext().getDAGAttemptNumber()); if (successAttemptId > getContext().getDAGAttemptNumber()) { Runtime.getRuntime().halt(-1); - } else if (successAttemptId == getContext().getDAGAttemptNumber()) { + } else { LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName()); int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
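The RecoveryServiceWithEventHandlingHook added in this commit loads its hook class from the tez.test.am.recovery_service.hook property, so tests other than SimpleRecoveryEventHook can plug in their own behavior. Below is an illustrative sketch only, not part of the commit: a hypothetical LoggingRecoveryHook that merely logs each event the AM replays, using just the hook API shown in the diff above.

package org.apache.tez.test;

import java.io.IOException;

import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical hook that only logs which events the AM replays during recovery. */
public class LoggingRecoveryHook extends RecoveryServiceWithEventHandlingHook.RecoveryServiceHook {

  private static final Logger LOG = LoggerFactory.getLogger(LoggingRecoveryHook.class);

  public LoggingRecoveryHook(RecoveryServiceWithEventHandlingHook recoveryService,
      AppContext appContext) {
    super(recoveryService, appContext);
  }

  @Override
  public void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
    LOG.info("Replaying recovery event: " + event.getHistoryEvent().getEventType());
  }

  @Override
  public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
    LOG.info("Replayed recovery event: " + event.getHistoryEvent().getEventType());
  }

  @Override
  public void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent)
      throws IOException {
    LOG.info("Handling summary event: " + eventType);
  }

  @Override
  public void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent)
      throws IOException {
    LOG.info("Handled summary event: " + eventType);
  }
}

It would be registered the same way the tests above register SimpleRecoveryEventHook: set TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS to RecoveryServiceWithEventHandlingHook and RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS to the hook class name.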
