YARN-5611. Provide an API to update lifetime of an application. Contributed by Rohith Sharma K S
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcc15c62 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcc15c62 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcc15c62 Branch: refs/heads/YARN-2915 Commit: bcc15c6290b3912a054323695a6a931b0de163bd Parents: edbee9e Author: Jian He <[email protected]> Authored: Wed Nov 9 14:33:58 2016 -0800 Committer: Jian He <[email protected]> Committed: Wed Nov 9 16:08:05 2016 -0800 ---------------------------------------------------------------------- .../hadoop/mapred/TestClientRedirect.java | 9 + .../yarn/api/ApplicationClientProtocol.java | 23 ++ .../UpdateApplicationTimeoutsRequest.java | 81 +++++++ .../UpdateApplicationTimeoutsResponse.java | 46 ++++ .../records/ApplicationSubmissionContext.java | 4 + .../hadoop/yarn/conf/YarnConfiguration.java | 6 +- .../main/proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_protos.proto | 5 + .../src/main/proto/yarn_service_protos.proto | 9 + .../ApplicationClientProtocolPBClientImpl.java | 21 +- .../ApplicationClientProtocolPBServiceImpl.java | 22 ++ .../UpdateApplicationTimeoutsRequestPBImpl.java | 220 +++++++++++++++++++ ...UpdateApplicationTimeoutsResponsePBImpl.java | 73 ++++++ .../yarn/util/AbstractLivelinessMonitor.java | 17 +- .../java/org/apache/hadoop/yarn/util/Times.java | 33 +++ .../src/main/resources/yarn-default.xml | 4 +- .../amrmproxy/MockResourceManagerFacade.java | 9 + .../server/resourcemanager/ClientRMService.java | 137 +++++++++--- .../server/resourcemanager/RMAppManager.java | 37 ++++ .../server/resourcemanager/RMAuditLogger.java | 4 +- .../server/resourcemanager/RMServerUtils.java | 48 +++- .../resourcemanager/recovery/RMStateStore.java | 28 ++- .../recovery/RMStateUpdateAppEvent.java | 15 +- .../recovery/records/ApplicationStateData.java | 27 +++ .../impl/pb/ApplicationStateDataPBImpl.java | 86 ++++++++ .../server/resourcemanager/rmapp/RMApp.java | 3 + 
.../server/resourcemanager/rmapp/RMAppImpl.java | 64 +++++- .../rmapp/monitor/RMAppLifetimeMonitor.java | 72 +++--- .../scheduler/capacity/CapacityScheduler.java | 3 +- .../yarn_server_resourcemanager_recovery.proto | 1 + .../applicationsmanager/MockAsm.java | 6 + .../server/resourcemanager/rmapp/MockRMApp.java | 6 + .../rmapp/TestApplicationLifetimeMonitor.java | 150 ++++++++++++- 33 files changed, 1149 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 255f998..65eac65 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; 
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -485,6 +487,13 @@ public class TestClientRedirect { SignalContainerRequest request) throws IOException { return null; } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 8ee43fb..394454f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -566,4 +568,25 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol { SignalContainerResponse signalToContainer( SignalContainerRequest request) throws YarnException, IOException; + + /** + * <p> + * The interface used by client to set ApplicationTimeouts of an application. + * The UpdateApplicationTimeoutsRequest should have timeout value with + * absolute time with ISO8601 format <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>. + * </p> + * <b>Note:</b> If application timeout value is less than or equal to current + * time then update application throws YarnException. + * @param request to set ApplicationTimeouts of an application + * @return an empty response that the update has completed successfully. + * @throws YarnException if update request has empty values or application is + * in completing states. + * @throws IOException on IO failures + */ + @Public + @Unstable + @Idempotent + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java new file mode 100644 index 0000000..0e81e7e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p> + * The request sent by the client to the <code>ResourceManager</code> to set or + * update the application timeout. + * </p> + * <p> + * The request includes the {@link ApplicationId} of the application and timeout + * to be set for an application + * </p> + */ +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsRequest { + public static UpdateApplicationTimeoutsRequest newInstance( + ApplicationId applicationId, + Map<ApplicationTimeoutType, String> applicationTimeouts) { + UpdateApplicationTimeoutsRequest request = + Records.newRecord(UpdateApplicationTimeoutsRequest.class); + request.setApplicationId(applicationId); + request.setApplicationTimeouts(applicationTimeouts); + return request; + } + + /** + * Get the <code>ApplicationId</code> of the application. 
+ * @return <code>ApplicationId</code> of the application + */ + public abstract ApplicationId getApplicationId(); + + /** + * Set the <code>ApplicationId</code> of the application. + * @param applicationId <code>ApplicationId</code> of the application + */ + public abstract void setApplicationId(ApplicationId applicationId); + + /** + * Get <code>ApplicationTimeouts</code> of the application. Timeout value is + * in ISO8601 standard with format <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>. + * @return all <code>ApplicationTimeouts</code> of the application. + */ + public abstract Map<ApplicationTimeoutType, String> getApplicationTimeouts(); + + /** + * Set the <code>ApplicationTimeouts</code> for the application. Timeout value + * is absolute. Timeout value should meet ISO8601 format. The supported ISO8601 + * format is <b>yyyy-MM-dd'T'HH:mm:ss.SSSZ</b>. All pre-existing Map entries + * are cleared before adding the new Map. + * @param applicationTimeouts <code>ApplicationTimeouts</code>s for the + * application + */ + public abstract void setApplicationTimeouts( + Map<ApplicationTimeoutType, String> applicationTimeouts); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java new file mode 100644 index 0000000..bd02bb8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p> + * The response sent by the <code>ResourceManager</code> to the client on update + * application timeout. + * </p> + * <p> + * A response without exception means that the update has completed + * successfully. 
+ * </p> + */ +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsResponse { + + public static UpdateApplicationTimeoutsResponse newInstance() { + UpdateApplicationTimeoutsResponse response = + Records.newRecord(UpdateApplicationTimeoutsResponse.class); + return response; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 83f601a..e562aaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -549,6 +549,10 @@ public abstract class ApplicationSubmissionContext { /** * Set the <code>ApplicationTimeouts</code> for the application in seconds. * All pre-existing Map entries are cleared before adding the new Map. + * <p> + * <b>Note:</b> If application timeout value is less than or equal to zero + * then application submission will throw an exception. 
+ * </p> * @param applicationTimeouts <code>ApplicationTimeouts</code>s for the * application */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1fd25a7..b95bd1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1540,10 +1540,10 @@ public class YarnConfiguration extends Configuration { // Configurations for applicaiton life time monitor feature - public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = - RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms"; + public static final String RM_APPLICATION_MONITOR_INTERVAL_MS = + RM_PREFIX + "application-timeouts.monitor.interval-ms"; - public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = + public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS = 60000; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index f1c3839..ba79db0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -60,4 +60,5 @@ service ApplicationClientProtocolService { rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); + rpc updateApplicationTimeouts (UpdateApplicationTimeoutsRequestProto) returns (UpdateApplicationTimeoutsResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fd..b59d02b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -377,6 +377,11 @@ message ApplicationTimeoutMapProto { optional int64 timeout = 2; } +message ApplicationUpdateTimeoutMapProto { + optional ApplicationTimeoutTypeProto application_timeout_type = 1; + optional string expire_time = 2; +} + message LogAggregationContextProto { optional string include_pattern = 1 [default = ".*"]; optional string exclude_pattern = 2 [default = ""]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 
6526bf9..d9230d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -267,6 +267,15 @@ message SignalContainerRequestProto { message SignalContainerResponseProto { } +message UpdateApplicationTimeoutsRequestProto { + required ApplicationIdProto applicationId = 1; + repeated ApplicationUpdateTimeoutMapProto application_timeouts = 2; +} + +message UpdateApplicationTimeoutsResponseProto { + repeated ApplicationUpdateTimeoutMapProto application_timeouts = 1; +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 2d755a2..ad7cb29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -139,6 +141,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateReque import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; @@ -165,7 +169,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestPr import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import 
com.google.protobuf.ServiceException; @@ -600,4 +604,19 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP return null; } } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + UpdateApplicationTimeoutsRequestProto requestProto = + ((UpdateApplicationTimeoutsRequestPBImpl) request).getProto(); + try { + return new UpdateApplicationTimeoutsResponsePBImpl( + proxy.updateApplicationTimeouts(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 300ef57..93ce6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -111,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -162,6 +165,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponsePro import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -609,4 +614,21 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient throw new 
ServiceException(e); } } + + @Override + public UpdateApplicationTimeoutsResponseProto updateApplicationTimeouts( + RpcController controller, UpdateApplicationTimeoutsRequestProto proto) + throws ServiceException { + UpdateApplicationTimeoutsRequestPBImpl request = + new UpdateApplicationTimeoutsRequestPBImpl(proto); + try { + UpdateApplicationTimeoutsResponse response = + real.updateApplicationTimeouts(request); + return ((UpdateApplicationTimeoutsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java new file mode 100644 index 0000000..1f86c55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsRequestPBImpl + extends UpdateApplicationTimeoutsRequest { + + UpdateApplicationTimeoutsRequestProto proto = + UpdateApplicationTimeoutsRequestProto.getDefaultInstance(); + UpdateApplicationTimeoutsRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Map<ApplicationTimeoutType, String> applicationTimeouts = null; + + public UpdateApplicationTimeoutsRequestPBImpl() { + builder = 
UpdateApplicationTimeoutsRequestProto.newBuilder(); + } + + public UpdateApplicationTimeoutsRequestPBImpl( + UpdateApplicationTimeoutsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + + @Override + public ApplicationId getApplicationId() { + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? 
proto : builder; + if (this.applicationId != null) { + return applicationId; + } // Else via proto + if (!p.hasApplicationId()) { + return null; + } + applicationId = convertFromProtoFormat(p.getApplicationId()); + return applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + } + this.applicationId = applicationId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Map<ApplicationTimeoutType, String> getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? 
proto : builder; + List<ApplicationUpdateTimeoutMapProto> lists = + p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap<ApplicationTimeoutType, String>(lists.size()); + for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getExpireTime()); + } + } + + @Override + public void setApplicationTimeouts( + Map<ApplicationTimeoutType, String> appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable<? extends ApplicationUpdateTimeoutMapProto> values = + new Iterable<ApplicationUpdateTimeoutMapProto>() { + + @Override + public Iterator<ApplicationUpdateTimeoutMapProto> iterator() { + return new Iterator<ApplicationUpdateTimeoutMapProto>() { + private Iterator<ApplicationTimeoutType> iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationUpdateTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationUpdateTimeoutMapProto.newBuilder() + .setExpireTime(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java new file mode 100644 index 0000000..74f1715 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsResponsePBImpl + extends UpdateApplicationTimeoutsResponse { + UpdateApplicationTimeoutsResponseProto proto = + UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); + UpdateApplicationTimeoutsResponseProto.Builder builder = null; + boolean viaProto = false; + + public UpdateApplicationTimeoutsResponsePBImpl() { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); + } + + public UpdateApplicationTimeoutsResponsePBImpl( + UpdateApplicationTimeoutsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index b605026..638128e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -46,6 +46,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService { public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins private long expireInterval = DEFAULT_EXPIRE; private long monitorInterval = expireInterval / 3; + private volatile boolean resetTimerOnStart = true; private final Clock clock; @@ -105,8 +106,8 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService { register(ob, clock.getTime()); } - public synchronized void register(O ob, long monitorStartTime) { - running.put(ob, monitorStartTime); + public synchronized void register(O ob, long expireTime) { + running.put(ob, expireTime); } public synchronized void unregister(O 
ob) { @@ -114,12 +115,18 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService { } public synchronized void resetTimer() { - long time = clock.getTime(); - for (O ob : running.keySet()) { - running.put(ob, time); + if (resetTimerOnStart) { + long time = clock.getTime(); + for (O ob : running.keySet()) { + running.put(ob, time); + } } } + protected void setResetTimeOnStart(boolean resetTimeOnStart) { + this.resetTimerOnStart = resetTimeOnStart; + } + private class PingChecker implements Runnable { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java index 8ae3842..f113bd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.util; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; @@ -29,6 +30,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; public class Times { private static final Log LOG = LogFactory.getLog(Times.class); + static final String ISO8601DATEFORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + // This format should match the one used in yarn.dt.plugins.js static final ThreadLocal<SimpleDateFormat> dateFormat = new ThreadLocal<SimpleDateFormat>() { @@ -37,6 +40,14 @@ public class Times { } }; + static final ThreadLocal<SimpleDateFormat> isoFormat = + new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + return 
new SimpleDateFormat(ISO8601DATEFORMAT); + } + }; + public static long elapsed(long started, long finished) { return Times.elapsed(started, finished, true); } @@ -74,4 +85,26 @@ public class Times { return ts > 0 ? String.valueOf(dateFormat.get().format(new Date(ts))) : "N/A"; } + + /** + * Given a time stamp, returns an ISO-8601 formatted string in format + * "yyyy-MM-dd'T'HH:mm:ss.SSSZ". + * @param ts to be formatted in ISO format. + * @return ISO 8601 formatted string. + */ + public static String formatISO8601(long ts) { + return isoFormat.get().format(new Date(ts)); + } + + /** + * Given ISO formatted string with format "yyyy-MM-dd'T'HH:mm:ss.SSSZ", return + * epoch time for local Time zone. + * @param isoString in format of "yyyy-MM-dd'T'HH:mm:ss.SSSZ". + * @return epoch time for local time zone. + * @throws ParseException if given ISO formatted string can not be parsed. + */ + public static long parseISO8601ToLocalTimeInMillis(String isoString) + throws ParseException { + return isoFormat.get().parse(isoString).getTime(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 834ead7..019166b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3048,9 +3048,9 @@ <property> <description> - The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval + The RMAppLifetimeMonitor Service uses this value as monitor interval </description> - <name>yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms</name> + 
<name>yarn.resourcemanager.application-timeouts.monitor.interval-ms</name> <value>60000</value> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f02e306..c69313f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.AMCommand; @@ -497,4 +499,11 @@ return null; FailApplicationAttemptRequest request) throws YarnException, IOException { throw new 
NotImplementedException(); } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e9bd230..c8af526 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -108,12 +108,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import 
org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -1589,37 +1592,11 @@ public class ClientRMService extends AbstractService implements ApplicationId applicationId = request.getApplicationId(); Priority newAppPriority = request.getApplicationPriority(); - UserGroupInformation callerUGI; - try { - callerUGI = UserGroupInformation.getCurrentUser(); - } catch (IOException ie) { - LOG.info("Error getting UGI ", ie); - RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY, - "UNKNOWN", "ClientRMService", "Error getting UGI", applicationId); - throw RPCUtil.getRemoteException(ie); - } - - RMApp application = this.rmContext.getRMApps().get(applicationId); - if (application == null) { - RMAuditLogger.logFailure(callerUGI.getUserName(), - AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", - "Trying to update priority of an absent application", applicationId); - throw new ApplicationNotFoundException( - "Trying to update priority of an absent application " - + applicationId); - } - if (!checkAccess(callerUGI, application.getUser(), - ApplicationAccessType.MODIFY_APP, application)) { - RMAuditLogger.logFailure(callerUGI.getShortUserName(), - AuditConstants.UPDATE_APP_PRIORITY, - "User doesn't have permissions to " - + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", - AuditConstants.UNAUTHORIZED_USER, applicationId); - throw RPCUtil.getRemoteException(new AccessControlException("User " - + callerUGI.getShortUserName() + " cannot perform operation " - + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); - } + UserGroupInformation callerUGI = + 
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_PRIORITY); UpdateApplicationPriorityResponse response = recordFactory .newRecordInstance(UpdateApplicationPriorityResponse.class); @@ -1724,4 +1701,104 @@ public class ClientRMService extends AbstractService implements .newRecordInstance(SignalContainerResponse.class); } + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + + ApplicationId applicationId = request.getApplicationId(); + Map<ApplicationTimeoutType, String> applicationTimeouts = + request.getApplicationTimeouts(); + + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_TIMEOUTS); + + if (applicationTimeouts.isEmpty()) { + String message = + "At least one ApplicationTimeoutType should be configured" + + " for updating timeouts."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + message, applicationId); + throw RPCUtil.getRemoteException(message); + } + + UpdateApplicationTimeoutsResponse response = recordFactory + .newRecordInstance(UpdateApplicationTimeoutsResponse.class); + + RMAppState state = application.getState(); + if (!EnumSet + .of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING) + .contains(state)) { + if (COMPLETED_APP_STATES.contains(state)) { + // If the application is in any of the final states, the timeout update + // can be skipped rather than throwing an exception. 
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", + applicationId); + return response; + } + String msg = + "Application is in " + state + " state can not update timeout."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + msg); + throw RPCUtil.getRemoteException(msg); + } + + try { + rmAppManager.updateApplicationTimeout(application, applicationTimeouts); + } catch (YarnException ex) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + ex.getMessage()); + throw RPCUtil.getRemoteException(ex); + } + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + return response; + } + + private UserGroupInformation getCallerUgi(ApplicationId applicationId, + String operation) throws YarnException { + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN", + "ClientRMService", "Error getting UGI", applicationId); + throw RPCUtil.getRemoteException(ie); + } + return callerUGI; + } + + private RMApp verifyUserAccessForRMApp(ApplicationId applicationId, + UserGroupInformation callerUGI, String operation) throws YarnException { + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN", + "ClientRMService", + "Trying to " + operation + " of an absent application", + applicationId); + throw new ApplicationNotFoundException("Trying to " + operation + + " of an absent application " + applicationId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + 
RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), + "ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + return application; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index c065b60..7144421 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -66,6 +67,8 @@ import 
org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; /** * This class manages the list of applications for the resource manager. @@ -509,4 +512,38 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + + // transaction method. + public void updateApplicationTimeout(RMApp app, + Map<ApplicationTimeoutType, String> newTimeoutInISO8601Format) + throws YarnException { + ApplicationId applicationId = app.getApplicationId(); + synchronized (applicationId) { + Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils + .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); + + SettableFuture<Object> future = SettableFuture.create(); + + Map<ApplicationTimeoutType, Long> currentExpireTimeouts = + app.getApplicationTimeouts(); + currentExpireTimeouts.putAll(newExpireTime); + + ApplicationStateData appState = + ApplicationStateData.newInstance(app.getSubmitTime(), + app.getStartTime(), app.getApplicationSubmissionContext(), + app.getUser(), app.getCallerContext()); + appState.setApplicationTimeouts(currentExpireTimeouts); + + // Update the state store. Though it is a synchronous call, update via a future to + // know whether any exception has been set. It is required because in non-HA mode, + // state-store errors are skipped. 
+ this.rmContext.getStateStore() + .updateApplicationStateSynchronously(appState, false, future); + + Futures.get(future, YarnException.class); + + // update in-memory + ((RMAppImpl) app).updateApplicationTimeout(newExpireTime); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 0361059..d52e002 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -63,7 +63,9 @@ public class RMAuditLogger { public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; public static final String UPDATE_APP_PRIORITY = - "Update Application Priority Request"; + "Update Application Priority"; + public static final String UPDATE_APP_TIMEOUTS = + "Update Application Timeouts"; public static final String CHANGE_CONTAINER_RESOURCE = "AM Changed Container Resource"; public static final String SIGNAL_CONTAINER = "Signal Container Request"; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index b2a085a..7e31e70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -69,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -89,6 +93,8 @@ public class RMServerUtils { protected static final RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null); + private static Clock clock = SystemClock.getInstance(); + public static 
List<RMNode> queryRMNodes(RMContext context, EnumSet<NodeState> acceptedStates) { // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. @@ -398,6 +404,7 @@ public class RMServerUtils { case FINISHING: case FINISHED: return YarnApplicationState.FINISHED; + case KILLING: case KILLED: return YarnApplicationState.KILLED; case FAILED: @@ -475,7 +482,7 @@ public class RMServerUtils { if (timeouts != null) { for (Map.Entry<ApplicationTimeoutType, Long> timeout : timeouts .entrySet()) { - if (timeout.getValue() < 0) { + if (timeout.getValue() <= 0) { String message = "Invalid application timeout, value=" + timeout.getValue() + " for type=" + timeout.getKey(); throw new YarnException(message); @@ -483,4 +490,43 @@ public class RMServerUtils { } } } + + /** + * Validate ISO8601 format with epoch time. + * @param timeoutsInISO8601 format + * @return expire time in local epoch + * @throws YarnException if given application timeout value is lesser than + * current time. + */ + public static Map<ApplicationTimeoutType, Long> validateISO8601AndConvertToLocalTimeEpoch( + Map<ApplicationTimeoutType, String> timeoutsInISO8601) + throws YarnException { + long currentTimeMillis = clock.getTime(); + Map<ApplicationTimeoutType, Long> newApplicationTimeout = + new HashMap<ApplicationTimeoutType, Long>(); + if (timeoutsInISO8601 != null) { + for (Map.Entry<ApplicationTimeoutType, String> timeout : timeoutsInISO8601 + .entrySet()) { + long expireTime = 0L; + try { + expireTime = + Times.parseISO8601ToLocalTimeInMillis(timeout.getValue()); + } catch (ParseException ex) { + String message = + "Expire time is not in ISO8601 format. 
ISO8601 supported " + + "format is yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + throw new YarnException(message); + } + if (expireTime < currentTimeMillis) { + String message = + "Expire time is less than current time, current-time=" + + Times.formatISO8601(currentTimeMillis) + " expire-time=" + + Times.formatISO8601(expireTime); + throw new YarnException(message); + } + newApplicationTimeout.put(timeout.getKey(), expireTime); + } + } + return newApplicationTimeout; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..d1f8b40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -31,13 +31,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; @@ -237,6 +239,8 @@ public abstract class RMStateStore extends AbstractService { boolean isFenced = false; ApplicationStateData appState = ((RMStateUpdateAppEvent) event).getAppState(); + SettableFuture<Object> result = + ((RMStateUpdateAppEvent) event).getResult(); ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); @@ -246,9 +250,18 @@ public abstract class RMStateStore extends AbstractService { store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); } + + if (result != null) { + result.set(null); + } + } catch (Exception e) { - LOG.error("Error updating app: " + appId, e); + String msg = "Error updating app: " + appId; + LOG.error(msg, e); isFenced = store.notifyStoreOperationFailedInternal(e); + if (result != null) { + result.setException(new YarnException(msg, e)); + } } return finalState(isFenced); }; @@ -774,18 +787,19 @@ public abstract class RMStateStore extends AbstractService { ApplicationStateData appState = ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), context, app.getUser(), app.getCallerContext()); + appState.setApplicationTimeouts(app.getApplicationTimeouts()); 
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") - public void updateApplicationState( - ApplicationStateData appState) { + public void updateApplicationState(ApplicationStateData appState) { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } - public void updateApplicationStateSynchronously( - ApplicationStateData appState, boolean notifyApp) { - handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp)); + public void updateApplicationStateSynchronously(ApplicationStateData appState, + boolean notifyApp, SettableFuture<Object> resultFuture) { + handleStoreEvent( + new RMStateUpdateAppEvent(appState, notifyApp, resultFuture)); } public void updateFencedState() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 69169dd..0a6220b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -20,21 +20,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import com.google.common.util.concurrent.SettableFuture; + public class RMStateUpdateAppEvent extends RMStateStoreEvent { private final ApplicationStateData appState; // After application state is updated in state store, // should notify back to application or not private boolean notifyApplication; + private SettableFuture<Object> future; public RMStateUpdateAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.UPDATE_APP); this.appState = appState; this.notifyApplication = true; + this.future = null; } - public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) { - this(appState); + public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp, + SettableFuture<Object> future) { + super(RMStateStoreEventType.UPDATE_APP); + this.appState = appState; this.notifyApplication = notifyApp; + this.future = future; } public ApplicationStateData getAppState() { @@ -44,4 +51,8 @@ public class RMStateUpdateAppEvent extends RMStateStoreEvent { public boolean isNotifyApplication() { return notifyApplication; } + + public SettableFuture<Object> getResult() { + return future; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 2348380..79a5de2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.Records; @@ -60,6 +61,25 @@ public abstract class ApplicationStateData { } public static ApplicationStateData newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, RMAppState state, + String diagnostics, long finishTime, CallerContext callerContext, + Map<ApplicationTimeoutType, Long> applicationTimeouts) { + ApplicationStateData appState = + Records.newRecord(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + appState.setCallerContext(callerContext); + appState.setApplicationTimeouts(applicationTimeouts); + return appState; + } + + public static ApplicationStateData newInstance(long submitTime, long startTime, ApplicationSubmissionContext context, String user, CallerContext callerContext) { return newInstance(submitTime, startTime, user, context, null, "", 0, @@ 
-168,4 +188,11 @@ public abstract class ApplicationStateData { public abstract CallerContext getCallerContext(); public abstract void setCallerContext(CallerContext callerContext); + + @Public + public abstract Map<ApplicationTimeoutType, Long> getApplicationTimeouts(); + + @Public + public abstract void setApplicationTimeouts( + Map<ApplicationTimeoutType, Long> applicationTimeouts); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index 15ed770..d037e68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -18,10 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import 
org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto; @@ -38,6 +46,7 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData { boolean viaProto = false; private ApplicationSubmissionContext applicationSubmissionContext = null; + private Map<ApplicationTimeoutType, Long> applicationTimeouts = null; public ApplicationStateDataPBImpl() { builder = ApplicationStateDataProto.newBuilder(); @@ -63,6 +72,10 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData { ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) .getProto()); } + + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } } private void mergeLocalToProto() { @@ -256,4 +269,77 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData { public static RMAppState convertFromProtoFormat(RMAppStateProto e) { return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); } + + @Override + public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + ApplicationStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + List<ApplicationTimeoutMapProto> lists = p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap<ApplicationTimeoutType, Long>(lists.size()); + for (ApplicationTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getTimeout()); + } + } + + @Override + public void setApplicationTimeouts( + Map<ApplicationTimeoutType, Long> appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable<? extends ApplicationTimeoutMapProto> values = + new Iterable<ApplicationTimeoutMapProto>() { + + @Override + public Iterator<ApplicationTimeoutMapProto> iterator() { + return new Iterator<ApplicationTimeoutMapProto>() { + private Iterator<ApplicationTimeoutType> iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationTimeoutMapProto.newBuilder() + .setTimeout(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 98cbd92..cd08743 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -280,4 +281,6 @@ public interface RMApp extends EventHandler<RMAppEvent> { String getAppNodeLabelExpression(); CallerContext getCallerContext(); + + Map<ApplicationTimeoutType, Long> getApplicationTimeouts(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcc15c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0fdc311..74e641d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -121,6 +121,9 @@ public class RMAppImpl implements RMApp, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; + private static final EnumSet<RMAppState> COMPLETED_APP_STATES = + EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); // Immutable fields private final ApplicationId applicationId; @@ -179,6 +182,8 @@ public class RMAppImpl implements RMApp, Recoverable { private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs = new HashMap<NodeId, List<String>>(); private final int maxLogAggregationDiagnosticsInMemory; + private Map<ApplicationTimeoutType, Long> applicationTimeouts = + new HashMap<ApplicationTimeoutType, Long>(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -897,6 +902,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); + this.applicationTimeouts = appState.getApplicationTimeouts(); // If interval > 0, some attempts might have been deleted. 
if (this.attemptFailuresValidityInterval > 0) { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); @@ -1109,17 +1115,16 @@ public class RMAppImpl implements RMApp, Recoverable { } } - long applicationLifetime = - app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); - if (applicationLifetime > 0) { + for (Map.Entry<ApplicationTimeoutType, Long> timeout : + app.applicationTimeouts.entrySet()) { app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, - ApplicationTimeoutType.LIFETIME, app.submitTime, - applicationLifetime * 1000); + timeout.getKey(), timeout.getValue()); if (LOG.isDebugEnabled()) { + long remainingTime = timeout.getValue() - app.systemClock.getTime(); LOG.debug("Application " + app.applicationId - + " is registered for timeout monitor, type=" - + ApplicationTimeoutType.LIFETIME + " value=" - + applicationLifetime + " seconds"); + + " is registered for timeout monitor, type=" + timeout.getKey() + + " remaining timeout=" + + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds"); } } @@ -1235,10 +1240,17 @@ public class RMAppImpl implements RMApp, Recoverable { long applicationLifetime = app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); if (applicationLifetime > 0) { + // calculate next timeout value + Long newTimeout = + Long.valueOf(app.submitTime + (applicationLifetime * 1000)); app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, - ApplicationTimeoutType.LIFETIME, app.submitTime, - applicationLifetime * 1000); - LOG.debug("Application " + app.applicationId + ApplicationTimeoutType.LIFETIME, newTimeout); + + // update applicationTimeouts with new absolute value. 
+ app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, + newTimeout); + + LOG.info("Application " + app.applicationId + " is registered for timeout monitor, type=" + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime + " seconds"); @@ -1292,6 +1304,7 @@ public class RMAppImpl implements RMApp, Recoverable { ApplicationStateData.newInstance(this.submitTime, this.startTime, this.user, this.submissionContext, stateToBeStored, diags, this.storedFinishTime, this.callerContext); + appState.setApplicationTimeouts(this.applicationTimeouts); this.rmContext.getStateStore().updateApplicationState(appState); } @@ -1967,4 +1980,31 @@ public class RMAppImpl implements RMApp, Recoverable { } return applicationLifetime; } -} + + @Override + public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() { + this.readLock.lock(); + try { + return new HashMap(this.applicationTimeouts); + } finally { + this.readLock.unlock(); + } + } + + public void updateApplicationTimeout( + Map<ApplicationTimeoutType, Long> updateTimeout) { + this.writeLock.lock(); + try { + if (COMPLETED_APP_STATES.contains(getState())) { + return; + } + // update monitoring service + this.rmContext.getRMAppLifetimeMonitor() + .updateApplicationTimeouts(getApplicationId(), updateTimeout); + this.applicationTimeouts.putAll(updateTimeout); + + } finally { + this.writeLock.unlock(); + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
