http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index 4d5a7dc..fa13df4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -24,6 +24,10 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -43,17 +47,25 @@ public abstract class AbstractContainerAllocator { FiCaSchedulerApp application; final ResourceCalculator rc; final RMContext rmContext; - + ActivitiesManager activitiesManager; + public AbstractContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public AbstractContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext, + ActivitiesManager activitiesManager) { this.application = application; this.rc = rc; this.rmContext = rmContext; + this.activitiesManager = activitiesManager; } protected CSAssignment getCSAssignmentFromAllocateResult( Resource clusterResource, ContainerAllocation result, - RMContainer rmContainer) { + RMContainer rmContainer, FiCaSchedulerNode node) { // Handle skipped CSAssignment.SkippedType skipped = (result.getAllocationState() == AllocationState.APP_SKIPPED) ? 
@@ -61,7 +73,7 @@ public abstract class AbstractContainerAllocator { CSAssignment.SkippedType.NONE; CSAssignment assignment = new CSAssignment(skipped); assignment.setApplication(application); - + // Handle excess reservation assignment.setExcessReservation(result.getContainerToBeUnreserved()); @@ -85,6 +97,23 @@ public abstract class AbstractContainerAllocator { assignment.getAssignmentInformation().incrReservations(); Resources.addTo(assignment.getAssignmentInformation().getReserved(), allocatedResource); + + if (rmContainer != null) { + ActivitiesLogger.APP.recordAppActivityWithAllocation( + activitiesManager, node, application, updatedContainer, + ActivityState.RE_RESERVED); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + } else { + ActivitiesLogger.APP.recordAppActivityWithAllocation( + activitiesManager, node, application, updatedContainer, + ActivityState.RESERVED); + ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + updatedContainer.getId(), ActivityState.RESERVED, + ActivityDiagnosticConstant.EMPTY); + } } else if (result.getAllocationState() == AllocationState.ALLOCATED){ // This is a new container // Inform the ordering policy @@ -105,10 +134,18 @@ public abstract class AbstractContainerAllocator { assignment.getAssignmentInformation().incrAllocations(); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), allocatedResource); - + if (rmContainer != null) { assignment.setFulfilledReservation(true); } + + ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager, + node, application, updatedContainer, ActivityState.ALLOCATED); + ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + updatedContainer.getId(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + } 
assignment.setContainersToKill(result.getToKillContainers()); @@ -118,13 +155,13 @@ public abstract class AbstractContainerAllocator { CSAssignment.SkippedType.QUEUE_LIMIT); } } - + return assignment; } - + /** * allocate needs to handle following stuffs: - * + * * <ul> * <li>Select request: Select a request to allocate. E.g. select a resource * request based on requirement/priority/locality.</li>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 3be8e0e..4eaa24b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -36,12 +37,17 @@ public class ContainerAllocator extends AbstractContainerAllocator { public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, 
RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, + RMContext rmContext, ActivitiesManager activitiesManager) { super(application, rc, rmContext); increaseContainerAllocator = new IncreaseContainerAllocator(application, rc, rmContext); - regularContainerAllocator = - new RegularContainerAllocator(application, rc, rmContext); + regularContainerAllocator = new RegularContainerAllocator(application, rc, + rmContext, activitiesManager); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 29b37d8..21114f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -24,11 +24,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; 
+import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -37,6 +39,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -57,10 +65,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + ResourceCalculator rc, RMContext rmContext, + ActivitiesManager activitiesManager) { + super(application, rc, rmContext, 
activitiesManager); } private boolean checkHeadroom(Resource clusterResource, @@ -85,15 +94,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private ContainerAllocation preCheckForNewContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { + Priority priority = schedulerKey.getPriority(); + if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE); return ContainerAllocation.APP_SKIPPED; } ResourceRequest anyRequest = application.getResourceRequest(schedulerKey, ResourceRequest.ANY); if (null == anyRequest) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -102,6 +119,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Do we need containers at this 'priority'? if (application.getTotalRequiredResources(schedulerKey) <= 0) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -116,6 +136,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } application.updateAppSkipNodeDiagnostics( "Skipping assigning to Node in Ignore Exclusivity mode. 
"); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE); return ContainerAllocation.APP_SKIPPED; } } @@ -126,6 +149,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( anyRequest.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant. + PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -134,6 +161,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS); return ContainerAllocation.PRIORITY_SKIPPED; } } @@ -143,6 +173,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM); return ContainerAllocation.QUEUE_SKIPPED; } @@ -174,7 +207,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { + missedNonPartitionedRequestSchedulingOpportunity + " required=" + rmContext.getScheduler().getNumClusterNodes()); } - + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST); return ContainerAllocation.APP_SKIPPED; } } @@ -301,6 +336,9 
@@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Skip node-local request, go to rack-local request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -316,6 +354,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Skip rack-local request, go to off-switch request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -332,6 +373,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST); return ContainerAllocation.APP_SKIPPED; } @@ -339,6 +383,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + Priority priority = schedulerKey.getPriority(); ContainerAllocation allocation; @@ -364,6 +409,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { application.getResourceRequest(schedulerKey, node.getRackName()); if (rackLocalResourceRequest != null) { if (!rackLocalResourceRequest.getRelaxLocality()) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); return 
ContainerAllocation.PRIORITY_SKIPPED; } @@ -387,6 +435,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { application.getResourceRequest(schedulerKey, ResourceRequest.ANY); if (offSwitchResourceRequest != null) { if (!offSwitchResourceRequest.getRelaxLocality()) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); return ContainerAllocation.PRIORITY_SKIPPED; } if (requestType != NodeType.NODE_LOCAL @@ -408,7 +459,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return allocation; } - + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -416,6 +469,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest request, NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + Priority priority = schedulerKey.getPriority(); lastResourceRequest = request; if (LOG.isDebugEnabled()) { @@ -432,6 +486,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. 
+ ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, priority, + ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL, + ActivityState.REJECTED); return new ContainerAllocation(rmContainer, null, AllocationState.LOCALITY_SKIPPED); } @@ -446,6 +504,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); // Skip this locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -524,6 +585,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // continue. if (null == unreservedContainer) { // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -548,6 +612,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { LOG.debug("we needed to unreserve to be able to allocate"); } // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -560,6 +627,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return result; } // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -636,6 +706,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { 
ContainerAllocation ret = new ContainerAllocation(allocationResult.containerToBeUnreserved, null, AllocationState.APP_SKIPPED); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED); return ret; } @@ -662,6 +735,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { application .updateAppSkipNodeDiagnostics("Scheduling of container failed. "); LOG.warn("Couldn't get container for allocation!"); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER, + ActivityState.REJECTED); return ContainerAllocation.APP_SKIPPED; } @@ -741,6 +818,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-label=" + node.getPartition()); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE); return CSAssignment.SKIP_ASSIGNMENT; } @@ -755,18 +835,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { continue; } return getCSAssignmentFromAllocateResult(clusterResource, result, - null); + null, node); } // We will reach here if we skipped all priorities of the app, so we will // skip the app. 
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = allocate(clusterResource, node, schedulingMode, resourceLimits, reservedContainer.getReservedSchedulerKey(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, - reservedContainer); + reservedContainer, node); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 67d93a4..33dee80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; @@ -108,8 +109,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { + this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + appPriority, isAttemptRecovering, null); + } + + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, + ActivitiesManager activitiesManager) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); - + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); Resource amResource; @@ -139,8 +148,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if (scheduler.getResourceCalculator() != null) { rc = scheduler.getResourceCalculator(); } - - containerAllocator = new ContainerAllocator(this, rc, rmContext); + + containerAllocator = new ContainerAllocator(this, rc, rmContext, + activitiesManager); if (scheduler instanceof CapacityScheduler) { capacitySchedulerContext = (CapacitySchedulerContext) scheduler; @@ -189,7 +199,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return null; } - // Required sanity check - AM can call 'allocate' to update resource + // Required sanity check - AM can call 'allocate' to update resource // 
request without locking the scheduler, hence we need to check if (getTotalRequiredResources(schedulerKey) <= 0) { return null; @@ -493,7 +503,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 75bffc7..4305fd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -130,9 +130,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; @@ -176,6 +180,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webapp.WebServices; @@ -577,6 +582,124 @@ public class RMWebServices extends WebServices { } @GET + @Path("/scheduler/activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, + @QueryParam("nodeId") String nodeId) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + String errMessage = ""; + + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivitiesManager activitiesManager = + abstractYarnScheduler.getActivitiesManager(); + if (null == 
activitiesManager) { + errMessage = "Not Capacity Scheduler"; + return new ActivitiesInfo(errMessage, nodeId); + } + + List<FiCaSchedulerNode> nodeList = + abstractYarnScheduler.getNodeTracker().getAllNodes(); + + boolean illegalInput = false; + + if (nodeList.size() == 0) { + illegalInput = true; + errMessage = "No node manager running in the cluster"; + } else { + if (nodeId != null) { + String hostName = nodeId; + String portName = ""; + if (nodeId.contains(":")) { + int index = nodeId.indexOf(":"); + hostName = nodeId.substring(0, index); + portName = nodeId.substring(index + 1); + } + + boolean correctNodeId = false; + for (FiCaSchedulerNode node : nodeList) { + if ((portName.equals("") && node.getRMNode().getHostName().equals( + hostName)) || (!portName.equals("") && node.getRMNode() + .getHostName().equals(hostName) && String.valueOf( + node.getRMNode().getCommandPort()).equals(portName))) { + correctNodeId = true; + nodeId = node.getNodeID().toString(); + break; + } + } + if (!correctNodeId) { + illegalInput = true; + errMessage = "Cannot find node manager with given node id"; + } + } + } + + if (!illegalInput) { + activitiesManager.recordNextNodeUpdateActivities(nodeId); + return activitiesManager.getActivitiesInfo(nodeId); + } + + // Return a activities info with error message + return new ActivitiesInfo(errMessage, nodeId); + } + + return null; + } + + @GET + @Path("/scheduler/app-activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, + @QueryParam("appId") String appId, @QueryParam("maxTime") String time) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivitiesManager activitiesManager = + abstractYarnScheduler.getActivitiesManager(); + if (null == activitiesManager) { + String errMessage = 
"Not Capacity Scheduler"; + return new AppActivitiesInfo(errMessage, appId); + } + + if(appId == null) { + String errMessage = "Must provide an application Id"; + return new AppActivitiesInfo(errMessage, null); + } + + double maxTime = 3.0; + + if (time != null) { + if (time.contains(".")) { + maxTime = Double.parseDouble(time); + } else { + maxTime = Double.parseDouble(time + ".0"); + } + } + + ApplicationId applicationId; + try { + applicationId = ApplicationId.fromString(appId); + activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime); + AppActivitiesInfo appActivitiesInfo = + activitiesManager.getAppActivitiesInfo(applicationId); + + return appActivitiesInfo; + } catch (Exception e) { + String errMessage = "Cannot find application with given appId"; + return new AppActivitiesInfo(errMessage, appId); + } + + } + return null; + } + + @GET @Path("/appstatistics") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public ApplicationStatisticsInfo getAppStatistics( http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java new file mode 100644 index 0000000..0de340a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Date; +import java.util.List; +import java.util.ArrayList; + +/* + * DAO object to display node allocation activity. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivitiesInfo { + protected String nodeId; + protected String timeStamp; + protected String diagnostic = null; + protected List<NodeAllocationInfo> allocations; + + private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class); + + public ActivitiesInfo() { + } + + public ActivitiesInfo(String errorMessage, String nodeId) { + this.diagnostic = errorMessage; + this.nodeId = nodeId; + } + + public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId) { + this.nodeId = nodeId; + this.allocations = new ArrayList<>(); + + if (nodeAllocations == null) { + diagnostic = (nodeId != null ? 
+ "waiting for display" : + "waiting for next allocation"); + } else { + if (nodeAllocations.size() == 0) { + diagnostic = "do not have available resources"; + } else { + this.nodeId = nodeAllocations.get(0).getNodeId(); + + Date date = new Date(); + date.setTime(nodeAllocations.get(0).getTimeStamp()); + this.timeStamp = date.toString(); + + for (int i = 0; i < nodeAllocations.size(); i++) { + NodeAllocation nodeAllocation = nodeAllocations.get(i); + NodeAllocationInfo allocationInfo = new NodeAllocationInfo( + nodeAllocation); + this.allocations.add(allocationInfo); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java new file mode 100644 index 0000000..9553a720 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display node information in allocation tree. + * It corresponds to "ActivityNode" class. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivityNodeInfo { + protected String name; // The name for activity node + protected String appPriority; + protected String requestPriority; + protected String allocationState; + protected String diagnostic; + + protected List<ActivityNodeInfo> children; + + ActivityNodeInfo() { + } + + ActivityNodeInfo(ActivityNode node) { + this.name = node.getName(); + getPriority(node); + this.allocationState = node.getState().name(); + this.diagnostic = node.getDiagnostic(); + this.children = new ArrayList<>(); + + for (ActivityNode child : node.getChildren()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(child); + this.children.add(containerInfo); + } + } + + private void getPriority(ActivityNode node) { + if (node.getType()) { + this.appPriority = node.getAppPriority(); + } else { + this.requestPriority = node.getRequestPriority(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java new file mode 100644 index 0000000..38c45a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation; +import org.apache.hadoop.yarn.util.SystemClock; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Date; +import java.util.List;
+
+/*
+ * DAO object holding the recorded activities of one application, or a
+ * diagnostic message when there is nothing to show.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppActivitiesInfo {
+  protected String applicationId;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<AppAllocationInfo> allocations;
+
+  private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class);
+
+  public AppActivitiesInfo() {
+  }
+
+  /*
+   * Builds a result that carries a diagnostic message, the application id
+   * as given, and the current time.
+   */
+  public AppActivitiesInfo(String errorMessage, String applicationId) {
+    this.diagnostic = errorMessage;
+    this.applicationId = applicationId;
+    this.timeStamp = currentTimeString();
+  }
+
+  /*
+   * Builds a result from recorded application allocations. A null list
+   * yields a "waiting" diagnostic stamped with the current time. Otherwise
+   * the allocations are converted walking the list from its last element
+   * to its first, and no time stamp is set on this object.
+   */
+  public AppActivitiesInfo(List<AppAllocation> appAllocations,
+      ApplicationId applicationId) {
+    this.applicationId = applicationId.toString();
+    this.allocations = new ArrayList<>();
+
+    if (appAllocations == null) {
+      this.diagnostic = "waiting for display";
+      this.timeStamp = currentTimeString();
+      return;
+    }
+
+    int remaining = appAllocations.size();
+    while (remaining > 0) {
+      remaining--;
+      AppAllocation appAllocation = appAllocations.get(remaining);
+      this.allocations.add(new AppAllocationInfo(appAllocation));
+    }
+  }
+
+  // Formats the time reported by SystemClock with Date.toString().
+  private static String currentTimeString() {
+    return new Date(SystemClock.getInstance().getTime()).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java new file mode 100644 index 0000000..21d3788 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Date; +import java.util.List;
+
+/*
+ * DAO object with the details of a single application allocation,
+ * including the allocation attempts made for it.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class AppAllocationInfo {
+  protected String nodeId;
+  protected String queueName;
+  protected String appPriority;
+  protected String allocatedContainerId;
+  protected String allocationState;
+  protected String diagnostic;
+  protected String timeStamp;
+  protected List<ActivityNodeInfo> allocationAttempt;
+
+  private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class);
+
+  AppAllocationInfo() {
+  }
+
+  /*
+   * Copies the displayable attributes of the given allocation and converts
+   * each of its allocation attempts into an ActivityNodeInfo.
+   */
+  AppAllocationInfo(AppAllocation allocation) {
+    List<ActivityNodeInfo> attemptInfos = new ArrayList<>();
+    this.allocationAttempt = attemptInfos;
+
+    this.nodeId = allocation.getNodeId();
+    this.queueName = allocation.getQueueName();
+    this.appPriority = allocation.getPriority();
+    this.allocatedContainerId = allocation.getContainerId();
+    this.allocationState = allocation.getAppState().name();
+    this.diagnostic = allocation.getDiagnostic();
+
+    // The allocation's time value is rendered with Date.toString().
+    this.timeStamp = new Date(allocation.getTime()).toString();
+
+    for (ActivityNode attempt : allocation.getAllocationAttempts()) {
+      attemptInfos.add(new ActivityNodeInfo(attempt));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java new file mode 100644 index 0000000..1350a76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/* + * DAO object to display each node allocation in node heartbeat. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class NodeAllocationInfo { + protected String allocatedContainerId; + protected String finalAllocationState; + protected ActivityNodeInfo root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class); + + NodeAllocationInfo() { + } + + NodeAllocationInfo(NodeAllocation allocation) { + this.allocatedContainerId = allocation.getContainerId(); + this.finalAllocationState = allocation.getFinalAllocationState().name(); + + root = new ActivityNodeInfo(allocation.getRoot()); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index 649d719..bbdfdd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java 
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -62,9 +62,9 @@ import com.sun.jersey.test.framework.WebAppDescriptor; public class TestRMWebServicesCapacitySched extends JerseyTestBase { - private static MockRM rm; - private static CapacitySchedulerConfiguration csConf; - private static YarnConfiguration conf; + protected static MockRM rm; + protected static CapacitySchedulerConfiguration csConf; + protected static YarnConfiguration conf; private class QueueInfo { float capacity; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java new file mode 100644 index 0000000..d7b0581 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -0,0 +1,777 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class TestRMWebServicesSchedulerActivities + extends TestRMWebServicesCapacitySched { 
+ + private static final Log LOG = LogFactory.getLog( + TestRMWebServicesSchedulerActivities.class); + + @Test + public void testAssignMultipleContainersPerNodeHeartbeat() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1:1234"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 11); + + JSONArray allocations = json.getJSONArray("allocations"); + for (int i = 0; i < allocations.length(); i++) { + if (i != allocations.length() - 1) { + verifyStateOfAllocations(allocations.getJSONObject(i), + "finalAllocationState", "ALLOCATED"); + verifyQueueOrder(allocations.getJSONObject(i), 
"root-a-b-b2-b3-b1"); + } else { + verifyStateOfAllocations(allocations.getJSONObject(i), + "finalAllocationState", "SKIPPED"); + verifyQueueOrder(allocations.getJSONObject(i), "root-a-b"); + } + } + } + finally { + rm.stop(); + } + } + + @Test + public void testAssignWithoutAvailableResource() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testNoNM() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + try { + //Get JSON + WebResource r = 
resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1:1234"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testWrongNodeId() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.0"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + 
nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testReserveNewContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, + rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, + rm.getResourceTrackerService()); + + nm1.registerNode(); + nm2.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096), + 10)), null); + + // Reserve new container + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + 
verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1"); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED"); + + // Do a node heartbeat again without releasing container from app2 + r = resource(); + params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "b1"); + + allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED"); + + // Finish application 2 + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ContainerId containerId = ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1); + cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED); + + // Do a node heartbeat again + r = resource(); + params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + 
json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "b1"); + + allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", + "ALLOCATED_FROM_RESERVED"); + } + finally { + rm.stop(); + } + } + + @Test + public void testActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", + "ALLOCATED"); + + 
verifyNumberOfNodes(allocations, 6); + + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1"); + } + finally { + rm.stop(); + } + } + + private void verifyNumberOfNodes(JSONObject allocation, int realValue) + throws Exception { + if (allocation.isNull("root")) { + assertEquals("State of allocation is wrong", 0, realValue); + } else { + assertEquals("State of allocation is wrong", + 1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue); + } + } + + private int getNumberOfNodes(JSONObject allocation) throws Exception { + if (!allocation.isNull("children")) { + Object object = allocation.get("children"); + if (object.getClass() == JSONObject.class) { + return 1 + getNumberOfNodes((JSONObject) object); + } else { + int count = 0; + for (int i = 0; i < ((JSONArray) object).length(); i++) { + count += (1 + getNumberOfNodes( + ((JSONArray) object).getJSONObject(i))); + } + return count; + } + } else { + return 0; + } + } + + private void verifyStateOfAllocations(JSONObject allocation, + String nameToCheck, String realState) throws Exception { + assertEquals("State of allocation is wrong", allocation.get(nameToCheck), + realState); + } + + private void verifyNumberOfAllocations(JSONObject json, int realValue) + throws Exception { + if (json.isNull("allocations")) { + assertEquals("Number of allocations is wrong", 0, realValue); + } else { + Object object = json.get("allocations"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations is wrong", + ((JSONArray) object).length(), realValue); + } + } + } + + private void verifyQueueOrder(JSONObject json, String realOrder) + throws Exception { + String order = ""; + if (!json.isNull("root")) { + JSONObject root = json.getJSONObject("root"); + order = root.getString("name") + "-" + getQueueOrder(root); + } + assertEquals("Order of queue is wrong", + 
order.substring(0, order.length() - 1), realOrder); + } + + private String getQueueOrder(JSONObject node) throws Exception { + if (!node.isNull("children")) { + Object children = node.get("children"); + if (children.getClass() == JSONObject.class) { + if (!((JSONObject) children).isNull("appPriority")) { + return ""; + } + return ((JSONObject) children).getString("name") + "-" + getQueueOrder( + (JSONObject) children); + } else if (children.getClass() == JSONArray.class) { + String order = ""; + for (int i = 0; i < ((JSONArray) children).length(); i++) { + JSONObject child = (JSONObject) ((JSONArray) children).get(i); + if (!child.isNull("appPriority")) { + return ""; + } + order += (child.getString("name") + "-" + getQueueOrder(child)); + } + return order; + } + } + return ""; + } + + private void verifyNumberOfAllocationAttempts(JSONObject allocation, + int realValue) throws Exception { + if (allocation.isNull("allocationAttempt")) { + assertEquals("Number of allocation attempts is wrong", 0, realValue); + } else { + Object object = allocation.get("allocationAttempt"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations attempts is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations attempts is wrong", + ((JSONArray) object).length(), realValue); + } + } + } + + @Test + public void testAppActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + 
MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED"); + + verifyNumberOfAllocationAttempts(allocations, 1); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppAssignMultipleContainersPerNodeHeartbeat() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = 
response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 10); + + JSONArray allocations = json.getJSONArray("allocations"); + for (int i = 0; i < allocations.length(); i++) { + verifyStateOfAllocations(allocations.getJSONObject(i), + "allocationState", "ACCEPTED"); + } + } + finally { + rm.stop(); + } + } + + @Test + public void testAppAssignWithoutAvailableResource() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = 
r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppNoNM() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppReserveNewContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, + rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, + rm.getResourceTrackerService()); + + nm1.registerNode(); + nm2.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2"); + MockAM am2 = 
MockRM.launchAndRegisterAM(app2, rm, nm2); + + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096), + 10)), null); + + // Reserve new container + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + // Do a node heartbeat again without releasing container from app2 + r = resource(); + params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 2); + + // Finish application 2 + CapacityScheduler cs = (CapacityScheduler) 
rm.getResourceScheduler(); + ContainerId containerId = ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1); + cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED); + + // Do a node heartbeat again + r = resource(); + params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 3); + } + finally { + rm.stop(); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org