Repository: hadoop
Updated Branches:
  refs/heads/trunk 0101267d9 -> a33ce45e3


YARN-5547. NMLeveldbStateStore should be more tolerant of unknown keys. 
Contributed by Ajith S


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a33ce45e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a33ce45e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a33ce45e

Branch: refs/heads/trunk
Commit: a33ce45e35ce77dbf297df618aec3106eafda68c
Parents: 0101267
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jan 24 16:17:36 2017 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jan 24 16:17:36 2017 +0000

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  |  7 ++
 .../recovery/NMLeveldbStateStoreService.java    | 19 +++++-
 .../recovery/NMStateStoreService.java           | 17 +++++
 .../TestNMLeveldbStateStoreService.java         | 69 ++++++++++++++++++++
 4 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a33ce45e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index cb39862..9f1655f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -144,6 +144,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import 
org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -403,6 +404,12 @@ public class ContainerManagerImpl extends CompositeService 
implements
 
     if (context.getApplications().containsKey(appId)) {
       recoverActiveContainer(launchContext, token, rcs);
+      if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
+        dispatcher.getEventHandler().handle(
+            new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
+                "Due to invalid StateStore info container was killed"
+                    + " during recovery"));
+      }
     } else {
       if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
         LOG.warn(containerId + " has no corresponding application!");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a33ce45e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index dc07b56..ab23456 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -70,6 +70,8 @@ import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 
 public class NMLeveldbStateStoreService extends NMStateStoreService {
 
@@ -139,6 +141,12 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
   private boolean isNewlyCreated;
   private Timer compactionTimer;
 
+  /**
+   * Maps each container ID to the list of its unknown key suffixes.
+   */
+  private ListMultimap<ContainerId, String> containerUnknownKeySuffixes =
+      ArrayListMultimap.create();
+
   public NMLeveldbStateStoreService() {
     super(NMLeveldbStateStoreService.class.getName());
   }
@@ -268,7 +276,11 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
       } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
         rcs.setLogDir(asString(entry.getValue()));
       } else {
-        throw new IOException("Unexpected container state key: " + key);
+        LOG.warn("the container " + containerId
+            + " will be killed because of the unknown key " + key
+            + " during recovery.");
+        containerUnknownKeySuffixes.put(containerId, suffix);
+        rcs.setRecoveryType(RecoveredContainerType.KILL);
       }
     }
     return rcs;
@@ -470,6 +482,11 @@ public class NMLeveldbStateStoreService extends 
NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+        List<String> unknownKeysForContainer = containerUnknownKeySuffixes
+            .removeAll(containerId);
+        for (String unknownKeySuffix : unknownKeysForContainer) {
+          batch.delete(bytes(keyPrefix + unknownKeySuffix));
+        }
         db.write(batch);
       } finally {
         batch.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a33ce45e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 9f9ee75..9dd1eb0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -60,6 +60,13 @@ public abstract class NMStateStoreService extends 
AbstractService {
 
   }
 
+  /**
+   * Type of post recovery action.
+   */
+  public enum RecoveredContainerType {
+    KILL, RECOVER
+  }
+
   public enum RecoveredContainerStatus {
     REQUESTED,
     QUEUED,
@@ -78,6 +85,8 @@ public abstract class NMStateStoreService extends 
AbstractService {
     private String workDir;
     private String logDir;
     int version;
+    private RecoveredContainerType recoveryType =
+        RecoveredContainerType.RECOVER;
 
     public RecoveredContainerStatus getStatus() {
       return status;
@@ -145,6 +154,14 @@ public abstract class NMStateStoreService extends 
AbstractService {
           .append(", LogDir: ").append(logDir)
           .toString();
     }
+
+    public RecoveredContainerType getRecoveryType() {
+      return recoveryType;
+    }
+
+    public void setRecoveryType(RecoveredContainerType recoveryType) {
+      this.recoveryType = recoveryType;
+    }
   }
 
   public static class LocalResourceTrackerState {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a33ce45e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index e93bbc9..6909474 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -70,6 +70,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
@@ -947,6 +948,74 @@ public class TestNMLeveldbStateStoreService {
     store.close();
   }
 
+  @Test
+  public void testUnexpectedKeyDoesntThrowException() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers = stateStore
+        .loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        4);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens = ByteBuffer
+        .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds,
+        serviceData, containerTokens, acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
+        containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
+        Priority.newInstance(7), 13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
+        containerToken);
+
+    stateStore.storeContainer(containerId, 0, containerReq);
+
+    // add an invalid key
+    byte[] invalidKey = ("ContainerManager/containers/"
+    + containerId.toString() + "/invalidKey1234").getBytes();
+    stateStore.getDB().put(invalidKey, new byte[1]);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+    assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
+    // assert unknown keys are cleaned up finally
+    assertNotNull(stateStore.getDB().get(invalidKey));
+    stateStore.removeContainer(containerId);
+    assertNull(stateStore.getDB().get(invalidKey));
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to