Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c611707e2 -> 1ec82e578
MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko (cherry picked from commit cff9edd4b514bdcfe22cd49964e3707fb78ab876) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ec82e57 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ec82e57 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ec82e57 Branch: refs/heads/branch-2 Commit: 1ec82e57893e08482f77dbca2128a629949a9f00 Parents: c611707 Author: Jason Lowe <[email protected]> Authored: Wed Jan 24 14:44:07 2018 -0600 Committer: Jason Lowe <[email protected]> Committed: Wed Jan 24 14:51:22 2018 -0600 ---------------------------------------------------------------------- .../mapreduce/v2/hs/CachedHistoryStorage.java | 8 +++++- .../mapreduce/v2/hs/HistoryFileManager.java | 30 ++++++++++++++++---- .../hadoop/mapreduce/v2/hs/TestJobHistory.java | 26 +++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ec82e57/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java index c59d17f..b93c8d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java @@ -172,9 +172,14 @@ public class CachedHistoryStorage extends AbstractService implements HistoryFileInfo fileInfo; fileInfo = hsManager.getFileInfo(jobId); + if (fileInfo == null) { throw new HSFileRuntimeException("Unable to find job " + jobId); - } else if (fileInfo.isDeleted()) { + } + + fileInfo.waitUntilMoved(); + + if (fileInfo.isDeleted()) { throw new HSFileRuntimeException("Cannot load deleted job " + jobId); } else { return fileInfo.loadJob(); @@ -210,6 +215,7 @@ public class CachedHistoryStorage extends AbstractService implements for (HistoryFileInfo mi : hsManager.getAllFileInfo()) { if (mi != null) { JobId id = mi.getJobId(); + mi.waitUntilMoved(); result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ec82e57/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 045f0ee..6fd1c0e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -455,6 +455,8 @@ public class HistoryFileManager extends AbstractService { } catch (Throwable t) { LOG.error("Error while trying to move a job to done", t); this.state = 
HistoryInfoState.MOVE_FAILED; + } finally { + notifyAll(); } } @@ -488,12 +490,16 @@ public class HistoryFileManager extends AbstractService { } protected synchronized void delete() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("deleting " + historyFile + " and " + confFile); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("deleting " + historyFile + " and " + confFile); + } + state = HistoryInfoState.DELETED; + doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); + doneDirFc.delete(doneDirFc.makeQualified(confFile), false); + } finally { + notifyAll(); } - state = HistoryInfoState.DELETED; - doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); - doneDirFc.delete(doneDirFc.makeQualified(confFile), false); } public JobIndexInfo getJobIndexInfo() { @@ -520,6 +526,17 @@ public class HistoryFileManager extends AbstractService { jobIndexInfo.getNumMaps(); return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob); } + + public synchronized void waitUntilMoved() { + while (isMovePending() && !didMoveFail()) { + try { + wait(); + } catch (InterruptedException e) { + LOG.warn("Waiting has been interrupted"); + throw new RuntimeException(e); + } + } + } } private SerialNumberIndex serialNumberIndex = null; @@ -959,6 +976,7 @@ public class HistoryFileManager extends AbstractService { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling move to done of " +found); } + moveToDoneExecutor.execute(new Runnable() { @Override public void run() { @@ -1195,5 +1213,5 @@ public class HistoryFileManager extends AbstractService { @VisibleForTesting protected void setMaxHistoryAge(long newValue){ maxHistoryAge=newValue; - } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ec82e57/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java index 936c772..9f36477 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java @@ -446,6 +446,32 @@ public class TestJobHistory { } @Test + public void testCachedStorageWaitsForFileMove() throws IOException { + HistoryFileManager historyManager = mock(HistoryFileManager.class); + jobHistory = spy(new JobHistory()); + doReturn(historyManager).when(jobHistory).createHistoryFileManager(); + + Configuration conf = new Configuration(); + jobHistory.init(conf); + jobHistory.start(); + + CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory + .getHistoryStorage()); + + Job job = mock(Job.class); + JobId jobId = mock(JobId.class); + when(job.getID()).thenReturn(jobId); + when(job.getTotalMaps()).thenReturn(10); + when(job.getTotalReduces()).thenReturn(2); + HistoryFileInfo fileInfo = mock(HistoryFileInfo.class); + when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo); + when(fileInfo.loadJob()).thenReturn(job); + + storage.getFullJob(jobId); + verify(fileInfo).waitUntilMoved(); + } + + @Test public void testRefreshLoadedJobCacheUnSupportedOperation() { jobHistory = spy(new JobHistory()); HistoryStorage storage = new HistoryStorage() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
