Repository: hadoop Updated Branches: refs/heads/branch-3.1 d7442c244 -> 44c4928b6
YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev (cherry picked from commit 65e7469712be6cf393e29ef73cc94727eec81227) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/44c4928b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/44c4928b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/44c4928b Branch: refs/heads/branch-3.1 Commit: 44c4928b64498c76fc5dffe288d9e959960282df Parents: d7442c2 Author: Jason Lowe <[email protected]> Authored: Mon Aug 20 10:14:40 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Mon Aug 20 10:21:57 2018 -0500 ---------------------------------------------------------------------- .../server/nodemanager/DeletionService.java | 25 +- .../containermanager/ContainerManagerImpl.java | 26 +- .../localizer/ResourceLocalizationService.java | 56 +-- .../recovery/NMLeveldbStateStoreService.java | 412 ++++++++++++------- .../recovery/NMNullStateStoreService.java | 2 +- .../recovery/NMStateStoreService.java | 55 +-- .../nodemanager/recovery/RecoveryIterator.java | 41 ++ .../security/NMContainerTokenSecretManager.java | 27 +- .../security/NMTokenSecretManagerInNM.java | 15 +- .../recovery/NMMemoryStateStoreService.java | 82 +++- .../TestNMLeveldbStateStoreService.java | 216 +++++++--- 11 files changed, 647 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index ae81dc1..e665c5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,13 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager; import static java.util.concurrent.TimeUnit.SECONDS; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -96,16 +97,20 @@ public class DeletionService extends AbstractService { private void recover(NMStateStoreService.RecoveredDeletionServiceState state) throws IOException { - List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks(); Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap = - new HashMap<>(taskProtos.size()); - Set<Integer> successorTasks = new HashSet<>(); - for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = - NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); - idToInfoMap.put(info.getTask().getTaskId(), info); - nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); - successorTasks.addAll(info.getSuccessorTaskIds()); + new HashMap<Integer, DeletionTaskRecoveryInfo>(); + Set<Integer> successorTasks = new HashSet<Integer>(); + + try (RecoveryIterator<DeletionServiceDeleteTaskProto> it = + state.getIterator()) { + while (it.hasNext()) { + DeletionServiceDeleteTaskProto proto = it.next(); + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); + } } // restore the task dependencies and schedule the deletion tasks that http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8b35258..b89e2dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -23,6 +23,7 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -356,19 +357,26 @@ public class ContainerManagerImpl extends CompositeService implements stateStore.loadLocalizationState()); RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); - for (ContainerManagerApplicationProto proto : - appsState.getApplications()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering application with state: " + proto.toString()); + try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator = + appsState.getIterator()) { + while (rasIterator.hasNext()) { + ContainerManagerApplicationProto proto = rasIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering application with state: " + proto.toString()); + } + recoverApplication(proto); } - recoverApplication(proto); } - for (RecoveredContainerState rcs : stateStore.loadContainersState()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering container with state: " + rcs); + try (RecoveryIterator<RecoveredContainerState> rcsIterator = + stateStore.getContainerStateIterator()) { + while (rcsIterator.hasNext()) { + RecoveredContainerState rcs = rcsIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering container with state: " + rcs); + } + recoverContainer(rcs); } - recoverContainer(rcs); } // Recovery AMRMProxy state after apps and containers are recovered http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 142387e..7a0cd8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -298,42 +300,46 @@ public class ResourceLocalizationService extends CompositeService //Recover localized resources after an NM restart public void recoverLocalizedResources(RecoveredLocalizationState state) - throws URISyntaxException { + throws URISyntaxException, IOException { LocalResourceTrackerState trackerState = state.getPublicTrackerState(); recoverTrackerResources(publicRsrc, trackerState); - for (Map.Entry<String, RecoveredUserResources> userEntry : - state.getUserResources().entrySet()) { - String user = userEntry.getKey(); - RecoveredUserResources userResources = userEntry.getValue(); - trackerState = userResources.getPrivateTrackerState(); - if (!trackerState.isEmpty()) { - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); - } - - for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry : - userResources.getAppTrackerStates().entrySet()) { - trackerState = appEntry.getValue(); + try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it + = state.getIterator()) { + while (it.hasNext()) { + Map.Entry<String, RecoveredUserResources> userEntry = it.next(); + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); if (!trackerState.isEmpty()) { - ApplicationId appId = appEntry.getKey(); - String appIdStr = appId.toString(); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - appId, dispatcher, false, super.getConfig(), stateStore, + null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker); if (oldTracker != null) { tracker = oldTracker; } recoverTrackerResources(tracker, trackerState); } + + for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = appId.toString(); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } } } } @@ -559,7 +565,7 @@ public class ResourceLocalizationService extends CompositeService rsrcCleanup.getResources(); for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e : rsrcs.entrySet()) { - LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), + LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), c.getContainerId().getApplicationAttemptId() .getApplicationId()); for (LocalResourceRequest req : e.getValue()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 67f642d..5d4253d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -73,6 +74,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -225,68 +227,119 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { return isHealthy; } - @Override - public List<RecoveredContainerState> loadContainersState() + // LeveldbIterator starting at startkey + private LeveldbIterator getLevelDBIterator(String startKey) throws IOException { - ArrayList<RecoveredContainerState> containers = - new ArrayList<RecoveredContainerState>(); - ArrayList<ContainerId> containersToRemove = - new ArrayList<ContainerId>(); - LeveldbIterator iter = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + LeveldbIterator it = new LeveldbIterator(db); + it.seek(bytes(startKey)); + return it; + } catch (DBException e) { + throw new IOException(e); + } + } - while (iter.hasNext()) { - Entry<byte[], byte[]> entry = iter.peekNext(); + // Base Recovery Iterator + private abstract class BaseRecoveryIterator<T> implements + RecoveryIterator<T> { + LeveldbIterator it; + T nextItem; + + BaseRecoveryIterator(String dbKey) throws IOException { + this.it = getLevelDBIterator(dbKey); + this.nextItem = null; + } + + protected abstract T getNextItem(LeveldbIterator it) throws IOException; + + @Override + public boolean hasNext() throws IOException { + if (nextItem == null) { + nextItem = getNextItem(it); + } + return (nextItem != null); + } + + @Override + public T next() throws IOException, NoSuchElementException { + T tmp = nextItem; + if (tmp != null) { + nextItem = null; + return tmp; + } else { + tmp = getNextItem(it); + if (tmp == null) { + throw new NoSuchElementException(); + } + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null) { + it.close(); + } + } + } + + // Container Recovery Iterator + private class ContainerStateIterator extends + BaseRecoveryIterator<RecoveredContainerState> { + ContainerStateIterator() throws IOException { + super(CONTAINERS_KEY_PREFIX); + } + + @Override + protected RecoveredContainerState getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredContainer(it); + } + } + + private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it) + throws IOException { + RecoveredContainerState rcs = null; + try { + while (it.hasNext()) { + Entry<byte[], byte[]> entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { - break; + return null; } int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); if (idEndPos < 0) { throw new IOException("Unable to determine container in key: " + key); } - ContainerId containerId = ContainerId.fromString( - key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); - String keyPrefix = key.substring(0, idEndPos+1); - RecoveredContainerState rcs = loadContainerState(containerId, - iter, keyPrefix); - // Don't load container without StartContainerRequest + String keyPrefix = key.substring(0, idEndPos + 1); + rcs = loadContainerState(it, keyPrefix); if (rcs.startRequest != null) { - containers.add(rcs); + break; } else { - containersToRemove.add(containerId); + removeContainer(rcs.getContainerId()); + rcs = null; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return rcs; + } - // remove container without StartContainerRequest - for (ContainerId containerId : containersToRemove) { - LOG.warn("Remove container " + containerId + - " with incomplete records"); - try { - removeContainer(containerId); - // TODO: kill and cleanup the leaked container - } catch (IOException e) { - LOG.error("Unable to remove container " + containerId + - " in store", e); - } - } - return containers; + @Override + public RecoveryIterator<RecoveredContainerState> getContainerStateIterator() + throws IOException { + return new ContainerStateIterator(); } - private RecoveredContainerState loadContainerState(ContainerId containerId, - LeveldbIterator iter, String keyPrefix) throws IOException { - RecoveredContainerState rcs = new RecoveredContainerState(); + private RecoveredContainerState loadContainerState(LeveldbIterator iter, + String keyPrefix) throws IOException { + ContainerId containerId = ContainerId.fromString( + keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(), + keyPrefix.length()-1)); + RecoveredContainerState rcs = new RecoveredContainerState(containerId); rcs.status = RecoveredContainerStatus.REQUESTED; while (iter.hasNext()) { Entry<byte[],byte[]> entry = iter.peekNext(); @@ -680,35 +733,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } - @Override - public RecoveredApplicationsState loadApplicationsState() - throws IOException { - RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList<ContainerManagerApplicationProto>(); - String keyPrefix = APPLICATIONS_KEY_PREFIX; - LeveldbIterator iter = null; + // Application Recovery Iterator + private class ApplicationStateIterator extends + BaseRecoveryIterator<ContainerManagerApplicationProto> { + ApplicationStateIterator() throws IOException { + super(APPLICATIONS_KEY_PREFIX); + } + + @Override + protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredApplication(it); + } + } + + private ContainerManagerApplicationProto getNextRecoveredApplication( + LeveldbIterator it) throws IOException { + ContainerManagerApplicationProto applicationProto = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(keyPrefix)); - while (iter.hasNext()) { - Entry<byte[], byte[]> entry = iter.next(); + if (it.hasNext()) { + Entry<byte[], byte[]> entry = it.next(); String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; + if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) { + return null; } - state.applications.add( - ContainerManagerApplicationProto.parseFrom(entry.getValue())); + applicationProto = ContainerManagerApplicationProto.parseFrom( + entry.getValue()); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return applicationProto; + } + @Override + public RecoveredApplicationsState loadApplicationsState() + throws IOException { + RecoveredApplicationsState state = new RecoveredApplicationsState(); + state.it = new ApplicationStateIterator(); cleanupDeprecatedFinishedApps(); - return state; } @@ -752,24 +815,29 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } - @Override - public RecoveredLocalizationState loadLocalizationState() - throws IOException { - RecoveredLocalizationState state = new RecoveredLocalizationState(); + // User Resource Recovery Iterator. + private class UserResourcesIterator extends + BaseRecoveryIterator<Entry<String, RecoveredUserResources>> { + UserResourcesIterator() throws IOException { + super(LOCALIZATION_PRIVATE_KEY_PREFIX); + } - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX)); - state.publicTrackerState = loadResourceTrackerState(iter, - LOCALIZATION_PUBLIC_KEY_PREFIX); + @Override + protected Entry<String, RecoveredUserResources> getNextItem( + LeveldbIterator it) throws IOException { + return getNextRecoveredPrivateLocalizationEntry(it); + } + } - iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); - while (iter.hasNext()) { - Entry<byte[],byte[]> entry = iter.peekNext(); + private Entry<String, RecoveredUserResources> getNextRecoveredPrivateLocalizationEntry( + LeveldbIterator it) throws IOException { + Entry<String, RecoveredUserResources> localEntry = null; + try { + if (it.hasNext()) { + Entry<byte[], byte[]> entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) { - break; + return null; } int userEndPos = key.indexOf('/', @@ -780,17 +848,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } String user = key.substring( LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos); - state.userResources.put(user, loadUserLocalizedResources(iter, - key.substring(0, userEndPos+1))); + RecoveredUserResources val = loadUserLocalizedResources(it, + key.substring(0, userEndPos+1)); + localEntry = new AbstractMap.SimpleEntry<>(user, val); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return localEntry; + } + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + RecoveredLocalizationState state = new RecoveredLocalizationState(); + LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); + state.publicTrackerState = loadResourceTrackerState(it, + LOCALIZATION_PUBLIC_KEY_PREFIX); + state.it = new UserResourcesIterator(); return state; } @@ -800,7 +875,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; LocalResourceTrackerState state = new LocalResourceTrackerState(); while (iter.hasNext()) { - Entry<byte[],byte[]> entry = iter.peekNext(); + Entry<byte[], byte[]> entry = iter.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { break; @@ -981,32 +1056,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { + LOCALIZATION_APPCACHE_SUFFIX + appId + "/"; } + // Deletion State Recovery Iterator. + private class DeletionStateIterator extends + BaseRecoveryIterator<DeletionServiceDeleteTaskProto> { + DeletionStateIterator() throws IOException { + super(DELETION_TASK_KEY_PREFIX); + } - @Override - public RecoveredDeletionServiceState loadDeletionServiceState() - throws IOException { - RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); - state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(); - LeveldbIterator iter = null; + @Override + protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredDeletionService(it); + } + } + + private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService( + LeveldbIterator it) throws IOException { + DeletionServiceDeleteTaskProto deleteProto = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); - while (iter.hasNext()) { - Entry<byte[], byte[]> entry = iter.next(); + if (it.hasNext()) { + Entry<byte[], byte[]> entry = it.next(); String key = asString(entry.getKey()); if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { - break; + return null; } - state.tasks.add( - DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); + deleteProto = DeletionServiceDeleteTaskProto.parseFrom( + entry.getValue()); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return deleteProto; + } + + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); + state.it = new DeletionStateIterator(); return state; } @@ -1033,29 +1120,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + private MasterKey getMasterKey(String dbKey) throws IOException { + try{ + byte[] data = db.get(bytes(dbKey)); + if (data == null || data.length == 0) { + return null; + } + return parseMasterKey(data); + } catch (DBException e) { + throw new IOException(e); + } + } - @Override - public RecoveredNMTokensState loadNMTokensState() throws IOException { - RecoveredNMTokensState state = new RecoveredNMTokensState(); - state.applicationMasterKeys = - new HashMap<ApplicationAttemptId, MasterKey>(); - LeveldbIterator iter = null; + // Recover NMTokens Iterator + private class NMTokensStateIterator extends + BaseRecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> { + NMTokensStateIterator() throws IOException { + super(NM_TOKENS_KEY_PREFIX); + } + + @Override + protected Entry<ApplicationAttemptId, MasterKey> getNextItem( + LeveldbIterator it) throws IOException { + return getNextMasterKeyEntry(it); + } + } + + private Entry<ApplicationAttemptId, MasterKey> getNextMasterKeyEntry( + LeveldbIterator it) throws IOException { + Entry<ApplicationAttemptId, MasterKey> masterKeyentry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(NM_TOKENS_KEY_PREFIX)); - while (iter.hasNext()) { - Entry<byte[], byte[]> entry = iter.next(); + while (it.hasNext()) { + Entry<byte[], byte[]> entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) { break; } String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length()); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith( - ApplicationAttemptId.appAttemptIdStrPrefix)) { + if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { ApplicationAttemptId attempt; try { attempt = ApplicationAttemptId.fromString(key); @@ -1063,17 +1165,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { throw new IOException("Bad application master key state for " + fullKey, e); } - state.applicationMasterKeys.put(attempt, + masterKeyentry = new AbstractMap.SimpleEntry<>(attempt, parseMasterKey(entry.getValue())); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return masterKeyentry; + } + + @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + RecoveredNMTokensState state = new RecoveredNMTokensState(); + state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new NMTokensStateIterator(); return state; } @@ -1122,45 +1232,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + // Recover ContainersToken Iterator. + private class ContainerTokensStateIterator extends + BaseRecoveryIterator<Entry<ContainerId, Long>> { + ContainerTokensStateIterator() throws IOException { + super(CONTAINER_TOKENS_KEY_PREFIX); + } - @Override - public RecoveredContainerTokensState loadContainerTokensState() + @Override + protected Entry<ContainerId, Long> getNextItem(LeveldbIterator it) + throws IOException { + return getNextContainerToken(it); + } + } + + private Entry<ContainerId, Long> getNextContainerToken(LeveldbIterator it) throws IOException { - RecoveredContainerTokensState state = new RecoveredContainerTokensState(); - state.activeTokens = new HashMap<ContainerId, Long>(); - LeveldbIterator iter = null; + Entry<ContainerId, Long> containerTokenEntry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX)); - final int containerTokensKeyPrefixLength = - CONTAINER_TOKENS_KEY_PREFIX.length(); - while (iter.hasNext()) { - Entry<byte[], byte[]> entry = iter.next(); + while (it.hasNext()) { + Entry<byte[], byte[]> entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) { break; } - String key = fullKey.substring(containerTokensKeyPrefixLength); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { - loadContainerToken(state, fullKey, key, entry.getValue()); + String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length()); + if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + containerTokenEntry = loadContainerToken(fullKey, key, + entry.getValue()); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } - return state; + return containerTokenEntry; } - private static void loadContainerToken(RecoveredContainerTokensState state, - String key, String containerIdStr, byte[] value) throws IOException { + private static Entry<ContainerId, Long> loadContainerToken(String key, + String containerIdStr, byte[] value) throws IOException { ContainerId containerId; Long expTime; try { @@ -1169,7 +1279,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } catch (IllegalArgumentException e) { throw new IOException("Bad container token state for " + key, e); } - state.activeTokens.put(containerId, expTime); + return new AbstractMap.SimpleEntry<>(containerId, expTime); + } + + @Override + public RecoveredContainerTokensState loadContainerTokensState() + throws IOException { + RecoveredContainerTokensState state = new RecoveredContainerTokensState(); + state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new ContainerTokensStateIterator(); + return state; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index dfad9cf..3ae00f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -65,7 +65,7 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override - public List<RecoveredContainerState> loadContainersState() + public RecoveryIterator<RecoveredContainerState> getContainerStateIterator() throws IOException { throw new UnsupportedOperationException( "Recovery not supported by this state store"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 70decdb..35caec9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -67,12 +68,11 @@ public abstract class NMStateStoreService extends AbstractService { } public static class RecoveredApplicationsState { - List<ContainerManagerApplicationProto> applications; + RecoveryIterator<ContainerManagerApplicationProto> it = null; - public List<ContainerManagerApplicationProto> getApplications() { - return applications; + public RecoveryIterator<ContainerManagerApplicationProto> getIterator() { + return it; } - } /** @@ -106,6 +106,15 @@ public abstract class NMStateStoreService extends AbstractService { RecoveredContainerType.RECOVER; private long startTime; private ResourceMappings resMappings = new ResourceMappings(); + private final ContainerId containerId; + + RecoveredContainerState(ContainerId containerId){ + this.containerId = containerId; + } + + public ContainerId getContainerId() { + return containerId; + } public RecoveredContainerStatus getStatus() { return status; @@ -248,30 +257,33 @@ public abstract class NMStateStoreService extends AbstractService { public static class RecoveredLocalizationState { LocalResourceTrackerState publicTrackerState = new LocalResourceTrackerState(); - Map<String, RecoveredUserResources> userResources = - new HashMap<String, RecoveredUserResources>(); + RecoveryIterator<Entry<String, RecoveredUserResources>> it = null; public LocalResourceTrackerState getPublicTrackerState() { return publicTrackerState; } - public Map<String, RecoveredUserResources> getUserResources() { - return userResources; + public RecoveryIterator<Entry<String, RecoveredUserResources>> getIterator() { + return it; } } public static class RecoveredDeletionServiceState { - List<DeletionServiceDeleteTaskProto> tasks; + RecoveryIterator<DeletionServiceDeleteTaskProto> it = null; - public List<DeletionServiceDeleteTaskProto> getTasks() { - return tasks; + public RecoveryIterator<DeletionServiceDeleteTaskProto> getIterator(){ + return it; } } public static class RecoveredNMTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map<ApplicationAttemptId, MasterKey> applicationMasterKeys; + RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> it = null; + + public RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -281,15 +293,16 @@ public abstract class NMStateStoreService extends AbstractService { return previousMasterKey; } - public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() { - return applicationMasterKeys; - } } public static class RecoveredContainerTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map<ContainerId, Long> activeTokens; + RecoveryIterator<Entry<ContainerId, Long>> it = null; + + public RecoveryIterator<Entry<ContainerId, Long>> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -299,9 +312,6 @@ public abstract class NMStateStoreService extends AbstractService { return previousMasterKey; } - public Map<ContainerId, Long> getActiveTokens() { - return activeTokens; - } } public static class RecoveredLogDeleterState { @@ -400,11 +410,10 @@ public abstract class NMStateStoreService extends AbstractService { /** - * Load the state of containers - * @return recovered state for containers - * @throws IOException + * get the Recovered Container State Iterator + * @return recovery iterator */ - public abstract List<RecoveredContainerState> loadContainersState() + public abstract RecoveryIterator<RecoveredContainerState> getContainerStateIterator() throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java new file mode 100644 index 0000000..0bb262a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.Closeable; +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * A wrapper for a Iterator to translate the raw RuntimeExceptions that + * can be thrown into IOException. + */ +public interface RecoveryIterator<T> extends Closeable { + + /** + * Returns true if the iteration has more elements. + */ + boolean hasNext() throws IOException; + + /** + * Returns the next element in the iteration. + */ + T next() throws IOException, NoSuchElementException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java index 256f649..b3df69b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java @@ -24,6 +24,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,17 +92,20 @@ public class NMContainerTokenSecretManager extends super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) { - ContainerId containerId = entry.getKey(); - Long expTime = entry.getValue(); - List<ContainerId> containerList = - recentlyStartedContainerTracker.get(expTime); - if (containerList == null) { - containerList = new ArrayList<ContainerId>(); - recentlyStartedContainerTracker.put(expTime, containerList); - } - if (!containerList.contains(containerId)) { - containerList.add(containerId); + try (RecoveryIterator<Entry<ContainerId, Long>> it = state.getIterator()) { + while (it.hasNext()) { + Entry<ContainerId, Long> entry = it.next(); + ContainerId containerId = entry.getKey(); + Long expTime = entry.getValue(); + List<ContainerId> containerList = + recentlyStartedContainerTracker.get(expTime); + if (containerList == null) { + containerList = new ArrayList<ContainerId>(); + recentlyStartedContainerTracker.put(expTime, containerList); + } + if (!containerList.contains(containerId)) { + containerList.add(containerId); + } } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index 0956e77..f895791 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +89,14 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager { super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Map.Entry<ApplicationAttemptId, MasterKey> entry : - state.getApplicationMasterKeys().entrySet()) { - key = entry.getValue(); - oldMasterKeys.put(entry.getKey(), - new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + try (RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it = + state.getIterator()) { + while (it.hasNext()) { + Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next(); + key = entry.getValue(); + oldMasterKeys.put(entry.getKey(), + new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + } } // reconstruct app to app attempts map http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index c5428d1..9658ecd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks; private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; + private Map<ApplicationAttemptId, MasterKey> applicationMasterKeys; + private Map<ContainerId, Long> activeTokens; private Map<ApplicationId, LogDeleterProto> logDeleterState; private RecoveredAMRMProxyState amrmProxyState; @@ -68,10 +71,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService { apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>(); containerStates = new HashMap<ContainerId, RecoveredContainerState>(); nmTokenState = new RecoveredNMTokensState(); - nmTokenState.applicationMasterKeys = - new HashMap<ApplicationAttemptId, MasterKey>(); + applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>(); containerTokenState = new RecoveredContainerTokensState(); - containerTokenState.activeTokens = new HashMap<ContainerId, Long>(); + activeTokens = new HashMap<ContainerId, Long>(); trackerStates = new HashMap<TrackerKey, TrackerState>(); deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>(); logDeleterState = new HashMap<ApplicationId, LogDeleterProto>(); @@ -86,13 +88,39 @@ public class NMMemoryStateStoreService extends NMStateStoreService { protected void closeStorage() { } + // Recovery Iterator Implementation. + private class NMMemoryRecoveryIterator<T> implements RecoveryIterator<T> { + + private Iterator<T> it; + + NMMemoryRecoveryIterator(Iterator<T> it){ + this.it = it; + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() throws IOException { + return it.next(); + } + + @Override + public void close() throws IOException { + + } + } @Override public synchronized RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList<ContainerManagerApplicationProto>( - apps.values()); + List<ContainerManagerApplicationProto> containerList = + new ArrayList<ContainerManagerApplicationProto>(apps.values()); + state.it = new NMMemoryRecoveryIterator<ContainerManagerApplicationProto>( + containerList.iterator()); return state; } @@ -111,13 +139,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public synchronized List<RecoveredContainerState> loadContainersState() + public RecoveryIterator<RecoveredContainerState> getContainerStateIterator() throws IOException { // return a copy so caller can't modify our state List<RecoveredContainerState> result = new ArrayList<RecoveredContainerState>(containerStates.size()); for (RecoveredContainerState rcs : containerStates.values()) { - RecoveredContainerState rcsCopy = new RecoveredContainerState(); + RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId()); rcsCopy.status = rcs.status; rcsCopy.exitCode = rcs.exitCode; rcsCopy.killed = rcs.killed; @@ -131,13 +159,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService { rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } - return result; + return new NMMemoryRecoveryIterator<RecoveredContainerState>( + result.iterator()); } @Override public synchronized void storeContainer(ContainerId containerId, int version, long startTime, StartContainerRequest startRequest) { - RecoveredContainerState rcs = new RecoveredContainerState(); + RecoveredContainerState rcs = new RecoveredContainerState(containerId); rcs.startRequest = startRequest; rcs.version = version; try { @@ -284,6 +313,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized RecoveredLocalizationState loadLocalizationState() { RecoveredLocalizationState result = new RecoveredLocalizationState(); + Map<String, RecoveredUserResources> userResources = + new HashMap<String, RecoveredUserResources>(); for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) { TrackerKey tk = e.getKey(); TrackerState ts = e.getValue(); @@ -294,10 +325,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { if (tk.user == null) { result.publicTrackerState = loadTrackerState(ts); } else { - RecoveredUserResources rur = result.userResources.get(tk.user); + RecoveredUserResources rur = userResources.get(tk.user); if (rur == null) { rur = new RecoveredUserResources(); - result.userResources.put(tk.user, rur); + userResources.put(tk.user, rur); } if (tk.appId == null) { rur.privateTrackerState = loadTrackerState(ts); @@ -306,6 +337,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } } } + result.it = new NMMemoryRecoveryIterator<Map.Entry<String, RecoveredUserResources>>( + userResources.entrySet().iterator()); return result; } @@ -341,8 +374,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { throws IOException { RecoveredDeletionServiceState result = new RecoveredDeletionServiceState(); - result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>( - deleteTasks.values()); + List<DeletionServiceDeleteTaskProto> deleteTaskProtos = + new ArrayList<DeletionServiceDeleteTaskProto>(deleteTasks.values()); + result.it = new NMMemoryRecoveryIterator<DeletionServiceDeleteTaskProto>( + deleteTaskProtos.iterator()); return result; } @@ -365,9 +400,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { RecoveredNMTokensState result = new RecoveredNMTokensState(); result.currentMasterKey = nmTokenState.currentMasterKey; result.previousMasterKey = nmTokenState.previousMasterKey; - result.applicationMasterKeys = - new HashMap<ApplicationAttemptId, MasterKey>( - nmTokenState.applicationMasterKeys); + Map<ApplicationAttemptId, MasterKey> masterKeysMap = + new HashMap<ApplicationAttemptId, MasterKey>(applicationMasterKeys); + result.it = new NMMemoryRecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>>( + masterKeysMap.entrySet().iterator()); return result; } @@ -389,14 +425,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService { public synchronized void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; - nmTokenState.applicationMasterKeys.put(attempt, + applicationMasterKeys.put(attempt, new MasterKeyPBImpl(keypb.getProto())); } @Override public synchronized void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException { - nmTokenState.applicationMasterKeys.remove(attempt); + applicationMasterKeys.remove(attempt); } @@ -408,8 +444,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { new RecoveredContainerTokensState(); result.currentMasterKey = containerTokenState.currentMasterKey; result.previousMasterKey = containerTokenState.previousMasterKey; - result.activeTokens = - new HashMap<ContainerId, Long>(containerTokenState.activeTokens); + Map<ContainerId, Long> containersTokenMap = + new HashMap<ContainerId, Long>(activeTokens); + result.it = new NMMemoryRecoveryIterator<Map.Entry<ContainerId, Long>>( + containersTokenMap.entrySet().iterator()); return result; } @@ -432,13 +470,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException { - containerTokenState.activeTokens.put(containerId, expirationTime); + activeTokens.put(containerId, expirationTime); } @Override public synchronized void removeContainerToken(ContainerId containerId) throws IOException { - containerTokenState.activeTokens.remove(containerId); + activeTokens.remove(containerId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/44c4928b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 8a8cfa2..fcbbc52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -125,6 +125,73 @@ public class TestNMLeveldbStateStoreService { FileUtil.fullyDelete(TMP_DIR); } + private List<RecoveredContainerState> loadContainersState( + RecoveryIterator<RecoveredContainerState> it) throws IOException { + List<RecoveredContainerState> containers = + new ArrayList<RecoveredContainerState>(); + while (it.hasNext()) { + RecoveredContainerState rcs = it.next(); + containers.add(rcs); + } + return containers; + } + + private List<ContainerManagerApplicationProto> loadApplicationProtos( + RecoveryIterator<ContainerManagerApplicationProto> it) + throws IOException { + List<ContainerManagerApplicationProto> applicationProtos = + new ArrayList<ContainerManagerApplicationProto>(); + while (it.hasNext()) { + applicationProtos.add(it.next()); + } + return applicationProtos; + } + + private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos( + RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException { + List<DeletionServiceDeleteTaskProto> deleteTaskProtos = + new ArrayList<DeletionServiceDeleteTaskProto>(); + while (it.hasNext()) { + deleteTaskProtos.add(it.next()); + } + return deleteTaskProtos; + } + + private Map<String, RecoveredUserResources> loadUserResources( + RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it) + throws IOException { + Map<String, RecoveredUserResources> userResources = + new HashMap<String, RecoveredUserResources>(); + while (it.hasNext()) { + Map.Entry<String, RecoveredUserResources> entry = it.next(); + userResources.put(entry.getKey(), entry.getValue()); + } + return userResources; + } + + private Map<ApplicationAttemptId, MasterKey> loadNMTokens( + RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it) + throws IOException { + Map<ApplicationAttemptId, MasterKey> nmTokens = + new HashMap<ApplicationAttemptId, MasterKey>(); + while (it.hasNext()) { + Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next(); + nmTokens.put(entry.getKey(), entry.getValue()); + } + return nmTokens; + } + + private Map<ContainerId, Long> loadContainerTokens( + RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException { + Map<ContainerId, Long> containerTokens = + new HashMap<ContainerId, Long>(); + while (it.hasNext()) { + Map.Entry<ContainerId, Long> entry = it.next(); + containerTokens.put(entry.getKey(), entry.getValue()); + } + return containerTokens; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -142,7 +209,7 @@ public class TestNMLeveldbStateStoreService { assertNotNull(pubts); assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); - assertTrue(state.getUserResources().isEmpty()); + assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @Test @@ -183,7 +250,7 @@ public class TestNMLeveldbStateStoreService { restartStateStore(); Assert.fail("Incompatible version, should expect fail here."); } catch (ServiceStateException e) { - Assert.assertTrue("Exception message mismatch", + Assert.assertTrue("Exception message mismatch", e.getMessage().contains("Incompatible version for NM state:")); } } @@ -192,7 +259,9 @@ public class TestNMLeveldbStateStoreService { public void testApplicationStorage() throws IOException { // test empty when no state RecoveredApplicationsState state = stateStore.loadApplicationsState(); - assertTrue(state.getApplications().isEmpty()); + List<ContainerManagerApplicationProto> apps = + loadApplicationProtos(state.getIterator()); + assertTrue(apps.isEmpty()); // store an application and verify recovered final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); @@ -204,8 +273,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeApplication(appId1, appProto1); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); // add a new app final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); @@ -216,23 +286,25 @@ public class TestNMLeveldbStateStoreService { stateStore.storeApplication(appId2, appProto2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(2, state.getApplications().size()); - assertTrue(state.getApplications().contains(appProto1)); - assertTrue(state.getApplications().contains(appProto2)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(2, apps.size()); + assertTrue(apps.contains(appProto1)); + assertTrue(apps.contains(appProto2)); // test removing an application stateStore.removeApplication(appId2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); } @Test public void testContainerStorage() throws IOException { // test empty when no state List<RecoveredContainerState> recoveredContainers = - stateStore.loadContainersState(); + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // create a container request @@ -254,7 +326,8 @@ public class TestNMLeveldbStateStoreService { stateStore.getContainerVersionKey(containerId.toString())))); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -269,14 +342,16 @@ public class TestNMLeveldbStateStoreService { // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); stateStore.storeContainerLaunched(containerId1); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); // queue the container, and verify recovered stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); @@ -292,7 +367,8 @@ public class TestNMLeveldbStateStoreService { diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -305,7 +381,8 @@ public class TestNMLeveldbStateStoreService { // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); @@ -316,7 +393,8 @@ public class TestNMLeveldbStateStoreService { // Resume the container stateStore.removeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered @@ -328,7 +406,8 @@ public class TestNMLeveldbStateStoreService { stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -342,7 +421,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerKilled(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -358,7 +438,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); @@ -371,7 +452,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(6, rcs.getRemainingRetryAttempts()); @@ -382,12 +464,13 @@ public class TestNMLeveldbStateStoreService { // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // recover again to check remove clears all containers restartStateStore(); NMStateStoreService nmStoreSpy = spy(stateStore); - nmStoreSpy.loadContainersState(); + loadContainersState(nmStoreSpy.getContainerStateIterator()); verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class)); } @@ -399,7 +482,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerRestartTimes(containerId, finishTimeForRetryAttempts); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = + loadContainersState(stateStore.getContainerStateIterator()).get(0); List<Long> recoveredRestartTimes = rcs.getRestartTimes(); assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); @@ -481,7 +565,7 @@ public class TestNMLeveldbStateStoreService { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map<String, RecoveredUserResources> userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -535,7 +619,7 @@ public class TestNMLeveldbStateStoreService { pubts.getInProgressResources().get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -584,7 +668,7 @@ public class TestNMLeveldbStateStoreService { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map<String, RecoveredUserResources> userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -654,7 +738,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(1, pubts.getInProgressResources().size()); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -762,7 +846,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(pubLocalizedProto1, pubts.getLocalizedResources().iterator().next()); Map<String, RecoveredUserResources> userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); } @@ -771,7 +855,9 @@ public class TestNMLeveldbStateStoreService { // test empty when no state RecoveredDeletionServiceState state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); + List<DeletionServiceDeleteTaskProto> deleteTaskProtos = + loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); // store a deletion task and verify recovered DeletionServiceDeleteTaskProto proto = @@ -788,8 +874,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeDeletionTask(proto.getId(), proto); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // store another deletion task DeletionServiceDeleteTaskProto proto2 = @@ -802,31 +889,36 @@ public class TestNMLeveldbStateStoreService { stateStore.storeDeletionTask(proto2.getId(), proto2); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(2, state.getTasks().size()); - assertTrue(state.getTasks().contains(proto)); - assertTrue(state.getTasks().contains(proto2)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(2, deleteTaskProtos.size()); + assertTrue(deleteTaskProtos.contains(proto)); + assertTrue(deleteTaskProtos.contains(proto2)); + // delete a task and verify gone after recovery stateStore.removeDeletionTask(proto2.getId()); restartStateStore(); - state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + state = stateStore.loadDeletionServiceState(); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // delete the last task and verify none left stateStore.removeDeletionTask(proto.getId()); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); - } + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); } @Test public void testNMTokenStorage() throws IOException { // test empty when no state RecoveredNMTokensState state = stateStore.loadNMTokensState(); + Map<ApplicationAttemptId, MasterKey> loadedAppKeys = + loadNMTokens(state.getIterator()); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a master key and verify recovered NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest(); @@ -834,18 +926,20 @@ public class TestNMLeveldbStateStoreService { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = secretMgr.generateKey(); stateStore.storeNMTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a few application keys and verify recovered ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance( @@ -858,10 +952,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map<ApplicationAttemptId, MasterKey> loadedAppKeys = - state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertEquals(attemptKey1, loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -880,9 +973,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedAppKeys = state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertNull(loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -894,9 +987,10 @@ public class TestNMLeveldbStateStoreService { // test empty when no state RecoveredContainerTokensState state = stateStore.loadContainerTokensState(); + Map<ContainerId, Long> loadedActiveTokens = loadContainerTokens(state.it); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a master key and verify recovered ContainerTokenKeyGeneratorForTest keygen = @@ -905,18 +999,20 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = keygen.generateKey(); stateStore.storeContainerTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a few container tokens and verify recovered ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1); @@ -927,10 +1023,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerToken(cid2, expTime2); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map<ContainerId, Long> loadedActiveTokens = - state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertEquals(expTime1, loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -948,9 +1043,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedActiveTokens = state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertNull(loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -1029,8 +1124,8 @@ public class TestNMLeveldbStateStoreService { @Test public void testUnexpectedKeyDoesntThrowException() throws IOException { // test empty when no state - List<RecoveredContainerState> recoveredContainers = stateStore - .loadContainersState(); + List<RecoveredContainerState> recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1045,7 +1140,8 @@ public class TestNMLeveldbStateStoreService { + containerId.toString() + "/invalidKey1234").getBytes(); stateStore.getDB().put(invalidKey, new byte[1]); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); @@ -1162,8 +1258,8 @@ public class TestNMLeveldbStateStoreService { @Test public void testStateStoreForResourceMapping() throws IOException { // test empty when no state - List<RecoveredContainerState> recoveredContainers = stateStore - .loadContainersState(); + List<RecoveredContainerState> recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1190,7 +1286,8 @@ public class TestNMLeveldbStateStoreService { // add a invalid key restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); List<Serializable> res = rcs.getResourceMappings() @@ -1253,7 +1350,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerRestartTimes(containerId, restartTimes); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = + loadContainersState(stateStore.getContainerStateIterator()).get(0); List<Long> recoveredRestartTimes = rcs.getRestartTimes(); assertTrue(recoveredRestartTimes.isEmpty()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
