Repository: tez
Updated Branches:
  refs/heads/branch-0.7 1709e46ce -> a708c6d6c
TEZ-2758. Remove append API in RecoveryService after TEZ-1909 (zjffdu) (cherry picked from commit 35c926f238ec456c7ddf7e8ca47616c89cf68695) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a708c6d6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a708c6d6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a708c6d6 Branch: refs/heads/branch-0.7 Commit: a708c6d6c485e9793b80a95d6b8bedbdabd8c205 Parents: 1709e46 Author: Jeff Zhang <[email protected]> Authored: Wed Sep 30 10:25:21 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Wed Sep 30 10:28:58 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../dag/history/recovery/RecoveryService.java | 57 +++++----- .../history/recovery/TestRecoveryService.java | 110 ++++++++++++++++++- 3 files changed, 142 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a708c6d6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e846834..4764a35 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2758. Remove append API in RecoveryService after TEZ-1909. TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker. TEZ-2857. Fix flakey tests in TestDAGImpl. @@ -282,6 +283,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2758. Remove append API in RecoveryService after TEZ-1909. TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. 
Race condition between preemption and container assignment http://git-wip-us.apache.org/repos/asf/tez/blob/a708c6d6/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 2fe0e6d..585050d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -306,11 +306,13 @@ public class RecoveryService extends AbstractService { try { SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent(); handleSummaryEvent(dagId, eventType, summaryEvent); - summaryStream.hflush(); if (summaryEvent.writeToRecoveryImmediately()) { handleRecoveryEvent(event); - doFlush(outputStreamMap.get(event.getDagID()), - appContext.getClock().getTime()); + // outputStream may already be closed and removed + if (outputStreamMap.containsKey(event.getDagID())) { + doFlush(outputStreamMap.get(event.getDagID()), + appContext.getClock().getTime()); + } } else { if (LOG.isDebugEnabled()) { LOG.debug("Queueing Non-immediate Summary/Recovery event of type" @@ -336,23 +338,7 @@ public class RecoveryService extends AbstractService { } catch (IOException ioe) { LOG.error("Error handling summary event" + ", eventType=" + event.getHistoryEvent().getEventType(), ioe); - Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR); - try { - LOG.error("Adding a flag to ensure next AM attempt does not start up" - + ", flagFile=" + fatalErrorDir.toString()); - recoveryFatalErrorOccurred.set(true); - recoveryDirFS.mkdirs(fatalErrorDir); - if (recoveryDirFS.exists(fatalErrorDir)) { - LOG.error("Recovery failure occurred. 
Skipping all events"); - } else { - // throw error if fatal error flag could not be set - throw ioe; - } - } catch (IOException e) { - LOG.error("Failed to create fatal error flag dir " - + fatalErrorDir.toString(), e); - throw ioe; - } + createFatalErrorFlagDir(); if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { // Throw error to tell client that dag submission failed throw ioe; @@ -368,6 +354,26 @@ public class RecoveryService extends AbstractService { } } + private void createFatalErrorFlagDir() throws IOException { + Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR); + try { + LOG.error("Adding a flag to ensure next AM attempt does not start up" + + ", flagFile=" + fatalErrorDir.toString()); + recoveryFatalErrorOccurred.set(true); + recoveryDirFS.mkdirs(fatalErrorDir); + if (recoveryDirFS.exists(fatalErrorDir)) { + LOG.error("Recovery failure occurred. Skipping all events"); + } else { + // throw error if fatal error flag could not be set + throw new IOException("Failed to create fatal error flag dir " + + fatalErrorDir.toString()); + } + } catch (IOException e) { + LOG.error("Failed to create fatal error flag dir " + + fatalErrorDir.toString(), e); + } + } + private void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException { @@ -385,7 +391,8 @@ public class RecoveryService extends AbstractService { summaryStream = recoveryDirFS.create(summaryPath, false, bufferSize); } else { - summaryStream = recoveryDirFS.append(summaryPath, bufferSize); + createFatalErrorFlagDir(); + return; } } if (LOG.isDebugEnabled()) { @@ -394,6 +401,7 @@ public class RecoveryService extends AbstractService { + ", eventType=" + eventType); } summaryEvent.toSummaryProtoStream(summaryStream); + summaryStream.hflush(); } @VisibleForTesting @@ -421,11 +429,8 @@ public class RecoveryService extends AbstractService { Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString()); 
FSDataOutputStream outputStream; if (recoveryDirFS.exists(dagFilePath)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Opening DAG recovery file in append mode" - + ", filePath=" + dagFilePath); - } - outputStream = recoveryDirFS.append(dagFilePath, bufferSize); + createFatalErrorFlagDir(); + return; } else { if (LOG.isDebugEnabled()) { LOG.debug("Opening DAG recovery file in create mode" http://git-wip-us.apache.org/repos/asf/tez/blob/a708c6d6/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java index f10adfc..040b407 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java @@ -22,16 +22,21 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.junit.Before; import org.junit.Test; import static org.mockito.Mockito.*; @@ -42,6 +47,15 @@ public class 
TestRecoveryService { private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestRecoveryService.class.getName() + "-tmpDir"; + private Configuration conf; + + @Before + public void setUp() throws IllegalArgumentException, IOException { + this.conf = new Configuration(); + FileSystem localFS = FileSystem.getLocal(conf); + localFS.delete(new Path(TEST_ROOT_DIR), true); + } + @Test(timeout = 5000) public void testDrainEvents() throws IOException { Configuration conf = new Configuration(); @@ -63,6 +77,100 @@ public class TestRecoveryService { assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get()); } + @Test(timeout = 5000) + public void testMultipleDAGFinishedEvent() throws IOException { + Configuration conf = new Configuration(); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); + when(appContext.getClock()).thenReturn(new SystemClock()); + + MockRecoveryService recoveryService = new MockRecoveryService(appContext); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + recoveryService.init(conf); + recoveryService.start(); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); + int randEventCount = new Random().nextInt(100) + 100; + for (int i=0; i< randEventCount; ++i) { + recoveryService.handle(new DAGHistoryEvent(dagId, + new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + } + recoveryService.await(); + assertTrue(recoveryService.outputStreamMap.containsKey(dagId)); + // 2 DAGFinishedEvent + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGFinishedEvent(dagId, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null, + appAttemptId))); + // 
outputStream removed + assertFalse(recoveryService.outputStreamMap.containsKey(dagId)); + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, + appAttemptId))); + // no new outputStream opened + assertEquals(recoveryService.outputStreamMap.size(), 0); + assertFalse(recoveryService.outputStreamMap.containsKey(dagId)); + recoveryService.stop(); + } + + @Test(timeout = 5000) + public void testSummaryPathExisted() throws IOException { + Configuration conf = new Configuration(); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); + when(appContext.getClock()).thenReturn(new SystemClock()); + + MockRecoveryService recoveryService = new MockRecoveryService(appContext); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + recoveryService.init(conf); + recoveryService.start(); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); + Path dagRecoveryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath); + touchFile(dagRecoveryPath); + assertFalse(recoveryService.hasRecoveryFailed()); + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, + appAttemptId))); + assertTrue(recoveryService.hasRecoveryFailed()); + // be able to handle event after fatal error + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, + appAttemptId))); + } + + @Test(timeout = 5000) + public void testRecoveryPathExisted() throws IOException { + Configuration conf = new Configuration(); + ApplicationId appId = 
ApplicationId.newInstance(System.currentTimeMillis(), 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); + when(appContext.getClock()).thenReturn(new SystemClock()); + + MockRecoveryService recoveryService = new MockRecoveryService(appContext); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + recoveryService.init(conf); + recoveryService.start(); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); + Path dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(recoveryService.recoveryPath, dagId.toString()); + touchFile(dagRecoveryPath); + assertFalse(recoveryService.hasRecoveryFailed()); + recoveryService.handle(new DAGHistoryEvent(dagId, + new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + // wait for recovery event to be handled + recoveryService.await(); + assertTrue(recoveryService.hasRecoveryFailed()); + // be able to handle recovery event after fatal error + recoveryService.handle(new DAGHistoryEvent(dagId, + new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + } + + private void touchFile(Path path) throws IOException { + FileSystem fs = FileSystem.getLocal(new Configuration()); + fs.create(path).close(); + } + private static class MockRecoveryService extends RecoveryService { public AtomicInteger processedRecoveryEventCounter = new AtomicInteger(0);
