YARN-4014. Support user cli interface in for Application Priority. Contributed by Rohith Sharma K S
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/57c7ae1a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/57c7ae1a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/57c7ae1a Branch: refs/heads/YARN-1197 Commit: 57c7ae1affb2e1821fbdc3f47738d7e6fd83c7c1 Parents: 3b00eae Author: Jian He <jia...@apache.org> Authored: Mon Aug 24 20:36:08 2015 -0700 Committer: Jian He <jia...@apache.org> Committed: Mon Aug 24 20:36:44 2015 -0700 ---------------------------------------------------------------------- .../hadoop/mapred/ResourceMgrDelegate.java | 7 + .../hadoop/mapred/TestClientRedirect.java | 9 + hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/ApplicationClientProtocol.java | 18 ++ .../UpdateApplicationPriorityRequest.java | 80 +++++++++ .../UpdateApplicationPriorityResponse.java | 47 +++++ .../main/proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_service_protos.proto | 8 + .../hadoop/yarn/client/api/YarnClient.java | 17 ++ .../yarn/client/api/impl/YarnClientImpl.java | 11 ++ .../hadoop/yarn/client/cli/ApplicationCLI.java | 29 ++++ .../hadoop/yarn/client/cli/TestYarnCLI.java | 29 ++++ .../ApplicationClientProtocolPBClientImpl.java | 20 +++ .../ApplicationClientProtocolPBServiceImpl.java | 22 +++ .../UpdateApplicationPriorityRequestPBImpl.java | 171 +++++++++++++++++++ ...UpdateApplicationPriorityResponsePBImpl.java | 69 ++++++++ .../server/resourcemanager/ClientRMService.java | 73 ++++++++ .../server/resourcemanager/RMAuditLogger.java | 2 + .../resourcemanager/recovery/RMStateStore.java | 12 +- .../recovery/RMStateUpdateAppEvent.java | 13 ++ .../scheduler/capacity/CapacityScheduler.java | 16 +- .../resourcemanager/TestClientRMService.java | 63 +++++++ 22 files changed, 713 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 90f6876..91c3086 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -466,4 +467,10 @@ public class ResourceMgrDelegate extends YarnClient { throws YarnException, IOException { return client.getClusterNodeLabels(); } + + @Override + public void updateApplicationPriority(ApplicationId applicationId, + Priority priority) throws YarnException, IOException { + client.updateApplicationPriority(applicationId, priority); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java 
---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index bb00b19..1bf1408 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -114,6 +114,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -444,6 +446,13 @@ public class TestClientRedirect { GetLabelsToNodesRequest request) throws YarnException, IOException { return null; } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) throws YarnException, + IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt 
b/hadoop-yarn-project/CHANGES.txt index bf58c96..1190619 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -181,6 +181,9 @@ Release 2.8.0 - UNRELEASED YARN-221. NM should provide a way for AM to tell it not to aggregate logs. (Ming Ma via xgong) + YARN-4014. Support user cli interface in for Application Priority. + (Rohith Sharma K S via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 8b9937b..08fc289 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -419,4 +421,20 @@ 
public interface ApplicationClientProtocol extends ApplicationBaseProtocol { @Unstable public GetClusterNodeLabelsResponse getClusterNodeLabels( GetClusterNodeLabelsRequest request) throws YarnException, IOException; + + /** + * <p> + * The interface used by client to set priority of an application. + * </p> + * @param request to set priority of an application + * @return an empty response + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + @Idempotent + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) throws YarnException, + IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityRequest.java new file mode 100644 index 0000000..4cd10af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityRequest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p> + * The request sent by the client to the <code>ResourceManager</code> to set or + * update the application priority. + * </p> + * <p> + * The request includes the {@link ApplicationId} of the application and + * {@link Priority} to be set for an application + * </p> + * + * @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest) + */ + +@Public +@Unstable +public abstract class UpdateApplicationPriorityRequest { + public static UpdateApplicationPriorityRequest newInstance( + ApplicationId applicationId, Priority priority) { + UpdateApplicationPriorityRequest request = + Records.newRecord(UpdateApplicationPriorityRequest.class); + request.setApplicationId(applicationId); + request.setApplicationPriority(priority); + return request; + } + + /** + * Get the <code>ApplicationId</code> of the application. + * + * @return <code>ApplicationId</code> of the application + */ + public abstract ApplicationId getApplicationId(); + + /** + * Set the <code>ApplicationId</code> of the application. 
+ * + * @param applicationId <code>ApplicationId</code> of the application + */ + public abstract void setApplicationId(ApplicationId applicationId); + + /** + * Get the <code>Priority</code> of the application to be set. + * + * @return <code>Priority</code> of the application to be set. + */ + public abstract Priority getApplicationPriority(); + + /** + * Set the <code>Priority</code> of the application. + * + * @param priority <code>Priority</code> of the application + */ + public abstract void setApplicationPriority(Priority priority); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityResponse.java new file mode 100644 index 0000000..0fdbe75 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityResponse.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p> + * The response sent by the <code>ResourceManager</code> to the client on + * updating the application priority. + * </p> + * <p> + * A response without exception means that the update completed successfully. + * </p> + * + * @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest) + */ + +@Public +@Unstable +public abstract class UpdateApplicationPriorityResponse { + + public static UpdateApplicationPriorityResponse newInstance() { + UpdateApplicationPriorityResponse response = + Records.newRecord(UpdateApplicationPriorityResponse.class); + return response; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index e7e3654..117c930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -55,4 +55,5 @@ service ApplicationClientProtocolService { rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto); rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto); rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); + rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 098785a..b0b12d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -216,6 +216,14 @@ message GetClusterNodeLabelsResponseProto { repeated NodeLabelProto nodeLabels = 1; } +message UpdateApplicationPriorityRequestProto { + required ApplicationIdProto applicationId = 1; + required PriorityProto applicationPriority = 2; +} + +message UpdateApplicationPriorityResponseProto { +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index ff03c7d..ff90da1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -666,4 +669,18 @@ public abstract class YarnClient extends AbstractService { @Unstable public abstract List<NodeLabel> getClusterNodeLabels() throws YarnException, IOException; + + /** + * <p> + * The interface used by client to set priority of an application + * </p> + * @param applicationId + * @param priority + * @throws YarnException 
+ * @throws IOException + */ + @Public + @Unstable + public abstract void updateApplicationPriority(ApplicationId applicationId, + Priority priority) throws YarnException, IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index be4c8c4..1713f9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import 
org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Token; @@ -820,4 +823,12 @@ public class YarnClientImpl extends YarnClient { return rmClient.getClusterNodeLabels( GetClusterNodeLabelsRequest.newInstance()).getNodeLabels(); } + + @Override + public void updateApplicationPriority(ApplicationId applicationId, + Priority priority) throws YarnException, IOException { + UpdateApplicationPriorityRequest request = + UpdateApplicationPriorityRequest.newInstance(applicationId, priority); + rmClient.updateApplicationPriority(request); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 6263814..ab29366 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; @@ -73,6 +74,8 @@ public class ApplicationCLI 
extends YarnCLI { public static final String APPLICATION = "application"; public static final String APPLICATION_ATTEMPT = "applicationattempt"; public static final String CONTAINER = "container"; + public static final String APP_ID = "appId"; + public static final String UPDATE_PRIORITY = "updatePriority"; private boolean allAppStates; @@ -117,10 +120,16 @@ public class ApplicationCLI extends YarnCLI { appStateOpt.setArgs(Option.UNLIMITED_VALUES); appStateOpt.setArgName("States"); opts.addOption(appStateOpt); + opts.addOption(APP_ID, true, "Specify Application Id to be operated"); + opts.addOption(UPDATE_PRIORITY, true, + "update priority of an application. ApplicationId can be" + + " passed using 'appId' option."); opts.getOption(KILL_CMD).setArgName("Application ID"); opts.getOption(MOVE_TO_QUEUE_CMD).setArgName("Application ID"); opts.getOption(QUEUE_CMD).setArgName("Queue Name"); opts.getOption(STATUS_CMD).setArgName("Application ID"); + opts.getOption(APP_ID).setArgName("Application ID"); + opts.getOption(UPDATE_PRIORITY).setArgName("Priority"); } else if (args.length > 0 && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) { title = APPLICATION_ATTEMPT; opts.addOption(STATUS_CMD, true, @@ -238,6 +247,13 @@ public class ApplicationCLI extends YarnCLI { } else if (cliParser.hasOption(HELP_CMD)) { printUsage(title, opts); return 0; + } else if (cliParser.hasOption(UPDATE_PRIORITY)) { + if (!cliParser.hasOption(APP_ID)) { + printUsage(title, opts); + return exitCode; + } + updateApplicationPriority(cliParser.getOptionValue(APP_ID), + cliParser.getOptionValue(UPDATE_PRIORITY)); } else { syserr.println("Invalid Command Usage : "); printUsage(title, opts); @@ -619,4 +635,17 @@ public class ApplicationCLI extends YarnCLI { } writer.flush(); } + + /** + * Updates priority of an application with the given ID. 
+ */ + private void updateApplicationPriority(String applicationId, String priority) + throws YarnException, IOException { + ApplicationId appId = ConverterUtils.toApplicationId(applicationId); + Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority)); + sysout.println("Updating priority of an aplication " + applicationId); + client.updateApplicationPriority(appId, newAppPriority); + sysout.println("Successfully updated the priority of any application " + + applicationId); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 8f17c8f..f942a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -1409,6 +1409,31 @@ public class TestYarnCLI { Assert.assertNotSame("should return non-zero exit code.", 0, exitCode); } + @Test(timeout = 60000) + public void testUpdateApplicationPriority() throws Exception { + ApplicationCLI cli = createAndGetAppCLI(); + ApplicationId applicationId = ApplicationId.newInstance(1234, 6); + + ApplicationReport appReport = + ApplicationReport.newInstance(applicationId, + ApplicationAttemptId.newInstance(applicationId, 1), "user", + "queue", "appname", "host", 124, null, + YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, + FinalApplicationStatus.UNDEFINED, null, "N/A", 0.53789f, "YARN", + null); + when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( + 
appReport); + + int result = + cli.run(new String[] { "application", "-appId", + applicationId.toString(), + "-updatePriority", "1" }); + Assert.assertEquals(result, 0); + verify(client).updateApplicationPriority(any(ApplicationId.class), + any(Priority.class)); + + } + private void verifyUsageInfo(YarnCLI cli) throws Exception { cli.setSysErrPrintStream(sysErr); cli.run(new String[] { "application" }); @@ -1458,6 +1483,7 @@ public class TestYarnCLI { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("usage: application"); + pw.println(" -appId <Application ID> Specify Application Id to be operated"); pw.println(" -appStates <States> Works with -list to filter applications"); pw.println(" based on input comma-separated list of"); pw.println(" application states. The valid application"); @@ -1480,6 +1506,9 @@ public class TestYarnCLI { pw.println(" specify which queue to move an"); pw.println(" application to."); pw.println(" -status <Application ID> Prints the status of the application."); + pw.println(" -updatePriority <Priority> update priority of an application."); + pw.println(" ApplicationId can be passed using 'appId'"); + pw.println(" option."); pw.close(); String appsHelpStr = baos.toString("UTF-8"); return appsHelpStr; http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 959f399..9ccc326 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -121,6 +123,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -143,6 +147,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueue import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -507,4 +513,18 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP return null; } } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) throws YarnException, IOException { + UpdateApplicationPriorityRequestProto requestProto = + ((UpdateApplicationPriorityRequestPBImpl) request).getProto(); + try { + return new UpdateApplicationPriorityResponsePBImpl( + proxy.updateApplicationPriority(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 36bd3af..6ca2136 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; @@ -96,6 +97,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -137,6 +140,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionReque import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -507,4 +512,21 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient throw new ServiceException(e); } } + + @Override + public UpdateApplicationPriorityResponseProto updateApplicationPriority( + RpcController controller, UpdateApplicationPriorityRequestProto proto) + throws ServiceException { + UpdateApplicationPriorityRequestPBImpl request = + new UpdateApplicationPriorityRequestPBImpl(proto); + try { + UpdateApplicationPriorityResponse response = + real.updateApplicationPriority(request); + return ((UpdateApplicationPriorityResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityRequestPBImpl.java new file mode 100644 index 0000000..c8f5a1a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityRequestPBImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationPriorityRequestPBImpl extends + 
UpdateApplicationPriorityRequest { + + UpdateApplicationPriorityRequestProto proto = + UpdateApplicationPriorityRequestProto + .getDefaultInstance(); + UpdateApplicationPriorityRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Priority applicationPriority = null; + + public UpdateApplicationPriorityRequestPBImpl() { + builder = UpdateApplicationPriorityRequestProto.newBuilder(); + } + + public UpdateApplicationPriorityRequestPBImpl( + UpdateApplicationPriorityRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationPriorityRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationPriorityRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.applicationPriority != null) { + builder + .setApplicationPriority(convertToProtoFormat(this.applicationPriority)); + } + } + + @Override + public Priority getApplicationPriority() { + UpdateApplicationPriorityRequestProtoOrBuilder p = + viaProto ? 
proto : builder; + if (this.applicationPriority != null) { + return this.applicationPriority; + } + if (!p.hasApplicationPriority()) { + return null; + } + this.applicationPriority = + convertFromProtoFormat(p.getApplicationPriority()); + return this.applicationPriority; + } + + @Override + public void setApplicationPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) + builder.clearApplicationPriority(); + this.applicationPriority = priority; + } + + @Override + public ApplicationId getApplicationId() { + UpdateApplicationPriorityRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return applicationId; + } // Else via proto + if (!p.hasApplicationId()) { + return null; + } + applicationId = convertFromProtoFormat(p.getApplicationId()); + return applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) + builder.clearApplicationId(); + this.applicationId = applicationId; + } + + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority t) { + return ((PriorityPBImpl) t).getProto(); + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityResponsePBImpl.java new file mode 100644 index 0000000..0898e5a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityResponsePBImpl.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; + +import com.google.protobuf.TextFormat; + +public class UpdateApplicationPriorityResponsePBImpl extends + UpdateApplicationPriorityResponse { + + UpdateApplicationPriorityResponseProto proto = + UpdateApplicationPriorityResponseProto.getDefaultInstance(); + UpdateApplicationPriorityResponseProto.Builder builder = null; + boolean viaProto = false; + + public UpdateApplicationPriorityResponsePBImpl() { + builder = UpdateApplicationPriorityResponseProto.newBuilder(); + } + + public UpdateApplicationPriorityResponsePBImpl( + UpdateApplicationPriorityResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationPriorityResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 2dcfe9a..3e16165 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -1304,4 +1307,74 @@ public class ClientRMService extends AbstractService implements } return callerUGI.getShortUserName(); } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) throws 
YarnException, + IOException { + + ApplicationId applicationId = request.getApplicationId(); + Priority newAppPriority = request.getApplicationPriority(); + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY, + "UNKNOWN", "ClientRMService", "Error getting UGI", applicationId); + throw RPCUtil.getRemoteException(ie); + } + + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", + "Trying to update priority of an absent application", applicationId); + throw new ApplicationNotFoundException( + "Trying to update priority o an absent application " + applicationId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_PRIORITY, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", + AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + + // Update priority only when app is tracked by the scheduler + if (!EnumSet.of(RMAppState.ACCEPTED, RMAppState.RUNNING).contains( + application.getState())) { + String msg = + "Application in " + application.getState() + + " state cannot be update priority."; + RMAuditLogger + .logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", + msg); + throw new YarnException(msg); + } + + try { + rmContext.getScheduler().updateApplicationPriority(newAppPriority, 
+ applicationId); + } catch (YarnException ex) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", + ex.getMessage()); + throw ex; + } + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId); + UpdateApplicationPriorityResponse response = + recordFactory + .newRecordInstance(UpdateApplicationPriorityResponse.class); + return response; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index db8a46a..f049d97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -54,6 +54,8 @@ public class RMAuditLogger { public static final String UNREGISTER_AM = "Unregister App Master"; public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; + public static final String UPDATE_APP_PRIORITY = + "Update Application Priority Request"; // Some commonly used descriptions public static final String UNAUTHORIZED_USER = "Unauthorized user"; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index affbee1..b7f1e6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -116,7 +116,7 @@ public abstract class RMStateStore extends AbstractService { .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.STORE_APP, new StoreAppTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) + RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, @@ -215,8 +215,10 @@ public abstract class RMStateStore extends AbstractService { LOG.info("Updating info for app: " + appId); try { store.updateApplicationStateInternal(appId, appState); - store.notifyApplication(new RMAppEvent(appId, - RMAppEventType.APP_UPDATE_SAVED)); + if (((RMStateUpdateAppEvent) event).isNotifyApplication()) { + 
store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_UPDATE_SAVED)); + } } catch (Exception e) { LOG.error("Error updating app: " + appId, e); store.notifyStoreOperationFailed(e); @@ -707,8 +709,8 @@ public abstract class RMStateStore extends AbstractService { } public void updateApplicationStateSynchronously( - ApplicationStateData appState) { - handleStoreEvent(new RMStateUpdateAppEvent(appState)); + ApplicationStateData appState, boolean notifyApp) { + handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp)); } public void updateFencedState() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index cec364c..69169dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -22,13 +22,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio public class RMStateUpdateAppEvent extends RMStateStoreEvent { private final ApplicationStateData appState; + // After application state is updated in state store, + // should notify back to application or not + private 
boolean notifyApplication; public RMStateUpdateAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.UPDATE_APP); this.appState = appState; + this.notifyApplication = true; + } + + public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) { + this(appState); + this.notifyApplication = notifyApp; } public ApplicationStateData getAppState() { return appState; } + + public boolean isNotifyApplication() { + return notifyApplication; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index b4b1383..6a3e4c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -785,6 +785,17 @@ public class CapacityScheduler extends } application.setCurrentAppAttempt(attempt); + // Update attempt priority to the latest to avoid race condition i.e + // SchedulerApplicationAttempt is created with old priority but it is not + // set to SchedulerApplication#setCurrentAppAttempt. + // Scenario would occur is + // 1. 
SchedulerApplicationAttempt is created with old priority. + // 2. updateApplicationPriority() updates SchedulerApplication. Since + // currentAttempt is null, it just returns. + // 3. SchedulerApplicationAttempt is set in + // SchedulerApplication#setCurrentAppAttempt. + attempt.setPriority(application.getPriority()); + queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " @@ -1853,7 +1864,7 @@ public class CapacityScheduler extends } @Override - public synchronized void updateApplicationPriority(Priority newPriority, + public void updateApplicationPriority(Priority newPriority, ApplicationId applicationId) throws YarnException { Priority appPriority = null; SchedulerApplication<FiCaSchedulerApp> application = applications @@ -1879,7 +1890,8 @@ public class CapacityScheduler extends ApplicationStateData appState = ApplicationStateData.newInstance( rmApp.getSubmitTime(), rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), rmApp.getUser()); - rmContext.getStateStore().updateApplicationStateSynchronously(appState); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false); // As we use iterator over a TreeSet for OrderingPolicy, once we change // priority then reinsert back to make order correct. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57c7ae1a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index b9e1d81..8031759 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import 
org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -131,6 +133,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -1543,4 +1546,64 @@ public class TestClientRMService { rpc.stopProxy(client, conf); rm.close(); } + + @Test(timeout = 120000) + public void testUpdateApplicationPriorityRequest() throws Exception { + int maxPriority = 10; + int appPriorty = 5; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + maxPriority); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + // Start app1 with appPriority 5 + RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriorty)); + + Assert.assertEquals("Incorrect priority has been set to application", + appPriorty, app1.getApplicationSubmissionContext().getPriority() + .getPriority()); + + appPriorty = 9; + ClientRMService rmService = rm.getClientRMService(); + UpdateApplicationPriorityRequest updateRequest = + UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(), + Priority.newInstance(appPriorty)); + + rmService.updateApplicationPriority(updateRequest); + + Assert.assertEquals("Incorrect priority has been set to application", + appPriorty, app1.getApplicationSubmissionContext().getPriority() + .getPriority()); + + rm.killApp(app1.getApplicationId()); + rm.waitForState(app1.getApplicationId(), 
RMAppState.KILLED); + + // Update priority request for application in KILLED state + try { + rmService.updateApplicationPriority(updateRequest); + Assert.fail("Can not update priority for an application in KILLED state"); + } catch (YarnException e) { + String msg = + "Application in " + app1.getState() + + " state cannot be update priority."; + Assert.assertTrue("", msg.contains(e.getMessage())); + } + + // Update priority request for invalid application id. + ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3); + updateRequest = + UpdateApplicationPriorityRequest.newInstance(invalidAppId, + Priority.newInstance(appPriorty)); + try { + rmService.updateApplicationPriority(updateRequest); + Assert + .fail("ApplicationNotFoundException should be thrown for invalid application id"); + } catch (ApplicationNotFoundException e) { + // Expected + } + + rm.stop(); + } }