YARN-221. NM should provide a way for AM to tell it not to aggregate logs. Contributed by Ming Ma
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37e1c3d8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37e1c3d8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37e1c3d8 Branch: refs/heads/HDFS-7240 Commit: 37e1c3d82a96d781e1c9982988b7de4aa5242d0c Parents: 490bb5e Author: Xuan <xg...@apache.org> Authored: Sat Aug 22 16:25:24 2015 -0700 Committer: Xuan <xg...@apache.org> Committed: Sat Aug 22 16:25:24 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/util/StringUtils.java | 13 +- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/records/LogAggregationContext.java | 95 +++ .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../api/ContainerLogAggregationPolicy.java | 54 ++ .../yarn/server/api/ContainerLogContext.java | 71 ++ .../src/main/proto/yarn_protos.proto | 2 + .../impl/pb/LogAggregationContextPBImpl.java | 40 ++ .../ContainerLogsRetentionPolicy.java | 15 +- .../src/main/resources/yarn-default.xml | 24 + .../application/ApplicationImpl.java | 5 +- .../AMOnlyLogAggregationPolicy.java | 31 + ...AMOrFailedContainerLogAggregationPolicy.java | 35 + .../AbstractContainerLogAggregationPolicy.java | 31 + .../logaggregation/AppLogAggregator.java | 5 +- .../logaggregation/AppLogAggregatorImpl.java | 131 ++-- .../FailedContainerLogAggregationPolicy.java | 33 + ...edOrKilledContainerLogAggregationPolicy.java | 30 + .../logaggregation/LogAggregationService.java | 19 +- .../NoneContainerLogAggregationPolicy.java | 30 + .../SampleContainerLogAggregationPolicy.java | 124 ++++ .../event/LogHandlerAppStartedEvent.java | 15 +- .../containermanager/TestAuxServices.java | 1 + .../TestLogAggregationService.java | 677 ++++++++++++++++--- .../TestNonAggregatingLogHandler.java | 12 +- .../capacity/TestContainerAllocation.java | 12 +- 26 files changed, 1343 insertions(+), 171 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index 153270f..1107007 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -315,7 +315,18 @@ public class StringUtils { * @return the arraylist of the comma seperated string values */ public static String[] getStrings(String str){ - Collection<String> values = getStringCollection(str); + String delim = ","; + return getStrings(str, delim); + } + + /** + * Returns an arraylist of strings. + * @param str the string values + * @param delim delimiter to separate the values + * @return the arraylist of the separated string values + */ + public static String[] getStrings(String str, String delim){ + Collection<String> values = getStringCollection(str, delim); if(values.size() == 0) { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cf7b67f..5904a31 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -178,6 +178,9 @@ Release 2.8.0 - UNRELEASED YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed Node Label Configuration Setup. (Naganarasimha G R) + YARN-221. NM should provide a way for AM to tell it not to aggregate logs. + (Ming Ma via xgong) + IMPROVEMENTS YARN-644. 
Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java index 9383004..5ac7d2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java @@ -54,6 +54,43 @@ import org.apache.hadoop.yarn.util.Records; * name matches both the include and the exclude pattern, this file * will be excluded eventually. * </li> + * <li> + * policyClassName. The policy class name that implements + * ContainerLogAggregationPolicy. At runtime, nodemanager will ask the policy + * if a given container's log should be aggregated based on the + * ContainerType and other runtime state such as exit code by calling + * ContainerLogAggregationPolicy#shouldDoLogAggregation. + * This is useful when the app only wants to aggregate logs of a subset of + * containers. Here are the available policies. Please make sure to specify + * the canonical name by prefixing org.apache.hadoop.yarn.server. + * nodemanager.containermanager.logaggregation. + * to the class simple name below. + * NoneContainerLogAggregationPolicy: skip aggregation for all containers. + * AllContainerLogAggregationPolicy: aggregate all containers. + * AMOrFailedContainerLogAggregationPolicy: aggregate application master + * or failed containers.
+ * FailedOrKilledContainerLogAggregationPolicy: aggregate failed or killed + * containers + * FailedContainerLogAggregationPolicy: aggregate failed containers + * AMOnlyLogAggregationPolicy: aggregate application master containers + * SampleContainerLogAggregationPolicy: sample logs of successful worker + * containers, in addition to application master and failed/killed + * containers. + * If it isn't specified, it will use the cluster-wide default policy + * defined by configuration yarn.nodemanager.log-aggregation.policy.class. + * The default value of yarn.nodemanager.log-aggregation.policy.class is + * AllContainerLogAggregationPolicy. + * </li> + * <li> + * policyParameters. The parameters passed to the policy class via + * ContainerLogAggregationPolicy#parseParameters during the policy object + * initialization. This is optional. Some policy classes might use parameters + * to adjust its settings. It is up to the policy class to define the scheme of + * parameters. + * For example, SampleContainerLogAggregationPolicy supports the format of + * "SR:0.5,MIN:50", which means a sample rate of 50% beyond the first 50 + * successful worker containers.
+ * </li> * </ul> * * @see ApplicationSubmissionContext @@ -87,6 +124,23 @@ public abstract class LogAggregationContext { return context; } + @Public + @Unstable + public static LogAggregationContext newInstance(String includePattern, + String excludePattern, String rolledLogsIncludePattern, + String rolledLogsExcludePattern, String policyClassName, + String policyParameters) { + LogAggregationContext context = + Records.newRecord(LogAggregationContext.class); + context.setIncludePattern(includePattern); + context.setExcludePattern(excludePattern); + context.setRolledLogsIncludePattern(rolledLogsIncludePattern); + context.setRolledLogsExcludePattern(rolledLogsExcludePattern); + context.setLogAggregationPolicyClassName(policyClassName); + context.setLogAggregationPolicyParameters(policyParameters); + return context; + } + /** * Get include pattern. This includePattern only takes affect * on logs that exist at the time of application finish. @@ -164,4 +218,45 @@ public abstract class LogAggregationContext { @Unstable public abstract void setRolledLogsExcludePattern( String rolledLogsExcludePattern); + + /** + * Get the log aggregation policy class. + * + * @return log aggregation policy class + */ + @Public + @Unstable + public abstract String getLogAggregationPolicyClassName(); + + /** + * Set the log aggregation policy class. + * + * @param className + */ + @Public + @Unstable + public abstract void setLogAggregationPolicyClassName( + String className); + + /** + * Get the log aggregation policy parameters. + * + * @return log aggregation policy parameters + */ + @Public + @Unstable + public abstract String getLogAggregationPolicyParameters(); + + /** + * Set the log aggregation policy parameters. + * There is no schema defined for the parameters string. + * It is up to the log aggregation policy class to decide how to parse + * the parameters string. 
+ * + * @param parameters + */ + @Public + @Unstable + public abstract void setLogAggregationPolicyParameters( + String parameters); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 55eac85..a18ef7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1237,6 +1237,12 @@ public class YarnConfiguration extends Configuration { NM_RECOVERY_PREFIX + "supervised"; public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false; + public static final String NM_LOG_AGG_POLICY_CLASS = + NM_PREFIX + "log-aggregation.policy.class"; + + public static final String NM_LOG_AGG_POLICY_CLASS_PARAMETERS = NM_PREFIX + + "log-aggregation.policy.parameters"; + //////////////////////////////// // Web Proxy Configs //////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java new file mode 100644 index 0000000..2acbcf2 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + + +/** + * This API is used by NodeManager to decide if a given container's logs + * should be aggregated at run time. + */ +@Public +@Unstable +public interface ContainerLogAggregationPolicy { + + /** + * <p> + * The method used by the NodeManager log aggregation service + * to initialize the policy object with parameters specified by the application + * or the cluster-wide setting. + * </p> + * + * @param parameters parameters with scheme defined by the policy class. + */ + void parseParameters(String parameters); + + /** + * <p> + * The method used by the NodeManager log aggregation service + * to ask the policy object if a given container's logs should be aggregated. + * </p> + * + * @param logContext ContainerLogContext + * @return Whether or not the container's logs should be aggregated.
+ */ + boolean shouldDoLogAggregation(ContainerLogContext logContext); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java new file mode 100644 index 0000000..ab3b75c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java @@ -0,0 +1,71 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * Context class for {@link ContainerLogAggregationPolicy}. 
+ */ +@Public +@Unstable +public class ContainerLogContext { + private final ContainerId containerId; + private final ContainerType containerType; + private int exitCode; + + @Public + @Unstable + public ContainerLogContext(ContainerId containerId, + ContainerType containerType, int exitCode) { + this.containerId = containerId; + this.containerType = containerType; + this.exitCode = exitCode; + } + + /** + * Get {@link ContainerId} of the container. + * + * @return the container ID + */ + public ContainerId getContainerId() { + return containerId; + } + + /** + * Get {@link ContainerType} the type of the container. + * + * @return the type of the container + */ + public ContainerType getContainerType() { + return containerType; + } + + /** + * Get the exit code of the container. + * + * @return the exit code + */ + public int getExitCode() { + return exitCode; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 13d8365..1bd3dda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -343,6 +343,8 @@ message LogAggregationContextProto { optional string exclude_pattern = 2 [default = ""]; optional string rolled_logs_include_pattern = 3 [default = ""]; optional string rolled_logs_exclude_pattern = 4 [default = ".*"]; + optional string log_aggregation_policy_class_name = 5; + optional string log_aggregation_policy_parameters = 6; } enum ApplicationAccessTypeProto { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java index f6409bb..14a50fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java @@ -155,4 +155,44 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{ } builder.setRolledLogsExcludePattern(rolledLogsExcludePattern); } + + @Override + public String getLogAggregationPolicyClassName() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasLogAggregationPolicyClassName()) { + return null; + } + return p.getLogAggregationPolicyClassName(); + } + + @Override + public void setLogAggregationPolicyClassName( + String className) { + maybeInitBuilder(); + if (className == null) { + builder.clearLogAggregationPolicyClassName(); + return; + } + builder.setLogAggregationPolicyClassName(className); + } + + @Override + public String getLogAggregationPolicyParameters() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! 
p.hasLogAggregationPolicyParameters()) { + return null; + } + return p.getLogAggregationPolicyParameters(); + } + + @Override + public void setLogAggregationPolicyParameters( + String config) { + maybeInitBuilder(); + if (config == null) { + builder.clearLogAggregationPolicyParameters(); + return; + } + builder.setLogAggregationPolicyParameters(config); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java index fa39f25..3e7cd5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java @@ -16,14 +16,15 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.logaggregation; +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; @Private -/** - * This API is not exposed to end-users yet. 
- */ -public enum ContainerLogsRetentionPolicy { - APPLICATION_MASTER_ONLY, AM_AND_FAILED_CONTAINERS_ONLY, ALL_CONTAINERS -} +public class AllContainerLogAggregationPolicy extends + AbstractContainerLogAggregationPolicy { + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 00a9fba..62ba599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2226,4 +2226,28 @@ <value>0</value> </property> + <property> + <description> + The default log aggregation policy class. Applications can + override it via LogAggregationContext. This configuration can provide + some cluster-side default behavior so that if the application doesn't + specify any policy via LogAggregationContext administrators of the cluster + can adjust the policy globally. + </description> + <name>yarn.nodemanager.log-aggregation.policy.class</name> + <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy</value> + </property> + + <property> + <description> + The default parameters for the log aggregation policy. Applications can + override it via LogAggregationContext. This configuration can provide + some cluster-side default behavior so that if the application doesn't + specify any policy via LogAggregationContext administrators of the cluster + can adjust the policy globally. 
+ </description> + <name>yarn.nodemanager.log-aggregation.policy.parameters</name> + <value></value> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index e880c31..fbc8453 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -242,8 +241,8 @@ public class ApplicationImpl implements Application { app.logAggregationContext = initEvent.getLogAggregationContext(); app.dispatcher.getEventHandler().handle( 
new LogHandlerAppStartedEvent(app.appId, app.user, - app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, - app.applicationACLs, app.logAggregationContext)); + app.credentials, app.applicationACLs, + app.logAggregationContext)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java new file mode 100644 index 0000000..1c740ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; + +@Private +public class AMOnlyLogAggregationPolicy extends + AbstractContainerLogAggregationPolicy { + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + return logContext.getContainerType() == ContainerType.APPLICATION_MASTER; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java new file mode 100644 index 0000000..faee004 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; + +@Private +public class AMOrFailedContainerLogAggregationPolicy extends + AbstractContainerLogAggregationPolicy { + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + int exitCode = logContext.getExitCode(); + return logContext.getContainerType() == ContainerType.APPLICATION_MASTER || + (exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode() + && exitCode != ExitCode.TERMINATED.getExitCode()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java new file mode 100644 index 0000000..8d9dc03 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; + +// The class provides a no-op implementation for parseParameters. Policies +// that don't need parameters can derive from this class.
+@Private +public abstract class AbstractContainerLogAggregationPolicy implements + ContainerLogAggregationPolicy { + public void parseParameters(String parameters) { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 0b72a39..83c5d5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -18,12 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; -import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; public interface AppLogAggregator extends Runnable { - void startContainerLogAggregation(ContainerId containerId, - boolean wasContainerSuccessful); + void startContainerLogAggregation(ContainerLogContext logContext); void abortLogAggregation(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 654eb0b..742b8a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -56,9 +57,12 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import 
org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -107,7 +111,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final UserGroupInformation userUgi; private final Path remoteNodeLogFileForApp; private final Path remoteNodeTmpLogFileForApp; - private final ContainerLogsRetentionPolicy retentionPolicy; private final BlockingQueue<ContainerId> pendingContainers; private final AtomicBoolean appFinishing = new AtomicBoolean(); @@ -128,12 +131,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators = new HashMap<ContainerId, ContainerLogAggregator>(); + private final ContainerLogAggregationPolicy logAggPolicy; public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, - ContainerLogsRetentionPolicy retentionPolicy, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs) { @@ -146,7 +149,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.dirsHandler = dirsHandler; this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); - this.retentionPolicy = retentionPolicy; 
this.pendingContainers = new LinkedBlockingQueue<ContainerId>(); this.appAcls = appAcls; this.lfs = lfs; @@ -204,6 +206,66 @@ public class AppLogAggregatorImpl implements AppLogAggregator { || this.logAggregationContext.getRolledLogsIncludePattern() == null || this.logAggregationContext.getRolledLogsIncludePattern() .isEmpty() ? false : true; + this.logAggPolicy = getLogAggPolicy(conf); + } + + private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { + ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf); + String params = getLogAggPolicyParameters(conf); + if (params != null) { + policy.parseParameters(params); + } + return policy; + } + + // Use the policy class specified in LogAggregationContext if available. + // Otherwise use the cluster-wide default policy class. + private ContainerLogAggregationPolicy getLogAggPolicyInstance( + Configuration conf) { + Class<? extends ContainerLogAggregationPolicy> policyClass = null; + if (this.logAggregationContext != null) { + String className = + this.logAggregationContext.getLogAggregationPolicyClassName(); + if (className != null) { + try { + Class<?> policyFromContext = conf.getClassByName(className); + if (ContainerLogAggregationPolicy.class.isAssignableFrom( + policyFromContext)) { + policyClass = policyFromContext.asSubclass( + ContainerLogAggregationPolicy.class); + } else { + LOG.warn(this.appId + " specified invalid log aggregation policy " + + className); + } + } catch (ClassNotFoundException cnfe) { + // We don't fail the app if the policy class isn't valid. 
+ LOG.warn(this.appId + " specified invalid log aggregation policy " + + className); + } + } + } + if (policyClass == null) { + policyClass = conf.getClass(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS, + AllContainerLogAggregationPolicy.class, + ContainerLogAggregationPolicy.class); + } else { + LOG.info(this.appId + " specifies ContainerLogAggregationPolicy of " + + policyClass); + } + return ReflectionUtils.newInstance(policyClass, conf); + } + + // Use the policy parameters specified in LogAggregationContext if available. + // Otherwise use the cluster-wide default policy parameters. + private String getLogAggPolicyParameters(Configuration conf) { + String params = null; + if (this.logAggregationContext != null) { + params = this.logAggregationContext.getLogAggregationPolicyParameters(); + } + if (params == null) { + params = conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS); + } + return params; } private void uploadLogsForContainers(boolean appFinished) { @@ -228,21 +290,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator { // Create a set of Containers whose logs will be uploaded in this cycle. // It includes: // a) all containers in pendingContainers: those containers are finished - // and satisfy the retentionPolicy. + // and satisfy the ContainerLogAggregationPolicy. // b) some set of running containers: For all the Running containers, - // we have ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, - // so simply set wasContainerSuccessful as true to - // bypass FAILED_CONTAINERS check and find the running containers - // which satisfy the retentionPolicy. + // we use exitCode of 0 to find those which satisfy the + // ContainerLogAggregationPolicy. 
Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>(); this.pendingContainers.drainTo(pendingContainerInThisCycle); Set<ContainerId> finishedContainers = new HashSet<ContainerId>(pendingContainerInThisCycle); if (this.context.getApplications().get(this.appId) != null) { - for (ContainerId container : this.context.getApplications() - .get(this.appId).getContainers().keySet()) { - if (shouldUploadLogs(container, true)) { - pendingContainerInThisCycle.add(container); + for (Container container : this.context.getApplications() + .get(this.appId).getContainers().values()) { + ContainerType containerType = + container.getContainerTokenIdentifier().getContainerType(); + if (shouldUploadLogs(new ContainerLogContext( + container.getContainerId(), containerType, 0))) { + pendingContainerInThisCycle.add(container.getContainerId()); } } } @@ -506,46 +569,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator { // TODO: The condition: containerId.getId() == 1 to determine an AM container // is not always true. 
- private boolean shouldUploadLogs(ContainerId containerId, - boolean wasContainerSuccessful) { - - // All containers - if (this.retentionPolicy - .equals(ContainerLogsRetentionPolicy.ALL_CONTAINERS)) { - return true; - } - - // AM Container only - if (this.retentionPolicy - .equals(ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) { - if ((containerId.getContainerId() - & ContainerId.CONTAINER_ID_BITMASK)== 1) { - return true; - } - return false; - } - - // AM + Failing containers - if (this.retentionPolicy - .equals(ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) { - if ((containerId.getContainerId() - & ContainerId.CONTAINER_ID_BITMASK) == 1) { - return true; - } else if(!wasContainerSuccessful) { - return true; - } - return false; - } - return false; + private boolean shouldUploadLogs(ContainerLogContext logContext) { + return logAggPolicy.shouldDoLogAggregation(logContext); } @Override - public void startContainerLogAggregation(ContainerId containerId, - boolean wasContainerSuccessful) { - if (shouldUploadLogs(containerId, wasContainerSuccessful)) { - LOG.info("Considering container " + containerId + public void startContainerLogAggregation(ContainerLogContext logContext) { + if (shouldUploadLogs(logContext)) { + LOG.info("Considering container " + logContext.getContainerId() + " for log-aggregation"); - this.pendingContainers.add(containerId); + this.pendingContainers.add(logContext.getContainerId()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java new file mode 100644 index 0000000..800315e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; + +@Private +public class FailedContainerLogAggregationPolicy extends + AbstractContainerLogAggregationPolicy { + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + int exitCode = logContext.getExitCode(); + return exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode() + && exitCode != ExitCode.TERMINATED.getExitCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java new file mode 100644 index 0000000..02a48ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; + +@Private +public class FailedOrKilledContainerLogAggregationPolicy extends + AbstractContainerLogAggregationPolicy { + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + return logContext.getExitCode() != 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index dbbfcd5..259e9ae 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -48,8 +48,9 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -314,13 +315,12 @@ public class LogAggregationService extends AbstractService implements @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, - Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, - Map<ApplicationAccessType, String> appAcls, + Credentials credentials, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext) { ApplicationEvent eventResponse; try { verifyAndCreateRemoteLogDir(getConfig()); - initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls, + initAppAggregator(appId, user, credentials, appAcls, logAggregationContext); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); @@ -342,8 +342,7 @@ public class LogAggregationService extends AbstractService implements protected void initAppAggregator(final ApplicationId 
appId, String user, - Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, - Map<ApplicationAccessType, String> appAcls, + Credentials credentials, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext) { // Get user's FileSystem credentials @@ -357,7 +356,7 @@ public class LogAggregationService extends AbstractService implements final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, this.nodeId, dirsHandler, - getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, + getRemoteNodeLogFileForApp(appId, user), appAcls, logAggregationContext, this.context, getLocalFileContext(getConfig())); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { @@ -420,7 +419,10 @@ public class LogAggregationService extends AbstractService implements + ", did it fail to start?"); return; } - aggregator.startContainerLogAggregation(containerId, exitCode == 0); + ContainerType containerType = context.getContainers().get( + containerId).getContainerTokenIdentifier().getContainerType(); + aggregator.startContainerLogAggregation( + new ContainerLogContext(containerId, containerType, exitCode)); } private void stopApp(ApplicationId appId) { @@ -445,7 +447,6 @@ public class LogAggregationService extends AbstractService implements (LogHandlerAppStartedEvent) event; initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), appStartEvent.getCredentials(), - appStartEvent.getLogRetentionPolicy(), appStartEvent.getApplicationAcls(), appStartEvent.getLogAggregationContext()); break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java new file mode 100644 index 0000000..86dcd54 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; + +@Private +public class NoneContainerLogAggregationPolicy extends + AbstractContainerLogAggregationPolicy { + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java new file mode 100644 index 0000000..56c760b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import java.util.Collection; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; + +/** + * The sample policy samples logs of successful worker containers to aggregate. + * It always aggregates AM container and failed/killed worker + * containers' logs. To make sure small applications have enough logs, it only + * applies sampling beyond minimal number of containers. The parameters can be + * configured by SAMPLE_RATE and MIN_THRESHOLD. For example if SAMPLE_RATE is + * 0.2 and MIN_THRESHOLD is 20, for an application with 100 successful + * worker containers, 20 + (100-20) * 0.2 = 36 containers's logs will be + * aggregated. 
+ */ +@Private +public class SampleContainerLogAggregationPolicy implements + ContainerLogAggregationPolicy { + private static final Log LOG = + LogFactory.getLog(SampleContainerLogAggregationPolicy.class); + + static String SAMPLE_RATE = "SR"; + public static final float DEFAULT_SAMPLE_RATE = 0.2f; + + static String MIN_THRESHOLD = "MIN"; + public static final int DEFAULT_SAMPLE_MIN_THRESHOLD = 20; + + private float sampleRate = DEFAULT_SAMPLE_RATE; + private int minThreshold = DEFAULT_SAMPLE_MIN_THRESHOLD; + + static public String buildParameters(float sampleRate, int minThreshold) { + StringBuilder sb = new StringBuilder(); + sb.append(SAMPLE_RATE).append(":").append(sampleRate).append(","). + append(MIN_THRESHOLD).append(":").append(minThreshold); + return sb.toString(); + } + + // Parameters are comma separated properties, for example + // "SR:0.5,MIN:50" + public void parseParameters(String parameters) { + Collection<String> params = StringUtils.getStringCollection(parameters); + for(String param : params) { + // The first element is the property name. + // The second element is the property value. + String[] property = StringUtils.getStrings(param, ":"); + if (property == null || property.length != 2) { + continue; + } + if (property[0].equals(SAMPLE_RATE)) { + try { + float sampleRate = Float.parseFloat(property[1]); + if (sampleRate >= 0.0 && sampleRate <= 1.0) { + this.sampleRate = sampleRate; + } else { + LOG.warn("The format isn't valid. Sample rate falls back to the " + + "default value " + DEFAULT_SAMPLE_RATE); + } + } catch (NumberFormatException nfe) { + LOG.warn("The format isn't valid. Sample rate falls back to the " + + "default value " + DEFAULT_SAMPLE_RATE); + } + } else if (property[0].equals(MIN_THRESHOLD)) { + try { + int minThreshold = Integer.parseInt(property[1]); + if (minThreshold >= 0) { + this.minThreshold = minThreshold; + } else { + LOG.warn("The format isn't valid. 
Min threshold falls back to " + + "the default value " + DEFAULT_SAMPLE_MIN_THRESHOLD); + } + } catch (NumberFormatException nfe) { + LOG.warn("The format isn't valid. Min threshold falls back to the " + + "default value " + DEFAULT_SAMPLE_MIN_THRESHOLD); + } + } + } + } + + public boolean shouldDoLogAggregation(ContainerLogContext logContext) { + if (logContext.getContainerType() == + ContainerType.APPLICATION_MASTER || logContext.getExitCode() != 0) { + // If it is AM or failed or killed container, enable log aggregation. + return true; + } + + // Only sample log aggregation for large applications. + // We assume the container id is continuously allocated from number 1 and + // Worker containers start from id 2. So logs of worker containers with ids + // in [2, minThreshold + 1] will be aggregated. + if ((logContext.getContainerId().getContainerId() & + ContainerId.CONTAINER_ID_BITMASK) < minThreshold + 2) { + return true; + } + + // Sample log aggregation for the rest of successful worker containers + return (sampleRate != 0 && + logContext.getContainerId().hashCode() % (1/sampleRate) == 0); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index 993f69c..d3ff771 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -24,32 +24,27 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; public class LogHandlerAppStartedEvent extends LogHandlerEvent { private final ApplicationId applicationId; - private final ContainerLogsRetentionPolicy retentionPolicy; private final String user; private final Credentials credentials; private final Map<ApplicationAccessType, String> appAcls; private final LogAggregationContext logAggregationContext; public LogHandlerAppStartedEvent(ApplicationId appId, String user, - Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, - Map<ApplicationAccessType, String> appAcls) { - this(appId, user, credentials, retentionPolicy, appAcls, null); + Credentials credentials, Map<ApplicationAccessType, String> appAcls) { + this(appId, user, credentials, appAcls, null); } public LogHandlerAppStartedEvent(ApplicationId appId, String user, - Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, - Map<ApplicationAccessType, String> appAcls, + Credentials credentials, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext) { super(LogHandlerEventType.APPLICATION_STARTED); this.applicationId = appId; this.user = user; this.credentials = credentials; - this.retentionPolicy = retentionPolicy; this.appAcls = appAcls; this.logAggregationContext = 
logAggregationContext; } @@ -62,10 +57,6 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent { return this.credentials; } - public ContainerLogsRetentionPolicy getLogRetentionPolicy() { - return this.retentionPolicy; - } - public String getUser() { return this.user; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 757cdc8..1380752 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -87,6 +87,7 @@ public class TestAuxServices { this.stoppedApps = new ArrayList<Integer>(); } + @SuppressWarnings("unchecked") public ArrayList<Integer> getAppIdsStopped() { return (ArrayList<Integer>)this.stoppedApps.clone(); }