Repository: tez Updated Branches: refs/heads/master b17edc401 -> 9cf25d142
TEZ-3434. Add unit tests for flushing of recovery events. (Harish Jaiprakash via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9cf25d14 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9cf25d14 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9cf25d14 Branch: refs/heads/master Commit: 9cf25d142b4c30c7d903c1ae03bca7b070b706b0 Parents: b17edc4 Author: Hitesh Shah <hit...@apache.org> Authored: Mon Sep 19 15:50:22 2016 -0700 Committer: Hitesh Shah <hit...@apache.org> Committed: Mon Sep 19 15:50:22 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/history/recovery/RecoveryService.java | 3 +- .../history/recovery/TestRecoveryService.java | 390 +++++++++++++++---- 3 files changed, 325 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ed0ef7b..c7f540b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3434. Add unit tests for flushing of recovery events. TEZ-3317. Speculative execution starts too early due to 0 progress. TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list. 
http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 3eeddf5..8c29172 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -69,7 +69,8 @@ public class RecoveryService extends AbstractService { @VisibleForTesting public static final boolean TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT = true; - private LinkedBlockingQueue<DAGHistoryEvent> eventQueue = + @VisibleForTesting + LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue<DAGHistoryEvent>(); private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>(); private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>(); http://git-wip-us.apache.org/repos/asf/tez/blob/9cf25d14/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java index d828d6b..3dec1d7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java @@ -17,90 +17,140 @@ */ package org.apache.tez.dag.history.recovery; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import 
static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; -import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; - public class TestRecoveryService { - private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestRecoveryService.class.getName() + 
"-tmpDir"; + private static final long startTime = System.currentTimeMillis(); + private static final ApplicationId appId = ApplicationId.newInstance(startTime, 1); + private static final ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + private static final TezDAGID dagId = TezDAGID.getInstance(appId, 1); + private static final TezVertexID vertexId = TezVertexID.getInstance(dagId, 1); + private static final TezTaskID tezTaskId = TezTaskID.getInstance(vertexId, 1); + private Configuration conf; + private AppContext appContext; + private MockRecoveryService recoveryService; + private Path dagRecoveryPath; + private Path summaryPath; + private FileSystem fs; + private FSDataOutputStream dagFos; + private FSDataOutputStream summaryFos; - @Before - public void setUp() throws IllegalArgumentException, IOException { - this.conf = new Configuration(); - FileSystem localFS = FileSystem.getLocal(conf); - localFS.delete(new Path(TEST_ROOT_DIR), true); - } + private void setup(boolean useMockFs, String[][] configs) throws Exception { + conf = new Configuration(); + if (configs != null) { + for (String[] config : configs) { + conf.set(config[0], config[1]); + } + } - @Test(timeout = 5000) - public void testDrainEvents() throws IOException { - Configuration conf = new Configuration(); - AppContext appContext = mock(AppContext.class); - when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); + appContext = mock(AppContext.class); when(appContext.getClock()).thenReturn(new SystemClock()); when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); - ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); when(appContext.getApplicationID()).thenReturn(appId); - MockRecoveryService recoveryService = new MockRecoveryService(appContext); + if (useMockFs) { + fs = mock(FileSystem.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path("mockfs:///")); + conf.set("fs.mockfs.impl", 
MockFileSystem.class.getName()); + MockFileSystem.delegate = fs; + dagFos = spy(new FSDataOutputStream(new OutputStream() { + @Override + public void write(int b) throws IOException {} + }, null)); + summaryFos = spy(new FSDataOutputStream(new OutputStream() { + @Override + public void write(int b) throws IOException {} + }, null)); + } else { + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); + fs = FileSystem.getLocal(conf); + fs.delete(new Path(TEST_ROOT_DIR), true); + } + + recoveryService = new MockRecoveryService(appContext); conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); recoveryService.init(conf); + + summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath); + dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath( + recoveryService.recoveryPath, dagId.toString()); + if (useMockFs) { + when(fs.create(eq(dagRecoveryPath), eq(false), anyInt())).thenReturn(dagFos); + when(fs.create(eq(summaryPath), eq(false), anyInt())).thenReturn(summaryFos); + } + } + + @Test(timeout = 5000) + public void testDrainEvents() throws Exception { + setup(false, null); recoveryService.start(); - TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); int randEventCount = new Random().nextInt(100) + 100; for (int i=0; i< randEventCount; ++i) { recoveryService.handle(new DAGHistoryEvent(dagId, - new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + new TaskStartedEvent(tezTaskId, "v1", 0L, 0L))); } recoveryService.stop(); assertEquals(randEventCount, recoveryService.processedRecoveryEventCounter.get()); } @Test(timeout = 5000) - public void testMultipleDAGFinishedEvent() throws IOException { - Configuration conf = new Configuration(); - ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - AppContext 
appContext = mock(AppContext.class); - when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); - when(appContext.getClock()).thenReturn(new SystemClock()); - when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); - when(appContext.getApplicationID()).thenReturn(appId); - - MockRecoveryService recoveryService = new MockRecoveryService(appContext); - conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); - recoveryService.init(conf); + public void testMultipleDAGFinishedEvent() throws Exception { + setup(false, null); recoveryService.start(); - TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); int randEventCount = new Random().nextInt(100) + 100; for (int i=0; i< randEventCount; ++i) { recoveryService.handle(new DAGHistoryEvent(dagId, - new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + new TaskStartedEvent(tezTaskId, "v1", 0L, 0L))); } recoveryService.await(); assertTrue(recoveryService.outputStreamMap.containsKey(dagId)); @@ -120,23 +170,10 @@ public class TestRecoveryService { } @Test(timeout = 5000) - public void testSummaryPathExisted() throws IOException { - Configuration conf = new Configuration(); - ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - AppContext appContext = mock(AppContext.class); - when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); - when(appContext.getClock()).thenReturn(new SystemClock()); - when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); - when(appContext.getApplicationID()).thenReturn(appId); - - MockRecoveryService recoveryService = new MockRecoveryService(appContext); - conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); - recoveryService.init(conf); + public void 
testSummaryPathExisted() throws Exception { + setup(false, null); recoveryService.start(); - TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); - Path dagRecoveryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryService.recoveryPath); - touchFile(dagRecoveryPath); + touchFile(summaryPath); assertFalse(recoveryService.hasRecoveryFailed()); recoveryService.handle(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, @@ -146,38 +183,183 @@ public class TestRecoveryService { recoveryService.handle(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, appAttemptId, null))); + recoveryService.stop(); } @Test(timeout = 5000) - public void testRecoveryPathExisted() throws IOException { - Configuration conf = new Configuration(); - ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - AppContext appContext = mock(AppContext.class); - when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR)); - when(appContext.getClock()).thenReturn(new SystemClock()); - when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); - when(appContext.getApplicationID()).thenReturn(appId); - - MockRecoveryService recoveryService = new MockRecoveryService(appContext); - conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); - recoveryService.init(conf); + public void testRecoveryPathExisted() throws Exception { + setup(false, null); recoveryService.start(); - TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1),1); - Path dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(recoveryService.recoveryPath, dagId.toString()); touchFile(dagRecoveryPath); assertFalse(recoveryService.hasRecoveryFailed()); recoveryService.handle(new DAGHistoryEvent(dagId, - new 
TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + new TaskStartedEvent(tezTaskId, "v1", 0L, 0L))); // wait for recovery event to be handled recoveryService.await(); assertTrue(recoveryService.hasRecoveryFailed()); // be able to handle recovery event after fatal error recoveryService.handle(new DAGHistoryEvent(dagId, - new TaskStartedEvent(TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1), "v1", 0L, 0L))); + new TaskStartedEvent(tezTaskId, "v1", 0L, 0L))); + recoveryService.stop(); + } + + @Test(timeout=5000) + public void testRecoveryFlushOnMaxEvents() throws Exception { + setup(true, new String[][] { + {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "10"}, + {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"} + }); + recoveryService.start(); + + // Send one event fewer than the unflushed-event threshold, then wait for drain. + for (int i = 0; i < 9; ++i) { + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + } + waitForDrain(-1); + verify(dagFos, times(0)).hflush(); + + // This event should cause the flush. + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + waitForDrain(-1); + verify(dagFos, times(1)).hflush(); + + recoveryService.stop(); + } + + @Test(timeout=10000) + public void testRecoveryFlushOnTimeoutEvents() throws Exception { + setup(true, new String[][] { + {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"}, + {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "5"} + }); + recoveryService.start(); + + // Send a lot of events. + for (int i = 0; i < TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT; ++i) { + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + } + // wait for timeout.
+ Thread.sleep(5000); + assertTrue(recoveryService.eventQueue.isEmpty()); + verify(fs, times(1)).create(eq(dagRecoveryPath), eq(false), anyInt()); + verify(dagFos, times(0)).hflush(); + + // The flush is triggered by sending 1 event after the timeout. + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + waitForDrain(1000); + verify(dagFos, times(1)).hflush(); + + recoveryService.stop(); + } + + @Test(timeout=10000) + public void testRecoveryFlush() throws Exception { + setup(true, new String[][] { + {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "10"}, + {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "5"} + }); + recoveryService.start(); + + // Flush triggered by the 5 second flush interval. + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + Thread.sleep(5000); + assertTrue(recoveryService.eventQueue.isEmpty()); + verify(fs, times(1)).create(eq(dagRecoveryPath), eq(false), anyInt()); + verify(dagFos, times(0)).hflush(); + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + waitForDrain(1000); + verify(dagFos, times(1)).hflush(); + + // Flush triggered by the number of unflushed events.
+ for (int i = 0; i < 9; ++i) { + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + } + waitForDrain(-1); + verify(dagFos, times(1)).hflush(); + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + waitForDrain(-1); + verify(dagFos, times(2)).hflush(); + + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + + recoveryService.stop(); + } + + @Test(timeout=50000) + public void testRecoveryFlushOnStop() throws Exception { + setup(true, new String[][] { + {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"}, + {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"} + }); + recoveryService.start(); + + // Does not flush on event counts. + for (int i = 0; i < TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT; ++i) { + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + } + waitForDrain(-1); + verify(dagFos, times(0)).hflush(); + + // Does not flush on timeout. + Thread.sleep(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT * 1000); + recoveryService.handle(new DAGHistoryEvent(dagId, + new DAGStartedEvent(dagId, startTime, "nobody", "test-dag"))); + waitForDrain(-1); + verify(dagFos, times(0)).hflush(); + + // Does flush on stop. + recoveryService.stop(); + verify(dagFos, times(1)).hflush(); + } + + @Test(timeout=5000) + public void testRecoveryFlushOnSummaryEvent() throws Exception { + setup(true, new String[][] { + {TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, "-1"}, + {TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS, "-1"} + }); + recoveryService.start(); + + DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build(); + // This writes to recovery immediately. 
+ recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent( + dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null))); + waitForDrain(-1); + verify(summaryFos, times(1)).hflush(); + verify(dagFos, times(1)).hflush(); + + // This flushes the summary stream immediately, but not the DAG recovery stream. + recoveryService.handle(new DAGHistoryEvent(dagId, new DAGCommitStartedEvent(dagId, startTime))); + waitForDrain(-1); + verify(summaryFos, times(2)).hflush(); + verify(dagFos, times(1)).hflush(); + + // Does flush on stop. + recoveryService.stop(); + verify(dagFos, times(2)).hflush(); + } + + private void waitForDrain(int limit) throws Exception { + long maxTime = System.currentTimeMillis() + limit; + while (!recoveryService.eventQueue.isEmpty()) { + Thread.sleep(10); + if (limit != -1 && System.currentTimeMillis() > maxTime) { + break; + } + } } private void touchFile(Path path) throws IOException { - FileSystem fs = FileSystem.getLocal(new Configuration()); fs.create(path).close(); } @@ -196,4 +378,76 @@ public class TestRecoveryService { processedRecoveryEventCounter.addAndGet(1); } } + + // Public access to ensure it can be created through reflection. + public static class MockFileSystem extends FileSystem { + // Must be set to a mock fs by the test; the field is static, so only one test using this class can run at a time.
+ static FileSystem delegate; + + static URI uri = URI.create("mockfs:///"); + + @Override + public URI getUri() { + return uri; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return delegate.open(f, bufferSize); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException { + return delegate.create(f, overwrite, bufferSize); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, Progressable progress) + throws IOException { + return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize, + progress); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) + throws IOException { + return delegate.append(f, bufferSize, progress); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return delegate.rename(src, dst); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return delegate.delete(f, recursive); + } + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + return delegate.listStatus(f); + } + + @Override + public void setWorkingDirectory(Path new_dir) { + delegate.setWorkingDirectory(new_dir); + } + + @Override + public Path getWorkingDirectory() { + return delegate.getWorkingDirectory(); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return delegate.mkdirs(f, permission); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return delegate.getFileStatus(f); + } + } }