Repository: tez Updated Branches: refs/heads/master 7fc28f7bb -> 701e9aa2c
TEZ-3124. Running task hangs due to missing event to initialize input in recovery (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/701e9aa2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/701e9aa2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/701e9aa2 Branch: refs/heads/master Commit: 701e9aa2ce04585be657bc8e1c3eb17317afad6b Parents: 7fc28f7 Author: Jeff Zhang <[email protected]> Authored: Wed Feb 24 16:03:30 2016 +0800 Committer: Jeff Zhang <[email protected]> Committed: Wed Feb 24 16:05:01 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 13 +- .../RecoveryServiceWithEventHandlingHook.java | 116 +++++++++++++++-- .../java/org/apache/tez/test/TestRecovery.java | 123 +++++++++++++++++++ 4 files changed, 236 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5dc8976..fb0e8b6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3124. Running task hangs due to missing event to initialize input in recovery. TEZ-3135. tez-ext-service-tests, tez-plugins/tez-yarn-timeline-history and tez-tools/tez-javadoc-tools missing dependencies. TEZ-3134. tez-dag should depend on commons-collections4. TEZ-3126. Log reason for not reducing parallelism http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index c8f217b..8b81be7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1861,12 +1861,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl void logJobHistoryVertexInitializedEvent() { - // TODO Vertex init may happen multiple times, so it is possible to have multiple VertexInitializedEvent - VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, - initTimeRequested, initedTime, numTasks, - getProcessorName(), getAdditionalInputs(), initGeneratedEvents); - this.appContext.getHistoryHandler().handle( - new DAGHistoryEvent(getDAGId(), initEvt)); + if (recoveryData == null || !recoveryData.shouldSkipInit()) { + VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, + initTimeRequested, initedTime, numTasks, + getProcessorName(), getAdditionalInputs(), initGeneratedEvents); + this.appContext.getHistoryHandler().handle( + new DAGHistoryEvent(getDAGId(), initEvt)); + } } void logJobHistoryVertexStartedEvent() { http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java index 8a0f39e..c08780f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java +++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java @@ -20,6 +20,8 @@ package org.apache.tez.test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.codec.binary.Base64; @@ -164,7 +166,7 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService { throws IOException { if (shutdownCondition.timing.equals(TIMING.PRE) && appContext.getApplicationAttemptId().getAttemptId() == 1 - && shouldShutdown(event)) { + && shutdownCondition.match(event.getHistoryEvent())) { recoveryService.shutdown(); } } @@ -173,19 +175,11 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService { public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException { if (shutdownCondition.timing.equals(TIMING.POST) - && appContext.getApplicationAttemptId().getAttemptId() == 1 - && shouldShutdown(event)) { + && appContext.getApplicationAttemptId().getAttemptId() == 1 + && shutdownCondition.match(event.getHistoryEvent())) { recoveryService.shutdown(); } } - - private boolean shouldShutdown(DAGHistoryEvent event) { - // only check whether to shutdown when it is the first AM attempt - if (appContext.getApplicationAttemptId().getAttemptId() >= 2) { - return false; - } - return shutdownCondition.match(event.getHistoryEvent()); - } @Override public void preHandleSummaryEvent(HistoryEventType eventType, @@ -387,4 +381,104 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService { return event.getEventType(); } } + + public static class MultipleRoundRecoveryEventHook extends RecoveryServiceHook { + + public static final String MULTIPLE_ROUND_SHUTDOWN_CONDITION = "tez.test.recovery.multiple_round_shutdown_condition"; + private MultipleRoundShutdownCondition shutdownCondition; + private int attemptId; + + public MultipleRoundRecoveryEventHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) { + super(recoveryService, appContext); + this.shutdownCondition = new MultipleRoundShutdownCondition(); + try { + Preconditions.checkArgument(recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION) != null, + MULTIPLE_ROUND_SHUTDOWN_CONDITION + " is not set in TezConfiguration"); + this.shutdownCondition.deserialize(recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION)); + } catch (IOException e) { + throw new TezUncheckedException("Can not initialize MultipleRoundShutdownCondition", e); + } + this.attemptId = appContext.getApplicationAttemptId().getAttemptId(); + } + + @Override + public void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException { + if (attemptId <= shutdownCondition.size()) { + SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(attemptId - 1); + if (condition.timing.equals(TIMING.PRE) + && condition.match(event.getHistoryEvent())) { + recoveryService.shutdown(); + } + } + } + + @Override + public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException { + for (int i=0;i<shutdownCondition.size();++i) { + SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(i); + LOG.info("condition:" + condition.getEvent().getEventType() + ":" + condition.getHistoryEvent()); + } + if (attemptId <= shutdownCondition.size()) { + SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(attemptId - 1); + + LOG.info("event:" + event.getHistoryEvent().getEventType()); + if (condition.timing.equals(TIMING.POST) + && condition.match(event.getHistoryEvent())) { + recoveryService.shutdown(); + } + } + } + + @Override + public void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException { + + } + + @Override + public void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException { + + } + } + + public static class MultipleRoundShutdownCondition { + + private List<SimpleShutdownCondition> shutdownConditionList; + + public MultipleRoundShutdownCondition() { + + } + + public MultipleRoundShutdownCondition(List<SimpleShutdownCondition> shutdownConditionList) { + this.shutdownConditionList = shutdownConditionList; + } + + public String serialize() throws IOException { + StringBuilder builder = new StringBuilder(); + for (int i=0; i< shutdownConditionList.size(); ++i) { + builder.append(shutdownConditionList.get(i).serialize()); + if (i!=shutdownConditionList.size()-1) { + builder.append(";"); + } + } + return builder.toString(); + } + + public MultipleRoundShutdownCondition deserialize(String str) throws IOException { + String[] splits = str.split(";"); + shutdownConditionList = new ArrayList<SimpleShutdownCondition>(); + for (String split : splits) { + SimpleShutdownCondition condition = new SimpleShutdownCondition(); + shutdownConditionList.add(condition.deserialize(split)); + } + return this; + } + + public SimpleShutdownCondition getSimpleShutdownCondition(int index) { + return shutdownConditionList.get(index); + } + + public int size() { + return shutdownConditionList.size(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/701e9aa2/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java index dc26167..3f669c6 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -301,6 +301,56 @@ public class TestRecovery { } + private void testOrderedWordCountMultipleRoundRecoverying( + RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition shutdownCondition, + boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception { + + for (int i=0; i<shutdownCondition.size(); i++) { + SimpleShutdownCondition condition = shutdownCondition.getSimpleShutdownCondition(i); + LOG.info("ShutdownCondition:" + condition.getEventType() + + ", event=" + condition.getEvent()); + } + + String inputDirStr = "/tmp/owc-input/"; + Path inputDir = new Path(inputDirStr); + Path stagingDirPath = new Path("/tmp/owc-staging-dir"); + remoteFs.mkdirs(inputDir); + remoteFs.mkdirs(stagingDirPath); + TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs); + + String outputDirStr = "/tmp/owc-output/"; + Path outputDir = new Path(outputDirStr); + + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4); + tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS, + RecoveryServiceWithEventHandlingHook.class.getName()); + tezConf.set( + RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS, + RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.class.getName()); + tezConf.set(RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.MULTIPLE_ROUND_SHUTDOWN_CONDITION, + shutdownCondition.serialize()); + tezConf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + enableAutoParallelism); + tezConf.setBoolean( + RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + tezConf.setBoolean( + TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); + OrderedWordCount job = new OrderedWordCount(); + if (generateSplitInClient) { + Assert + .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{ + "-generateSplitInClient", inputDirStr, outputDirStr, "5"}, null) == 0); + } else { + Assert + .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{ + inputDirStr, outputDirStr, "5"}, null) == 0); + } + TestTezJobs.verifyOutput(outputDir, remoteFs); + } + @Test(timeout = 1800000) public void testRecovery_HashJoin() throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), @@ -501,4 +551,77 @@ public class TestRecovery { assertTrue(shutdownCondition.match(lastEvent)); } + @Test(timeout = 1800000) + public void testTwoRoundsRecoverying() throws Exception { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), + 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0); + TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1); + TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2); + ContainerId containerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance(appId, 1), 1); + NodeId nodeId = NodeId.newInstance("localhost", 10); + List<TezEvent> initGeneratedEvents = Lists.newArrayList( + new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null)); + + + List<SimpleShutdownCondition> shutdownConditions = Lists.newArrayList( + + new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent( + dagId, 0L, "username", "dagName", null)), + new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId, + 0L, "username", "dagName")), + new SimpleShutdownCondition(TIMING.POST, + new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0, + "", null, initGeneratedEvents)), + new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( + vertexId0, 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, + new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null, + null, true)), + new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent( + TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)), + new SimpleShutdownCondition(TIMING.POST, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L, + containerId, nodeId, "", "", "")), + new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent( + TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L, + null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), + new VertexStats(), new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), + new VertexStats(), new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( + vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, + VertexState.SUCCEEDED, "", new TezCounters(), + new VertexStats(), new HashMap<String, Integer>())), + new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent( + dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), + "username", "dagName", new HashMap<String, Integer>(), + ApplicationAttemptId.newInstance(appId, 1), null)) + + ); + + Random rand = new Random(); + for (int i = 0; i < shutdownConditions.size() - 1; i++) { + // randomly choose half of the test scenario to avoid + // timeout. + if (rand.nextDouble()<0.5) { + int nextSimpleConditionIndex = i + 1 + rand.nextInt(shutdownConditions.size() - i - 1); + if (nextSimpleConditionIndex == shutdownConditions.size() - 1) { + testOrderedWordCountMultipleRoundRecoverying( + new RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition( + Lists.newArrayList(shutdownConditions.get(i), shutdownConditions.get(nextSimpleConditionIndex))) + , true, + shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED); + } + } + } + } }
