http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 112095e..4bcdf5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -70,7 +70,7 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override - public void storeContainer(ContainerId containerId, + public void storeContainer(ContainerId containerId, int version, StartContainerRequest startRequest) throws IOException { } @@ -90,7 +90,7 @@ public class NMNullStateStoreService extends NMStateStoreService { @Override public void storeContainerResourceChanged(ContainerId containerId, - Resource capability) throws IOException { + int version, Resource capability) throws IOException { } @Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 57f35a4..9f9ee75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -77,6 +77,7 @@ public abstract class NMStateStoreService extends AbstractService { private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; private String workDir; private String logDir; + int version; public RecoveredContainerStatus getStatus() { return status; @@ -94,6 +95,10 @@ public abstract class NMStateStoreService extends AbstractService { return diagnostics; } + public int getVersion() { + return version; + } + public StartContainerRequest getStartRequest() { return startRequest; } @@ -130,6 +135,7 @@ public abstract class NMStateStoreService extends AbstractService { public String toString() { return new StringBuffer("Status: ").append(getStatus()) .append(", Exit code: ").append(exitCode) + .append(", Version: ").append(version) .append(", Killed: ").append(getKilled()) .append(", Diagnostics: ").append(getDiagnostics()) .append(", Capability: ").append(getCapability()) @@ -306,11 +312,13 @@ public abstract class NMStateStoreService extends AbstractService { /** * Record a container start request * @param containerId the container ID + * @param containerVersion the container Version * @param startRequest the container start request * @throws IOException */ public abstract void storeContainer(ContainerId containerId, - StartContainerRequest startRequest) throws IOException; + int containerVersion, StartContainerRequest startRequest) + throws IOException; /** * Record that a container has been queued at the NM @@ -331,11 +339,12 @@ public abstract class NMStateStoreService extends AbstractService { /** * Record that a container resource has been changed * @param containerId the container ID + * @param containerVersion the container version * @param capability the container resource capability * @throws IOException */ public abstract void storeContainerResourceChanged(ContainerId containerId, - Resource capability) throws IOException; + int containerVersion, Resource capability) throws IOException; /** * Record that a container has completed http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index ee2677c..f6593f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -875,7 +875,7 @@ public class TestNodeManagerResync { ApplicationAttemptId.newInstance(applicationId, 1); ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, id); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, containerState, + NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(10), 0); return containerReport; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index c71b1e6..977cb76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -251,7 +251,7 @@ public class TestNodeStatusUpdater { String user = "testUser"; ContainerTokenIdentifier containerToken = BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - firstContainerID, InetAddress.getByName("localhost") + firstContainerID, 0, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); Context context = mock(Context.class); @@ -292,7 +292,7 @@ public class TestNodeStatusUpdater { Resource resource = BuilderUtils.newResource(3, 1); ContainerTokenIdentifier containerToken = BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - secondContainerID, InetAddress.getByName("localhost") + secondContainerID, 0, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); Context context = mock(Context.class); @@ -1013,7 +1013,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); Token containerToken = - BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newContainerToken(cId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container anyCompletedContainer = new ContainerImpl(conf, null, @@ -1035,7 +1035,7 @@ public class TestNodeStatusUpdater { ContainerId runningContainerId = ContainerId.newContainerId(appAttemptId, 3); Token runningContainerToken = - BuilderUtils.newContainerToken(runningContainerId, "anyHost", + BuilderUtils.newContainerToken(runningContainerId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container runningContainer = @@ -1103,7 +1103,7 @@ public class TestNodeStatusUpdater { ContainerId runningContainerId = ContainerId.newContainerId(appAttemptId, 1); Token runningContainerToken = - BuilderUtils.newContainerToken(runningContainerId, "anyHost", + BuilderUtils.newContainerToken(runningContainerId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container runningContainer = @@ -1131,14 +1131,16 @@ public class TestNodeStatusUpdater { appAttemptId, 2); ContainerTokenIdentifier killedQueuedContainerTokenId1 = BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - killedQueuedContainerId1, "anyHost", 1234, "anyUser", BuilderUtils - .newResource(1024, 1), 0, 123, "password".getBytes(), 0)); + killedQueuedContainerId1, 0, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0)); ContainerId killedQueuedContainerId2 = ContainerId.newContainerId( appAttemptId, 3); ContainerTokenIdentifier killedQueuedContainerTokenId2 = BuilderUtils .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - killedQueuedContainerId2, "anyHost", 1234, "anyUser", BuilderUtils - .newResource(1024, 1), 0, 123, "password".getBytes(), 0)); + killedQueuedContainerId2, 0, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0)); nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put( killedQueuedContainerTokenId1, "Queued container killed."); @@ -1214,7 +1216,7 @@ public class TestNodeStatusUpdater { ApplicationAttemptId.newInstance(appId, 0); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); Token containerToken = - BuilderUtils.newContainerToken(containerId, "host", 1234, "user", + BuilderUtils.newContainerToken(containerId, 0, "host", 1234, "user", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); @@ -1253,7 +1255,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); Token containerToken = - BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newContainerToken(cId, 0, "anyHost", 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, "password".getBytes(), 0); Container anyCompletedContainer = new ContainerImpl(conf, null, http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index 0652e96..2ccf827 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -106,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -297,8 +298,7 @@ public class MockResourceManagerFacade implements new ArrayList<ContainerStatus>(), containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList<NMToken>(), - new ArrayList<Container>(), - new ArrayList<Container>()); + new ArrayList<UpdatedContainer>()); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 726b353..ec38501 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -403,7 +403,7 @@ public abstract class BaseContainerManagerTest { LogAggregationContext logAggregationContext, ExecutionType executionType) throws IOException { ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + new ContainerTokenIdentifier(cId, 0, nodeId.toString(), user, resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier, Priority.newInstance(0), 0, logAggregationContext, null, ContainerType.TASK, executionType); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 3c5edc0..15c0e84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -125,9 +125,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainer(ContainerId containerId, - StartContainerRequest startRequest) throws IOException { + int version, StartContainerRequest startRequest) throws IOException { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; + rcs.version = version; containerStates.put(containerId, rcs); } @@ -156,9 +157,11 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainerResourceChanged( - ContainerId containerId, Resource capability) throws IOException { + ContainerId containerId, int version, Resource capability) + throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); rcs.capability = capability; + rcs.version = version; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index d254e4b..1b21628 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -262,11 +262,12 @@ public class TestNMLeveldbStateStoreService { StartContainerRequest.newInstance(clc, containerToken); // store a container and verify recovered - stateStore.storeContainer(containerId, containerReq); + stateStore.storeContainer(containerId, 1, containerReq); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(1, rcs.getVersion()); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -308,11 +309,13 @@ public class TestNMLeveldbStateStoreService { assertEquals(diags.toString(), rcs.getDiagnostics()); // increase the container size, and verify recovered - stateStore.storeContainerResourceChanged(containerId, Resource.newInstance(2468, 4)); + stateStore.storeContainerResourceChanged(containerId, 2, + Resource.newInstance(2468, 4)); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); + assertEquals(2, rcs.getVersion()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 7513fdf..8332b2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -67,7 +67,7 @@ public class MockContainer implements Container { long currentTime = System.currentTimeMillis(); this.containerTokenIdentifier = BuilderUtils.newContainerTokenIdentifier(BuilderUtils - .newContainerToken(id, "127.0.0.1", 1234, user, + .newContainerToken(id, 0, "127.0.0.1", 1234, user, BuilderUtils.newResource(1024, 1), currentTime + 10000, 123, "password".getBytes(), currentTime)); this.state = ContainerState.NEW; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index 41037f7..be1dae1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -212,9 +212,9 @@ public class TestNMWebServer { recordFactory.newRecordInstance(ContainerLaunchContext.class); long currentTime = System.currentTimeMillis(); Token containerToken = - BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user, - BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123, - "password".getBytes(), currentTime); + BuilderUtils.newContainerToken(containerId, 0, "127.0.0.1", 1234, + user, BuilderUtils.newResource(1024, 1), currentTime + 10000L, + 123, "password".getBytes(), currentTime); Context context = mock(Context.class); Container container = new ContainerImpl(conf, dispatcher, launchContext, http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index f575961..4d73ba2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -66,6 +67,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -87,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -489,15 +494,6 @@ public class ApplicationMasterService extends AbstractService implements throw e; } - try { - RMServerUtils.increaseDecreaseRequestSanityCheck(rmContext, - request.getIncreaseRequests(), request.getDecreaseRequests(), - maximumCapacity); - } catch (InvalidResourceRequestException e) { - LOG.warn(e); - throw e; - } - // In the case of work-preserving AM restart, it's possible for the // AM to release containers from the earlier attempt. if (!app.getApplicationSubmissionContext() @@ -505,11 +501,22 @@ public class ApplicationMasterService extends AbstractService implements try { RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, e); + LOG.warn("Invalid container release by application " + appAttemptId, + e); throw e; } } + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>(); + List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>(); + List<UpdateContainerError> updateContainerErrors = + RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext, + request, maximumCapacity, increaseResourceReqs, + decreaseResourceReqs); + // Send new requests to appAttempt. Allocation allocation; RMAppAttemptState state = @@ -524,7 +531,7 @@ public class ApplicationMasterService extends AbstractService implements allocation = this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals, - request.getIncreaseRequests(), request.getDecreaseRequests()); + increaseResourceReqs, decreaseResourceReqs); } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { @@ -539,6 +546,10 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setNMTokens(allocation.getNMTokens()); } + // Notify the AM of container update errors + if (!updateContainerErrors.isEmpty()) { + allocateResponse.setUpdateErrors(updateContainerErrors); + } // update the response with the deltas of node status changes List<RMNode> updatedNodes = new ArrayList<RMNode>(); if(app.pullRMNodeUpdates(updatedNodes) > 0) { @@ -572,8 +583,23 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setAvailableResources(allocation.getResourceLimit()); // Handling increased/decreased containers - allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers()); - allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers()); + List<UpdatedContainer> updatedContainers = new ArrayList<>(); + if (allocation.getIncreasedContainers() != null) { + for (Container c : allocation.getIncreasedContainers()) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.INCREASE_RESOURCE, c)); + } + } + if (allocation.getDecreasedContainers() != null) { + for (Container c : allocation.getDecreasedContainers()) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.DECREASE_RESOURCE, c)); + } + } + + allocateResponse.setUpdatedContainers(updatedContainers); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); @@ -623,7 +649,7 @@ public class ApplicationMasterService extends AbstractService implements return allocateResponse; } } - + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 5e9827a..7fcabab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,28 +33,35 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; +import org.apache.hadoop.yarn.exceptions + .InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -68,6 +75,18 @@ import org.apache.hadoop.yarn.util.resource.Resources; */ public class RMServerUtils { + private static final String UPDATE_OUTSTANDING_ERROR = + "UPDATE_OUTSTANDING_ERROR"; + private static final String INCORRECT_CONTAINER_VERSION_ERROR = + "INCORRECT_CONTAINER_VERSION_ERROR"; + private static final String INVALID_CONTAINER_ID = + "INVALID_CONTAINER_ID"; + private static final String RESOURCE_OUTSIDE_ALLOWED_RANGE = + "RESOURCE_OUTSIDE_ALLOWED_RANGE"; + + protected static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + public static List<RMNode> queryRMNodes(RMContext context, EnumSet<NodeState> acceptedStates) { // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. @@ -97,6 +116,78 @@ public class RMServerUtils { } /** + * Check if we have: + * - Request for same containerId and different target resource + * - If targetResources violates maximum/minimumAllocation + * @param rmContext RM context + * @param request Allocate Request + * @param maximumAllocation Maximum Allocation + * @param increaseResourceReqs Increase Resource Request + * @param decreaseResourceReqs Decrease Resource Request + * @return List of container Errors + */ + public static List<UpdateContainerError> + validateAndSplitUpdateResourceRequests(RMContext rmContext, + AllocateRequest request, Resource maximumAllocation, + List<UpdateContainerRequest> increaseResourceReqs, + List<UpdateContainerRequest> decreaseResourceReqs) { + List<UpdateContainerError> errors = new ArrayList<>(); + Set<ContainerId> outstandingUpdate = new HashSet<>(); + for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer( + updateReq.getContainerId()); + String msg = null; + if (rmContainer == null) { + msg = INVALID_CONTAINER_ID; + } + // Only allow updates if the requested version matches the current + // version + if (msg == null && updateReq.getContainerVersion() != + rmContainer.getContainer().getVersion()) { + msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" + + updateReq.getContainerVersion() + "|" + + rmContainer.getContainer().getVersion(); + } + // No more than 1 container update per request. + if (msg == null && + outstandingUpdate.contains(updateReq.getContainerId())) { + msg = UPDATE_OUTSTANDING_ERROR; + } + if (msg == null) { + Resource original = rmContainer.getContainer().getResource(); + Resource target = updateReq.getCapability(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, false)) { + decreaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } + } else { + // This is an increase request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, true)) { + increaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } + } + } + if (msg != null) { + UpdateContainerError updateError = RECORD_FACTORY + .newRecordInstance(UpdateContainerError.class); + updateError.setReason(msg); + updateError.setUpdateContainerRequest(updateReq); + errors.add(updateError); + } + } + return errors; + } + + /** * Utility method to validate a list resource requests, by insuring that the * requested memory/vcore is non-negative and not greater than max */ @@ -122,8 +213,6 @@ public class RMServerUtils { * the queue lock to make sure that the access to container resource is * atomic. Refer to LeafQueue.decreaseContainer() and * CapacityScheduelr.updateIncreaseRequests() - * - * * <pre> * - Throw exception when any other error happens * </pre> @@ -145,7 +234,7 @@ public class RMServerUtils { if (increase) { if (originalResource.getMemorySize() > targetResource.getMemorySize() || originalResource.getVirtualCores() > targetResource - .getVirtualCores()) { + .getVirtualCores()) { String msg = "Trying to increase a container, but target resource has some" + " resource < original resource, target=" + targetResource @@ -156,7 +245,7 @@ public class RMServerUtils { } else { if (originalResource.getMemorySize() < targetResource.getMemorySize() || originalResource.getVirtualCores() < targetResource - .getVirtualCores()) { + .getVirtualCores()) { String msg = "Trying to decrease a container, but target resource has " + "some resource > original resource, target=" + targetResource @@ -194,112 +283,46 @@ public class RMServerUtils { } } } - - /** - * Check if we have: - * - Request for same containerId and different target resource - * - If targetResources violates maximum/minimumAllocation - */ - public static void increaseDecreaseRequestSanityCheck(RMContext rmContext, - List<ContainerResourceChangeRequest> incRequests, - List<ContainerResourceChangeRequest> decRequests, - Resource maximumAllocation) throws InvalidResourceRequestException { - checkDuplicatedIncreaseDecreaseRequest(incRequests, decRequests); - validateIncreaseDecreaseRequest(rmContext, incRequests, maximumAllocation, - true); - validateIncreaseDecreaseRequest(rmContext, decRequests, maximumAllocation, - false); - } - - private static void checkDuplicatedIncreaseDecreaseRequest( - List<ContainerResourceChangeRequest> incRequests, - List<ContainerResourceChangeRequest> decRequests) - throws InvalidResourceRequestException { - String msg = "There're multiple increase or decrease container requests " - + "for same containerId="; - Set<ContainerId> existedContainerIds = new HashSet<ContainerId>(); - if (incRequests != null) { - for (ContainerResourceChangeRequest r : incRequests) { - if (!existedContainerIds.add(r.getContainerId())) { - throw new InvalidResourceRequestException(msg + r.getContainerId()); - } - } - } - - if (decRequests != null) { - for (ContainerResourceChangeRequest r : decRequests) { - if (!existedContainerIds.add(r.getContainerId())) { - throw new InvalidResourceRequestException(msg + r.getContainerId()); - } - } - } - } // Sanity check and normalize target resource - private static void validateIncreaseDecreaseRequest(RMContext rmContext, - List<ContainerResourceChangeRequest> requests, Resource maximumAllocation, - boolean increase) - throws InvalidResourceRequestException { - if (requests == null) { - return; + private static boolean validateIncreaseDecreaseRequest(RMContext rmContext, + UpdateContainerRequest request, Resource maximumAllocation, + boolean increase) { + if (request.getCapability().getMemorySize() < 0 + || request.getCapability().getMemorySize() > maximumAllocation + .getMemorySize()) { + return false; } - for (ContainerResourceChangeRequest request : requests) { - if (request.getCapability().getMemorySize() < 0 - || request.getCapability().getMemorySize() > maximumAllocation - .getMemorySize()) { - throw new InvalidResourceRequestException("Invalid " - + (increase ? "increase" : "decrease") + " request" - + ", requested memory < 0" - + ", or requested memory > max configured" + ", requestedMemory=" - + request.getCapability().getMemorySize() + ", maxMemory=" - + maximumAllocation.getMemorySize()); - } - if (request.getCapability().getVirtualCores() < 0 - || request.getCapability().getVirtualCores() > maximumAllocation - .getVirtualCores()) { - throw new InvalidResourceRequestException("Invalid " - + (increase ? "increase" : "decrease") + " request" - + ", requested virtual cores < 0" - + ", or requested virtual cores > max configured" - + ", requestedVirtualCores=" - + request.getCapability().getVirtualCores() + ", maxVirtualCores=" - + maximumAllocation.getVirtualCores()); - } - ContainerId containerId = request.getContainerId(); - ResourceScheduler scheduler = rmContext.getScheduler(); - RMContainer rmContainer = scheduler.getRMContainer(containerId); - if (null == rmContainer) { - String msg = - "Failed to get rmContainer for " - + (increase ? "increase" : "decrease") - + " request, with container-id=" + containerId; - throw new InvalidResourceRequestException(msg); - } - ResourceCalculator rc = scheduler.getResourceCalculator(); - Resource targetResource = Resources.normalize(rc, request.getCapability(), - scheduler.getMinimumResourceCapability(), - scheduler.getMaximumResourceCapability(), - scheduler.getMinimumResourceCapability()); - // Update normalized target resource - request.setCapability(targetResource); + if (request.getCapability().getVirtualCores() < 0 + || request.getCapability().getVirtualCores() > maximumAllocation + .getVirtualCores()) { + return false; } + ResourceScheduler scheduler = rmContext.getScheduler(); + ResourceCalculator rc = scheduler.getResourceCalculator(); + Resource targetResource = Resources.normalize(rc, request.getCapability(), + scheduler.getMinimumResourceCapability(), + scheduler.getMaximumResourceCapability(), + scheduler.getMinimumResourceCapability()); + // Update normalized target resource + request.setCapability(targetResource); + return true; } /** * It will validate to make sure all the containers belong to correct * application attempt id. If not then it will throw * {@link InvalidContainerReleaseException} - * - * @param containerReleaseList - * containers to be released as requested by application master. - * @param appAttemptId - * Application attempt Id + * + * @param containerReleaseList containers to be released as requested by + * application master. + * @param appAttemptId Application attempt Id * @throws InvalidContainerReleaseException */ public static void validateContainerReleaseRequest(List<ContainerId> containerReleaseList, - ApplicationAttemptId appAttemptId) - throws InvalidContainerReleaseException { + ApplicationAttemptId appAttemptId) + throws InvalidContainerReleaseException { for (ContainerId cId : containerReleaseList) { if (!appAttemptId.equals(cId.getApplicationAttemptId())) { throw new InvalidContainerReleaseException( @@ -321,10 +344,11 @@ public class RMServerUtils { /** * Utility method to verify if the current user has access based on the * passed {@link AccessControlList} + * * @param authorizer the {@link AccessControlList} to check against - * @param method the method name to be logged - * @param module like AdminService or NodeLabelManager - * @param LOG the logger to use + * @param method the method name to be logged + * @param module like AdminService or NodeLabelManager + * @param LOG the logger to use * @return {@link UserGroupInformation} of the current user * @throws IOException */ @@ -347,11 +371,11 @@ public class RMServerUtils { " to call '" + method + "'"); RMAuditLogger.logFailure(user.getShortUserName(), method, "", module, - RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); + RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); throw new AccessControlException("User " + user.getShortUserName() + - " doesn't have permission" + - " to call '" + method + "'"); + " doesn't have permission" + + " to call '" + method + "'"); } if (LOG.isTraceEnabled()) { LOG.trace(method + " invoked by user " + user.getShortUserName()); @@ -362,56 +386,56 @@ public class RMServerUtils { public static YarnApplicationState createApplicationState( RMAppState rmAppState) { switch (rmAppState) { - case NEW: - return YarnApplicationState.NEW; - case NEW_SAVING: - return YarnApplicationState.NEW_SAVING; - case SUBMITTED: - return YarnApplicationState.SUBMITTED; - case ACCEPTED: - return YarnApplicationState.ACCEPTED; - case RUNNING: - return YarnApplicationState.RUNNING; - case FINISHING: - case FINISHED: - return YarnApplicationState.FINISHED; - case KILLED: - return YarnApplicationState.KILLED; - case FAILED: - return YarnApplicationState.FAILED; - default: - throw new YarnRuntimeException("Unknown state passed!"); - } + case NEW: + return YarnApplicationState.NEW; + case NEW_SAVING: + return YarnApplicationState.NEW_SAVING; + case SUBMITTED: + return YarnApplicationState.SUBMITTED; + case ACCEPTED: + return YarnApplicationState.ACCEPTED; + case RUNNING: + return YarnApplicationState.RUNNING; + case FINISHING: + case FINISHED: + return YarnApplicationState.FINISHED; + case KILLED: + return YarnApplicationState.KILLED; + case FAILED: + return YarnApplicationState.FAILED; + default: + throw new YarnRuntimeException("Unknown state passed!"); + } } public static YarnApplicationAttemptState createApplicationAttemptState( RMAppAttemptState rmAppAttemptState) { switch (rmAppAttemptState) { - case NEW: - return YarnApplicationAttemptState.NEW; - case SUBMITTED: - return YarnApplicationAttemptState.SUBMITTED; - case SCHEDULED: - return YarnApplicationAttemptState.SCHEDULED; - case ALLOCATED: - return YarnApplicationAttemptState.ALLOCATED; - case LAUNCHED: - return YarnApplicationAttemptState.LAUNCHED; - case ALLOCATED_SAVING: - case LAUNCHED_UNMANAGED_SAVING: - return YarnApplicationAttemptState.ALLOCATED_SAVING; - case RUNNING: - return YarnApplicationAttemptState.RUNNING; - case FINISHING: - return YarnApplicationAttemptState.FINISHING; - case FINISHED: - return YarnApplicationAttemptState.FINISHED; - case KILLED: - return YarnApplicationAttemptState.KILLED; - case FAILED: - return YarnApplicationAttemptState.FAILED; - default: - throw new YarnRuntimeException("Unknown state passed!"); + case NEW: + return YarnApplicationAttemptState.NEW; + case SUBMITTED: + return YarnApplicationAttemptState.SUBMITTED; + case SCHEDULED: + return YarnApplicationAttemptState.SCHEDULED; + case ALLOCATED: + return YarnApplicationAttemptState.ALLOCATED; + case LAUNCHED: + return YarnApplicationAttemptState.LAUNCHED; + case ALLOCATED_SAVING: + case LAUNCHED_UNMANAGED_SAVING: + return YarnApplicationAttemptState.ALLOCATED_SAVING; + case RUNNING: + return YarnApplicationAttemptState.RUNNING; + case FINISHING: + return YarnApplicationAttemptState.FINISHING; + case FINISHED: + return YarnApplicationAttemptState.FINISHED; + case KILLED: + return YarnApplicationAttemptState.KILLED; + case FAILED: + return YarnApplicationAttemptState.FAILED; + default: + throw new YarnRuntimeException("Unknown state passed!"); } } @@ -420,13 +444,12 @@ public class RMServerUtils { * a return value when a valid report cannot be found. */ public static final ApplicationResourceUsageReport - DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = + DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = BuilderUtils.newApplicationResourceUsageReport(-1, -1, Resources.createResource(-1, -1), Resources.createResource(-1, -1), Resources.createResource(-1, -1), 0, 0); - /** * Find all configs whose name starts with * YarnConfiguration.RM_PROXY_USER_PREFIX, and add a record for each one by @@ -438,7 +461,8 @@ public class RMServerUtils { String propName = entry.getKey(); if (propName.startsWith(YarnConfiguration.RM_PROXY_USER_PREFIX)) { rmProxyUsers.put(ProxyUsers.CONF_HADOOP_PROXYUSER + "." + - propName.substring(YarnConfiguration.RM_PROXY_USER_PREFIX.length()), + propName.substring(YarnConfiguration.RM_PROXY_USER_PREFIX + .length()), entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 755defd..45415de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -437,6 +437,7 @@ public abstract class AbstractYarnScheduler Container.newInstance(status.getContainerId(), node.getNodeID(), node.getHttpAddress(), status.getAllocatedResource(), status.getPriority(), null); + container.setVersion(status.getVersion()); ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = @@ -575,7 +576,7 @@ public abstract class AbstractYarnScheduler } protected void decreaseContainers( - List<ContainerResourceChangeRequest> decreaseRequests, + List<UpdateContainerRequest> decreaseRequests, SchedulerApplicationAttempt attempt) { if (null == decreaseRequests || decreaseRequests.isEmpty()) { return; @@ -748,7 +749,7 @@ public abstract class AbstractYarnScheduler /** * Sanity check increase/decrease request, and return * SchedulerContainerResourceChangeRequest according to given - * ContainerResourceChangeRequest. + * UpdateContainerRequest. * * <pre> * - Returns non-null value means validation succeeded @@ -756,7 +757,7 @@ public abstract class AbstractYarnScheduler * </pre> */ private SchedContainerChangeRequest createSchedContainerChangeRequest( - ContainerResourceChangeRequest request, boolean increase) + UpdateContainerRequest request, boolean increase) throws YarnException { ContainerId containerId = request.getContainerId(); RMContainer rmContainer = getRMContainer(containerId); @@ -775,11 +776,11 @@ public abstract class AbstractYarnScheduler protected List<SchedContainerChangeRequest> createSchedContainerChangeRequests( - List<ContainerResourceChangeRequest> changeRequests, + List<UpdateContainerRequest> changeRequests, boolean increase) { List<SchedContainerChangeRequest> schedulerChangeRequests = new ArrayList<SchedContainerChangeRequest>(); - for (ContainerResourceChangeRequest r : changeRequests) { + for (UpdateContainerRequest r : changeRequests) { SchedContainerChangeRequest sr = null; try { sr = createSchedContainerChangeRequest(r, increase); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java index e4ab3a2..94b006c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; /** - * This is ContainerResourceChangeRequest in scheduler side, it contains some + * This is UpdateContainerRequest in scheduler side, it contains some * pointers to runtime objects like RMContainer, SchedulerNode, etc. This will * be easier for scheduler making decision. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index c4b32a8..97d29cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -530,6 +530,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { boolean newContainer, boolean increasedContainer) { Container container = rmContainer.getContainer(); ContainerType containerType = ContainerType.TASK; + if (!newContainer) { + container.setVersion(container.getVersion() + 1); + } // The working knowledge is that masterContainer for AM is null as it // itself is the master container. if (isWaitingForAMContainer()) { @@ -538,10 +541,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { try { // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() - .createContainerToken(container.getId(), container.getNodeId(), - getUser(), container.getResource(), container.getPriority(), - rmContainer.getCreationTime(), this.logAggregationContext, - rmContainer.getNodeLabelExpression(), containerType)); + .createContainerToken(container.getId(), container.getVersion(), + container.getNodeId(), getUser(), container.getResource(), + container.getPriority(), rmContainer.getCreationTime(), + this.logAggregationContext, rmContainer.getNodeLabelExpression(), + containerType)); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 0aff669..c4f575f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -143,8 +143,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests); + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests); /** * Get node resource usage report. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 35e1147..33fe9ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -925,7 +925,7 @@ public class CapacityScheduler extends // SchedContainerChangeRequest // 2. Deadlock with the scheduling thread. private LeafQueue updateIncreaseRequests( - List<ContainerResourceChangeRequest> increaseRequests, + List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) { if (null == increaseRequests || increaseRequests.isEmpty()) { return null; @@ -953,8 +953,8 @@ public class CapacityScheduler extends public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 73d56d7..140b4f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -942,8 +942,8 @@ public class FairScheduler extends public Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 2863a97..e9ffd09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -330,8 +330,8 @@ public class FifoScheduler extends public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 6f00615..8c42255 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -168,39 +169,43 @@ public class RMContainerTokenSecretManager extends /** * Helper function for creating ContainerTokens * - * @param containerId - * @param nodeId - * @param appSubmitter - * @param capability - * @param priority - * @param createTime + * @param containerId Container Id + * @param containerVersion Container Version + * @param nodeId Node Id + * @param appSubmitter App Submitter + * @param capability Capability + * @param priority Priority + * @param createTime Create Time * @return the container-token */ - public Token createContainerToken(ContainerId containerId, NodeId nodeId, - String appSubmitter, Resource capability, Priority priority, - long createTime) { - return createContainerToken(containerId, nodeId, appSubmitter, capability, - priority, createTime, null, null, ContainerType.TASK); + public Token createContainerToken(ContainerId containerId, + int containerVersion, NodeId nodeId, String appSubmitter, + Resource capability, Priority priority, long createTime) { + return createContainerToken(containerId, containerVersion, nodeId, + appSubmitter, capability, priority, createTime, + null, null, ContainerType.TASK); } /** * Helper function for creating ContainerTokens * - * @param containerId - * @param nodeId - * @param appSubmitter - * @param capability - * @param priority - * @param createTime - * @param logAggregationContext - * @param nodeLabelExpression - * @param containerType + * @param containerId Container Id + * @param containerVersion Container version + * @param nodeId Node Id + * @param appSubmitter App Submitter + * @param capability Capability + * @param priority Priority + * @param createTime Create Time + * @param logAggregationContext Log Aggregation Context + * @param nodeLabelExpression Node Label Expression + * @param containerType Container Type * @return the container-token */ - public Token createContainerToken(ContainerId containerId, NodeId nodeId, - String appSubmitter, Resource capability, Priority priority, - long createTime, LogAggregationContext logAggregationContext, - String nodeLabelExpression, ContainerType containerType) { + public Token createContainerToken(ContainerId containerId, + int containerVersion, NodeId nodeId, String appSubmitter, + Resource capability, Priority priority, long createTime, + LogAggregationContext logAggregationContext, String nodeLabelExpression, + ContainerType containerType) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -210,11 +215,12 @@ public class RMContainerTokenSecretManager extends this.readLock.lock(); try { tokenIdentifier = - new ContainerTokenIdentifier(containerId, nodeId.toString(), - appSubmitter, capability, expiryTimeStamp, this.currentMasterKey - .getMasterKey().getKeyId(), - ResourceManager.getClusterTimeStamp(), priority, createTime, - logAggregationContext, nodeLabelExpression, containerType); + new ContainerTokenIdentifier(containerId, containerVersion, + nodeId.toString(), appSubmitter, capability, expiryTimeStamp, + this.currentMasterKey.getMasterKey().getKeyId(), + ResourceManager.getClusterTimeStamp(), priority, createTime, + logAggregationContext, nodeLabelExpression, containerType, + ExecutionType.GUARANTEED); password = this.createPassword(tokenIdentifier); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 1b11472..593de08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -34,12 +34,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -245,10 +245,9 @@ public class MockAM { } public AllocateResponse sendContainerResizingRequest( - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) throws Exception { + List<UpdateContainerRequest> updateRequests) throws Exception { final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null, - null, increaseRequests, decreaseRequests); + null, updateRequests); return allocate(req); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index a7d8ba2..7c02264 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -386,9 +386,10 @@ public class TestApplicationCleanup { // nm1/nm2 register to rm2, and do a heartbeat nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance( - ContainerId.newContainerId(am0.getApplicationAttemptId(), 1), - ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0, - Priority.newInstance(0), 1234)), Arrays.asList(app0.getApplicationId())); + ContainerId.newContainerId(am0.getApplicationAttemptId(), 1), 0, + ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0, + Priority.newInstance(0), 1234)), + Arrays.asList(app0.getApplicationId())); nm2.setResourceTrackerService(rm2.getResourceTrackerService()); nm2.registerNode(Arrays.asList(app0.getApplicationId())); @@ -598,7 +599,7 @@ public class TestApplicationCleanup { int memory) { ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, containerState, + NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(memory, 1), "recover container", 0, Priority.newInstance(0), 0); return containerReport; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6d9cff2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 64673d2..93befcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -35,16 +35,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -383,57 +383,47 @@ public class TestApplicationMasterService { // Ask for a normal increase should be successfull am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources.createResource(2048))), null); + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2048), null))); // Target resource is negative, should fail - boolean exceptionCaught = false; - try { - am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources.createResource(-1))), null); - } catch (InvalidResourceRequestException e) { - // This is expected - exceptionCaught = true; - } - Assert.assertTrue(exceptionCaught); - + AllocateResponse response = + am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(-1), null))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("RESOURCE_OUTSIDE_ALLOWED_RANGE", + response.getUpdateErrors().get(0).getReason()); + // Target resource is more than maxAllocation, should fail - try { - am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1)))), null); - } catch (InvalidResourceRequestException e) { - // This is expected - exceptionCaught = true; - } + response = am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.add( + registerResponse.getMaximumResourceCapability(), + Resources.createResource(1)), null))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("RESOURCE_OUTSIDE_ALLOWED_RANGE", + response.getUpdateErrors().get(0).getReason()); - Assert.assertTrue(exceptionCaught); - // Contains multiple increase/decrease requests for same contaienrId - try { - am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1)))), Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1))))); - } catch (InvalidResourceRequestException e) { - // This is expected - exceptionCaught = true; - } - - Assert.assertTrue(exceptionCaught); + response = am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2048, 4), null), + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.DECREASE_RESOURCE, + Resources.createResource(1024, 1), null))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + response.getUpdateErrors().get(0).getReason()); } finally { if (rm != null) { rm.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
