Repository: tez
Updated Branches:
refs/heads/branch-0.6 aa4806544 -> 7514b157e
TEZ-2758. Remove append API in RecoveryService after TEZ-1909 (zjffdu)
(cherry picked from commit 35c926f238ec456c7ddf7e8ca47616c89cf68695)
Conflicts:
tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7514b157
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7514b157
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7514b157
Branch: refs/heads/branch-0.6
Commit: 7514b157ec44b9e356a2579671e56e1bf1997f15
Parents: aa48065
Author: Jeff Zhang <[email protected]>
Authored: Wed Sep 30 10:25:21 2015 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Wed Sep 30 10:31:44 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/history/recovery/RecoveryService.java | 57 +++++-----
.../history/recovery/TestRecoveryService.java | 110 ++++++++++++++++++-
3 files changed, 141 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7514b157/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e0b76ae..1209e22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
TEZ-2851. Support a way for upstream applications to pass in a caller
context to Tez.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2808. Race condition between preemption and container assignment
http://git-wip-us.apache.org/repos/asf/tez/blob/7514b157/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 8a07211..6dcced3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -304,11 +304,13 @@ public class RecoveryService extends AbstractService {
try {
SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
handleSummaryEvent(dagId, eventType, summaryEvent);
- summaryStream.hflush();
if (summaryEvent.writeToRecoveryImmediately()) {
handleRecoveryEvent(event);
- doFlush(outputStreamMap.get(event.getDagID()),
- appContext.getClock().getTime());
+ // outputStream may already be closed and removed
+ if (outputStreamMap.containsKey(event.getDagID())) {
+ doFlush(outputStreamMap.get(event.getDagID()),
+ appContext.getClock().getTime());
+ }
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
@@ -334,23 +336,7 @@ public class RecoveryService extends AbstractService {
} catch (IOException ioe) {
LOG.error("Error handling summary event"
+ ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
- Path fatalErrorDir = new Path(recoveryPath,
RECOVERY_FATAL_OCCURRED_DIR);
- try {
- LOG.error("Adding a flag to ensure next AM attempt does not start
up"
- + ", flagFile=" + fatalErrorDir.toString());
- recoveryFatalErrorOccurred.set(true);
- recoveryDirFS.mkdirs(fatalErrorDir);
- if (recoveryDirFS.exists(fatalErrorDir)) {
- LOG.error("Recovery failure occurred. Skipping all events");
- } else {
- // throw error if fatal error flag could not be set
- throw ioe;
- }
- } catch (IOException e) {
- LOG.fatal("Failed to create fatal error flag dir "
- + fatalErrorDir.toString(), e);
- throw ioe;
- }
+ createFatalErrorFlagDir();
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
// Throw error to tell client that dag submission failed
throw ioe;
@@ -366,6 +352,26 @@ public class RecoveryService extends AbstractService {
}
}
+  /**
+   * Records that recovery has failed: sets the in-memory fatal-error flag and
+   * creates the RECOVERY_FATAL_OCCURRED_DIR marker under the recovery path,
+   * which is meant to ensure the next AM attempt does not start up.
+   *
+   * @throws IOException if the marker directory could not be created or its
+   *         existence could not be confirmed. The failure is logged and then
+   *         rethrown (as the inline code this helper replaced did) so callers
+   *         do not silently continue without the on-disk flag.
+   */
+  private void createFatalErrorFlagDir() throws IOException {
+    Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
+    try {
+      LOG.error("Adding a flag to ensure next AM attempt does not start up"
+          + ", flagFile=" + fatalErrorDir.toString());
+      recoveryFatalErrorOccurred.set(true);
+      recoveryDirFS.mkdirs(fatalErrorDir);
+      if (recoveryDirFS.exists(fatalErrorDir)) {
+        LOG.error("Recovery failure occurred. Skipping all events");
+      } else {
+        // throw error if fatal error flag could not be set
+        throw new IOException("Failed to create fatal error flag dir "
+            + fatalErrorDir.toString());
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to create fatal error flag dir "
+          + fatalErrorDir.toString(), e);
+      // Propagate instead of swallowing: without the on-disk marker a later
+      // AM attempt would not see that recovery failed.
+      throw e;
+    }
+  }
+
private void handleSummaryEvent(TezDAGID dagID,
HistoryEventType eventType,
SummaryEvent summaryEvent) throws IOException {
@@ -383,7 +389,8 @@ public class RecoveryService extends AbstractService {
summaryStream = recoveryDirFS.create(summaryPath, false,
bufferSize);
} else {
- summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
+ createFatalErrorFlagDir();
+ return;
}
}
if (LOG.isDebugEnabled()) {
@@ -392,6 +399,7 @@ public class RecoveryService extends AbstractService {
+ ", eventType=" + eventType);
}
summaryEvent.toSummaryProtoStream(summaryStream);
+ summaryStream.hflush();
}
@VisibleForTesting
@@ -419,11 +427,8 @@ public class RecoveryService extends AbstractService {
Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath,
dagID.toString());
FSDataOutputStream outputStream;
if (recoveryDirFS.exists(dagFilePath)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening DAG recovery file in append mode"
- + ", filePath=" + dagFilePath);
- }
- outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+ createFatalErrorFlagDir();
+ return;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening DAG recovery file in create mode"
http://git-wip-us.apache.org/repos/asf/tez/blob/7514b157/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index f10adfc..040b407 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -22,16 +22,21 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
@@ -42,6 +47,15 @@ public class TestRecoveryService {
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestRecoveryService.class.getName() + "-tmpDir";
+ private Configuration conf;
+
+  @Before
+  public void setUp() throws IllegalArgumentException, IOException {
+    // Give every test a fresh Configuration and an empty scratch directory so
+    // files created by a previous test cannot influence this one.
+    conf = new Configuration();
+    Path testRoot = new Path(TEST_ROOT_DIR);
+    FileSystem.getLocal(conf).delete(testRoot, true);
+  }
+
@Test(timeout = 5000)
public void testDrainEvents() throws IOException {
Configuration conf = new Configuration();
@@ -63,6 +77,100 @@ public class TestRecoveryService {
assertEquals(randEventCount,
recoveryService.processedRecoveryEventCounter.get());
}
+  /**
+   * Sends two DAGFinishedEvents for the same DAG and checks that the first one
+   * removes the DAG's recovery output stream and the second does not reopen it.
+   */
+  @Test(timeout = 5000)
+  public void testMultipleDAGFinishedEvent() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    // use the per-test Configuration created in setUp()
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    int randEventCount = new Random().nextInt(100) + 100;
+    for (int i = 0; i < randEventCount; ++i) {
+      recoveryService.handle(new DAGHistoryEvent(dagId,
+          new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1),
+              "v1", 0L, 0L)));
+    }
+    recoveryService.await();
+    assertTrue(recoveryService.outputStreamMap.containsKey(dagId));
+    // 2 DAGFinishedEvent
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.FAILED, "diag", null,
+            "user", "dag1", null, appAttemptId)));
+    // outputStream removed
+    assertFalse(recoveryService.outputStreamMap.containsKey(dagId));
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null,
+            "user", "dag1", null, appAttemptId)));
+    // no new outputStream opened (JUnit order: expected, actual)
+    assertEquals(0, recoveryService.outputStreamMap.size());
+    assertFalse(recoveryService.outputStreamMap.containsKey(dagId));
+    recoveryService.stop();
+  }
+
+  /**
+   * With a summary file already present, handling a summary event must mark
+   * recovery as failed (the file is no longer appended to), and the service
+   * must still accept further events afterwards.
+   */
+  @Test(timeout = 5000)
+  public void testSummaryPathExisted() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    // use the per-test Configuration created in setUp()
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath);
+    touchFile(summaryPath);
+    assertFalse(recoveryService.hasRecoveryFailed());
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null,
+            "user", "dag1", null, appAttemptId)));
+    assertTrue(recoveryService.hasRecoveryFailed());
+    // be able to handle event after fatal error
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null,
+            "user", "dag1", null, appAttemptId)));
+    // release the service's resources so they do not leak into later tests
+    recoveryService.stop();
+  }
+
+  /**
+   * With a DAG recovery file already present, processing a recovery event for
+   * that DAG must mark recovery as failed, and the service must still accept
+   * further recovery events afterwards.
+   */
+  @Test(timeout = 5000)
+  public void testRecoveryPathExisted() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+
+    MockRecoveryService recoveryService = new MockRecoveryService(appContext);
+    // use the per-test Configuration created in setUp()
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    recoveryService.init(conf);
+    recoveryService.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    Path dagRecoveryPath =
+        TezCommonUtils.getDAGRecoveryPath(recoveryService.recoveryPath, dagId.toString());
+    touchFile(dagRecoveryPath);
+    assertFalse(recoveryService.hasRecoveryFailed());
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1),
+            "v1", 0L, 0L)));
+    // wait for recovery event to be handled
+    recoveryService.await();
+    assertTrue(recoveryService.hasRecoveryFailed());
+    // be able to handle recovery event after fatal error
+    recoveryService.handle(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1),
+            "v1", 0L, 0L)));
+    // release the service's resources so they do not leak into later tests
+    recoveryService.stop();
+  }
+
+  /** Creates an empty file at {@code path} on the local file system. */
+  private void touchFile(Path path) throws IOException {
+    Configuration localConf = new Configuration();
+    FileSystem.getLocal(localConf).create(path).close();
+  }
+
private static class MockRecoveryService extends RecoveryService {
public AtomicInteger processedRecoveryEventCounter = new AtomicInteger(0);