YARN-5552. Add Builder methods for common yarn API records. (Tao Jie via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ede1a473 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ede1a473 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ede1a473 Branch: refs/heads/YARN-3926 Commit: ede1a473f5061cf40f6affc1c8c30a645c1fef6c Parents: aa6010c Author: Wangda Tan <wan...@apache.org> Authored: Fri Nov 11 13:34:56 2016 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Fri Nov 11 13:34:56 2016 -0800 ---------------------------------------------------------------------- .../api/protocolrecords/AllocateRequest.java | 134 ++++++++- .../api/protocolrecords/AllocateResponse.java | 283 +++++++++++++++++-- .../yarn/api/records/ResourceRequest.java | 177 +++++++++++- .../hadoop/yarn/client/api/AMRMClient.java | 109 +++++-- .../yarn/client/api/impl/AMRMClientImpl.java | 29 +- .../impl/pb/AllocateResponsePBImpl.java | 4 +- .../scheduler/AppSchedulingInfo.java | 11 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 5 +- 8 files changed, 657 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index f7ce127..0786794 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -66,8 +66,10 @@ public abstract class AllocateRequest { List<ResourceRequest> resourceAsk, List<ContainerId> containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest) { - return newInstance(responseID, appProgress, resourceAsk, - containersToBeReleased, resourceBlacklistRequest, null); + return AllocateRequest.newBuilder().responseId(responseID) + .progress(appProgress).askList(resourceAsk) + .releaseList(containersToBeReleased) + .resourceBlacklistRequest(resourceBlacklistRequest).build(); } @Public @@ -77,14 +79,12 @@ public abstract class AllocateRequest { List<ContainerId> containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, List<UpdateContainerRequest> updateRequests) { - AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(responseID); - allocateRequest.setProgress(appProgress); - allocateRequest.setAskList(resourceAsk); - allocateRequest.setReleaseList(containersToBeReleased); - allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); - allocateRequest.setUpdateRequests(updateRequests); - return allocateRequest; + return AllocateRequest.newBuilder().responseId(responseID) + .progress(appProgress).askList(resourceAsk) + .releaseList(containersToBeReleased) + .resourceBlacklistRequest(resourceBlacklistRequest) + .updateRequests(updateRequests) + .build(); } /** @@ -211,4 +211,116 @@ public abstract class AllocateRequest { @Unstable public abstract void setUpdateRequests( List<UpdateContainerRequest> updateRequests); -} + + @Public + @Unstable + public static AllocateRequestBuilder newBuilder() { + return new AllocateRequestBuilder(); + } + + /** + * Class to construct instances of {@link AllocateRequest} with specific + * options. + */ + @Public + @Stable + public static final class AllocateRequestBuilder { + private AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + + private AllocateRequestBuilder() { + } + + /** + * Set the <code>responseId</code> of the request. + * @see AllocateRequest#setResponseId(int) + * @param responseId <code>responseId</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder responseId(int responseId) { + allocateRequest.setResponseId(responseId); + return this; + } + + /** + * Set the <code>progress</code> of the request. + * @see AllocateRequest#setProgress(float) + * @param progress <code>progress</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder progress(float progress) { + allocateRequest.setProgress(progress); + return this; + } + + /** + * Set the <code>askList</code> of the request. + * @see AllocateRequest#setAskList(List) + * @param askList <code>askList</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder askList(List<ResourceRequest> askList) { + allocateRequest.setAskList(askList); + return this; + } + + /** + * Set the <code>releaseList</code> of the request. + * @see AllocateRequest#setReleaseList(List) + * @param releaseList <code>releaseList</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder releaseList(List<ContainerId> releaseList) { + allocateRequest.setReleaseList(releaseList); + return this; + } + + /** + * Set the <code>resourceBlacklistRequest</code> of the request. + * @see AllocateRequest#setResourceBlacklistRequest( + * ResourceBlacklistRequest) + * @param resourceBlacklistRequest + * <code>resourceBlacklistRequest</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder resourceBlacklistRequest( + ResourceBlacklistRequest resourceBlacklistRequest) { + allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); + return this; + } + + /** + * Set the <code>updateRequests</code> of the request. + * @see AllocateRequest#setUpdateRequests(List) + * @param updateRequests <code>updateRequests</code> of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder updateRequests( + List<UpdateContainerRequest> updateRequests) { + allocateRequest.setUpdateRequests(updateRequests); + return this; + } + + /** + * Return generated {@link AllocateRequest} object. + * @return {@link AllocateRequest} + */ + @Public + @Stable + public AllocateRequest build() { + return allocateRequest; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 69089ee..d3ca765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -84,17 +84,12 @@ public abstract class AllocateResponse { List<Container> allocatedContainers, List<NodeReport> updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens) { - AllocateResponse response = Records.newRecord(AllocateResponse.class); - response.setNumClusterNodes(numClusterNodes); - response.setResponseId(responseId); - response.setCompletedContainersStatuses(completedContainers); - response.setAllocatedContainers(allocatedContainers); - response.setUpdatedNodes(updatedNodes); - response.setAvailableResources(availResources); - response.setAMCommand(command); - response.setPreemptionMessage(preempt); - response.setNMTokens(nmTokens); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens).build(); } @Public @@ -105,11 +100,13 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, List<UpdatedContainer> updatedContainers) { - AllocateResponse response = newInstance(responseId, completedContainers, - allocatedContainers, updatedNodes, availResources, command, - numClusterNodes, preempt, nmTokens); - response.setUpdatedContainers(updatedContainers); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens) + .updatedContainers(updatedContainers).build(); } @Private @@ -120,12 +117,13 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, List<UpdatedContainer> updatedContainers) { - AllocateResponse response = - newInstance(responseId, completedContainers, allocatedContainers, - updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, updatedContainers); - response.setAMRMToken(amRMToken); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens) + .updatedContainers(updatedContainers).amRmToken(amRMToken).build(); } @Public @@ -136,13 +134,14 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, List<UpdatedContainer> updatedContainers, String collectorAddr) { - AllocateResponse response = - newInstance(responseId, completedContainers, allocatedContainers, - updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, updatedContainers); - response.setAMRMToken(amRMToken); - response.setCollectorAddr(collectorAddr); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens) + .updatedContainers(updatedContainers).amRmToken(amRMToken) + .collectorAddr(collectorAddr).build(); } /** @@ -370,4 +369,230 @@ public abstract class AllocateResponse { @Unstable public void setUpdateErrors(List<UpdateContainerError> updateErrors) { } + + @Private + @Unstable + public static AllocateResponseBuilder newBuilder() { + return new AllocateResponseBuilder(); + } + + /** + * Class to construct instances of {@link AllocateResponse} with specific + * options. + */ + @Private + @Unstable + public static final class AllocateResponseBuilder { + private AllocateResponse allocateResponse = + Records.newRecord(AllocateResponse.class); + + private AllocateResponseBuilder() { + allocateResponse.setApplicationPriority(Priority.newInstance(0)); + } + + /** + * Set the <code>amCommand</code> of the response. + * @see AllocateResponse#setAMCommand(AMCommand) + * @param amCommand <code>amCommand</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder amCommand(AMCommand amCommand) { + allocateResponse.setAMCommand(amCommand); + return this; + } + + /** + * Set the <code>responseId</code> of the response. + * @see AllocateResponse#setResponseId(int) + * @param responseId <code>responseId</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder responseId(int responseId) { + allocateResponse.setResponseId(responseId); + return this; + } + + /** + * Set the <code>allocatedContainers</code> of the response. + * @see AllocateResponse#setAllocatedContainers(List) + * @param allocatedContainers + * <code>allocatedContainers</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder allocatedContainers( + List<Container> allocatedContainers) { + allocateResponse.setAllocatedContainers(allocatedContainers); + return this; + } + + /** + * Set the <code>availableResources</code> of the response. + * @see AllocateResponse#setAvailableResources(Resource) + * @param availableResources + * <code>availableResources</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder availableResources( + Resource availableResources) { + allocateResponse.setAvailableResources(availableResources); + return this; + } + + /** + * Set the <code>completedContainersStatuses</code> of the response. + * @see AllocateResponse#setCompletedContainersStatuses(List) + * @param completedContainersStatuses + * <code>completedContainersStatuses</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder completedContainersStatuses( + List<ContainerStatus> completedContainersStatuses) { + allocateResponse + .setCompletedContainersStatuses(completedContainersStatuses); + return this; + } + + /** + * Set the <code>updatedNodes</code> of the response. + * @see AllocateResponse#setUpdatedNodes(List) + * @param updatedNodes <code>updatedNodes</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updatedNodes( + List<NodeReport> updatedNodes) { + allocateResponse.setUpdatedNodes(updatedNodes); + return this; + } + + /** + * Set the <code>numClusterNodes</code> of the response. + * @see AllocateResponse#setNumClusterNodes(int) + * @param numClusterNodes <code>numClusterNodes</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder numClusterNodes(int numClusterNodes) { + allocateResponse.setNumClusterNodes(numClusterNodes); + return this; + } + + /** + * Set the <code>preemptionMessage</code> of the response. + * @see AllocateResponse#setPreemptionMessage(PreemptionMessage) + * @param preemptionMessage <code>preemptionMessage</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder preemptionMessage( + PreemptionMessage preemptionMessage) { + allocateResponse.setPreemptionMessage(preemptionMessage); + return this; + } + + /** + * Set the <code>nmTokens</code> of the response. + * @see AllocateResponse#setNMTokens(List) + * @param nmTokens <code>nmTokens</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder nmTokens(List<NMToken> nmTokens) { + allocateResponse.setNMTokens(nmTokens); + return this; + } + + /** + * Set the <code>updatedContainers</code> of the response. + * @see AllocateResponse#setUpdatedContainers(List) + * @param updatedContainers <code>updatedContainers</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updatedContainers( + List<UpdatedContainer> updatedContainers) { + allocateResponse.setUpdatedContainers(updatedContainers); + return this; + } + + /** + * Set the <code>amRmToken</code> of the response. + * @see AllocateResponse#setAMRMToken(Token) + * @param amRmToken <code>amRmToken</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder amRmToken(Token amRmToken) { + allocateResponse.setAMRMToken(amRmToken); + return this; + } + + /** + * Set the <code>applicationPriority</code> of the response. + * @see AllocateResponse#setApplicationPriority(Priority) + * @param applicationPriority + * <code>applicationPriority</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder applicationPriority( + Priority applicationPriority) { + allocateResponse.setApplicationPriority(applicationPriority); + return this; + } + + /** + * Set the <code>collectorAddr</code> of the response. + * @see AllocateResponse#setCollectorAddr(String) + * @param collectorAddr <code>collectorAddr</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder collectorAddr(String collectorAddr) { + allocateResponse.setCollectorAddr(collectorAddr); + return this; + } + + /** + * Set the <code>updateErrors</code> of the response. + * @see AllocateResponse#setUpdateErrors(List) + * @param updateErrors <code>updateErrors</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updateErrors( + List<UpdateContainerError> updateErrors) { + allocateResponse.setUpdateErrors(updateErrors); + return this; + } + + /** + * Return generated {@link AllocateResponse} object. + * @return {@link AllocateResponse} + */ + @Private + @Unstable + public AllocateResponse build() { + return allocateResponse; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index cefae0e..be2c783 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -23,6 +23,7 @@ import java.io.Serializable; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -63,15 +64,18 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { @Stable public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers) { - return newInstance(priority, hostName, capability, numContainers, true); + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).build(); } @Public @Stable public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality) { - return newInstance(priority, hostName, capability, numContainers, - relaxLocality, null); + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality).build(); } @Public @@ -79,8 +83,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String labelExpression) { - return newInstance(priority, hostName, capability, numContainers, - relaxLocality, labelExpression, ExecutionTypeRequest.newInstance()); + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality) + .nodeLabelExpression(labelExpression).build(); } @Public @@ -88,15 +94,158 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String labelExpression, ExecutionTypeRequest executionTypeRequest) { - ResourceRequest request = Records.newRecord(ResourceRequest.class); - request.setPriority(priority); - request.setResourceName(hostName); - request.setCapability(capability); - request.setNumContainers(numContainers); - request.setRelaxLocality(relaxLocality); - request.setNodeLabelExpression(labelExpression); - request.setExecutionTypeRequest(executionTypeRequest); - return request; + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality) + .nodeLabelExpression(labelExpression) + .executionTypeRequest(executionTypeRequest).build(); + } + + @Public + @Unstable + public static ResourceRequestBuilder newBuilder() { + return new ResourceRequestBuilder(); + } + + /** + * Class to construct instances of {@link ResourceRequest} with specific + * options. + */ + @Public + @Stable + public static final class ResourceRequestBuilder { + private ResourceRequest resourceRequest = + Records.newRecord(ResourceRequest.class); + + private ResourceRequestBuilder() { + resourceRequest.setResourceName(ANY); + resourceRequest.setNumContainers(1); + resourceRequest.setPriority(Priority.newInstance(0)); + resourceRequest.setRelaxLocality(true); + resourceRequest.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance()); + } + + /** + * Set the <code>priority</code> of the request. + * @see ResourceRequest#setPriority(Priority) + * @param priority <code>priority</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder priority(Priority priority) { + resourceRequest.setPriority(priority); + return this; + } + + /** + * Set the <code>resourceName</code> of the request. + * @see ResourceRequest#setResourceName(String) + * @param resourceName <code>resourceName</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder resourceName(String resourceName) { + resourceRequest.setResourceName(resourceName); + return this; + } + + /** + * Set the <code>capability</code> of the request. + * @see ResourceRequest#setCapability(Resource) + * @param capability <code>capability</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder capability(Resource capability) { + resourceRequest.setCapability(capability); + return this; + } + + /** + * Set the <code>numContainers</code> of the request. + * @see ResourceRequest#setNumContainers(int) + * @param numContainers <code>numContainers</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder numContainers(int numContainers) { + resourceRequest.setNumContainers(numContainers); + return this; + } + + /** + * Set the <code>relaxLocality</code> of the request. + * @see ResourceRequest#setRelaxLocality(boolean) + * @param relaxLocality <code>relaxLocality</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder relaxLocality(boolean relaxLocality) { + resourceRequest.setRelaxLocality(relaxLocality); + return this; + } + + /** + * Set the <code>nodeLabelExpression</code> of the request. + * @see ResourceRequest#setNodeLabelExpression(String) + * @param nodeLabelExpression + * <code>nodeLabelExpression</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder nodeLabelExpression( + String nodeLabelExpression) { + resourceRequest.setNodeLabelExpression(nodeLabelExpression); + return this; + } + + /** + * Set the <code>executionTypeRequest</code> of the request. + * @see ResourceRequest#setExecutionTypeRequest( + * ExecutionTypeRequest) + * @param executionTypeRequest + * <code>executionTypeRequest</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder executionTypeRequest( + ExecutionTypeRequest executionTypeRequest) { + resourceRequest.setExecutionTypeRequest(executionTypeRequest); + return this; + } + + /** + * Set the <code>allocationRequestId</code> of the request. + * @see ResourceRequest#setAllocationRequestId(long) + * @param allocationRequestId + * <code>allocationRequestId</code> of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder allocationRequestId( + long allocationRequestId) { + resourceRequest.setAllocationRequestId(allocationRequestId); + return this; + } + + /** + * Return generated {@link ResourceRequest} object. + * @return {@link ResourceRequest} + */ + @Public + @Stable + public ResourceRequest build() { + return resourceRequest; + } } @Public http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 2990c05..52155f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -106,14 +105,14 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends * All getters return immutable values. */ public static class ContainerRequest { - final Resource capability; - final List<String> nodes; - final List<String> racks; - final Priority priority; - final long allocationRequestId; - final boolean relaxLocality; - final String nodeLabelsExpression; - final ExecutionTypeRequest executionTypeRequest; + private Resource capability; + private List<String> nodes; + private List<String> racks; + private Priority priority; + private long allocationRequestId; + private boolean relaxLocality; + private String nodeLabelsExpression; + private ExecutionTypeRequest executionTypeRequest; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -306,17 +305,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends Priority priority, long allocationRequestId, boolean relaxLocality, String nodeLabelsExpression, ExecutionTypeRequest executionTypeRequest) { - // Validate request - Preconditions.checkArgument(capability != null, - "The Resource to be requested for each container " + - "should not be null "); - Preconditions.checkArgument(priority != null, - "The priority at which to request containers should not be null "); - Preconditions.checkArgument( - !(!relaxLocality && (racks == null || racks.length == 0) - && (nodes == null || nodes.length == 0)), - "Can't turn off locality relaxation on a " + - "request with no location constraints"); this.allocationRequestId = allocationRequestId; this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); @@ -325,8 +313,25 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends this.relaxLocality = relaxLocality; this.nodeLabelsExpression = nodeLabelsExpression; this.executionTypeRequest = executionTypeRequest; + sanityCheck(); + } + + // Validate request + private void sanityCheck() { + Preconditions.checkArgument(capability != null, + "The Resource to be requested for each container " + + "should not be null "); + Preconditions.checkArgument(priority != null, + "The priority at which to request containers should not be null "); + Preconditions.checkArgument( + !(!relaxLocality && (racks == null || racks.size() == 0) + && (nodes == null || nodes.size() == 0)), + "Can't turn off locality relaxation on a " + + "request with no location constraints"); } + private ContainerRequest() {}; + public Resource getCapability() { return capability; } @@ -368,8 +373,70 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends .append("]"); return sb.toString(); } + + public static ContainerRequestBuilder newBuilder() { + return new ContainerRequestBuilder(); + } + + /** + * Class to construct instances of {@link ContainerRequest} with specific + * options. + */ + public static final class ContainerRequestBuilder { + private ContainerRequest containerRequest = new ContainerRequest(); + + public ContainerRequestBuilder capability(Resource capability) { + containerRequest.capability = capability; + return this; + } + + public ContainerRequestBuilder nodes(String[] nodes) { + containerRequest.nodes = + (nodes != null ? ImmutableList.copyOf(nodes): null); + return this; + } + + public ContainerRequestBuilder racks(String[] racks) { + containerRequest.racks = + (racks != null ? ImmutableList.copyOf(racks) : null); + return this; + } + + public ContainerRequestBuilder priority(Priority priority) { + containerRequest.priority = priority; + return this; + } + + public ContainerRequestBuilder allocationRequestId( + long allocationRequestId) { + containerRequest.allocationRequestId = allocationRequestId; + return this; + } + + public ContainerRequestBuilder relaxLocality(boolean relaxLocality) { + containerRequest.relaxLocality = relaxLocality; + return this; + } + + public ContainerRequestBuilder nodeLabelsExpression( + String nodeLabelsExpression) { + containerRequest.nodeLabelsExpression = nodeLabelsExpression; + return this; + } + + public ContainerRequestBuilder executionTypeRequest( + ExecutionTypeRequest executionTypeRequest) { + containerRequest.executionTypeRequest = executionTypeRequest; + return this; + } + + public ContainerRequest build() { + containerRequest.sanityCheck(); + return containerRequest; + } + } } - + /** * Register the application master. This must be called before any * other interaction http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 3ed43b0..44fc1e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -112,10 +112,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { ResourceRequestInfo(Long allocationRequestId, Priority priority, String resourceName, Resource capability, boolean relaxLocality) { - remoteRequest = ResourceRequest.newInstance(priority, resourceName, - capability, 0); - remoteRequest.setAllocationRequestId(allocationRequestId); - remoteRequest.setRelaxLocality(relaxLocality); + remoteRequest = ResourceRequest.newBuilder().priority(priority) + .resourceName(resourceName).capability(capability).numContainers(0) + .allocationRequestId(allocationRequestId) + .relaxLocality(relaxLocality).build(); containerRequests = new LinkedHashSet<T>(); } } @@ -279,10 +279,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(blacklistToAdd, blacklistToRemove); - - allocateRequest = - AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest, updateList); + + allocateRequest = AllocateRequest.newBuilder() + .responseId(lastResponseId).progress(progressIndicator) + .askList(askList).resourceBlacklistRequest(blacklistRequest) + .releaseList(releaseList).updateRequests(updateList).build(); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -415,11 +416,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(), - r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality(), r.getNodeLabelExpression(), - r.getExecutionTypeRequest()); - rr.setAllocationRequestId(r.getAllocationRequestId()); + ResourceRequest rr = ResourceRequest.newBuilder() + .priority(r.getPriority()).resourceName(r.getResourceName()) + .capability(r.getCapability()).numContainers(r.getNumContainers()) + .relaxLocality(r.getRelaxLocality()) + .nodeLabelExpression(r.getNodeLabelExpression()) + .executionTypeRequest(r.getExecutionTypeRequest()) + .allocationRequestId(r.getAllocationRequestId()).build(); askList.add(rr); } return askList; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index b4f51ef..c0d52a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -261,7 +261,9 @@ public class AllocateResponsePBImpl extends AllocateResponse { public synchronized void setUpdateErrors( List<UpdateContainerError> updateErrors) { if (updateErrors == null) { - this.updateErrors.clear(); + if (this.updateErrors != null) { + this.updateErrors.clear(); + } return; } this.updateErrors = new ArrayList<>( http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index ffb1885..80811b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -946,10 +946,13 @@ public class AppSchedulingInfo { } public ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = - ResourceRequest.newInstance(request.getPriority(), - request.getResourceName(), request.getCapability(), 1, - request.getRelaxLocality(), request.getNodeLabelExpression()); + ResourceRequest newRequest = ResourceRequest.newBuilder() + .priority(request.getPriority()) + .resourceName(request.getResourceName()) + .capability(request.getCapability()) + .numContainers(1) + .relaxLocality(request.getRelaxLocality()) + .nodeLabelExpression(request.getNodeLabelExpression()).build(); return newRequest; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ede1a473/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 6d9dda8..f076e4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -744,8 +744,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } int numCont = (int) Math.ceil( Resources.divide(rc, clusterResource, tot, minimumAllocation)); - ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED, - ResourceRequest.ANY, minimumAllocation, numCont); + ResourceRequest rr = ResourceRequest.newBuilder() + .priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY) + .capability(minimumAllocation).numContainers(numCont).build(); List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers(); List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers(); List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org