YARN-7952. RM should be able to recover log aggregation status after restart/fail-over. (Xuan Gong via wangda)
Change-Id: I725c9afe64831eda0aa6b0bebdbc79d2dd165707 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4bf62204 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4bf62204 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4bf62204 Branch: refs/heads/trunk Commit: 4bf622043f034835d65ff2a4785b9b06d0ef1fd2 Parents: f47659f Author: Wangda Tan <[email protected]> Authored: Thu Mar 15 13:26:45 2018 -0700 Committer: Wangda Tan <[email protected]> Committed: Thu Mar 15 13:26:45 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 5 +- .../src/main/resources/yarn-default.xml | 4 +- .../RegisterNodeManagerRequest.java | 5 + .../pb/RegisterNodeManagerRequestPBImpl.java | 80 ++++++ .../yarn_server_common_service_protos.proto | 1 + .../hadoop/yarn/server/nodemanager/Context.java | 4 +- .../yarn/server/nodemanager/NodeManager.java | 24 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 14 + .../logaggregation/AppLogAggregatorImpl.java | 11 +- .../tracker/NMLogAggregationStatusTracker.java | 270 +++++++++++++++++++ .../amrmproxy/BaseAMRMProxyTest.java | 6 + .../BaseContainerManagerTest.java | 19 ++ .../TestNMLogAggregationStatusTracker.java | 151 +++++++++++ .../resourcemanager/ResourceTrackerService.java | 17 +- .../resourcemanager/rmnode/RMNodeImpl.java | 6 + .../rmnode/RMNodeStartedEvent.java | 11 + 16 files changed, 619 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2afff43..b76f457 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1328,7 +1328,10 @@ public class YarnConfiguration extends Configuration { * How long for ResourceManager to wait for NodeManager to report its * log aggregation status. If waiting time of which the log aggregation status * is reported from NodeManager exceeds the configured value, RM will report - * log aggregation status for this NodeManager as TIME_OUT + * log aggregation status for this NodeManager as TIME_OUT. + * + * This configuration will be used in NodeManager as well to decide + * whether and when to delete the cached log aggregation status. */ public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS = YARN_PREFIX + "log-aggregation-status.time-out.ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e192a0d..114ba4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1286,7 +1286,9 @@ How long for ResourceManager to wait for NodeManager to report its log aggregation status. 
If waiting time of which the log aggregation status is reported from NodeManager exceeds the configured value, RM - will report log aggregation status for this NodeManager as TIME_OUT + will report log aggregation status for this NodeManager as TIME_OUT. + This configuration will be used in NodeManager as well to decide + whether and when to delete the cached log aggregation status. </description> <name>yarn.log-aggregation-status.time-out.ms</name> <value>600000</value> http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index fc30a80..ff50330 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -112,4 +112,9 @@ public abstract class RegisterNodeManagerRequest { * @param physicalResource Physical resources in the node. 
*/ public abstract void setPhysicalResource(Resource physicalResource); + + public abstract List<LogAggregationReport> getLogAggregationReportsForApps(); + + public abstract void setLogAggregationReportsForApps( + List<LogAggregationReport> logAggregationReportsForApps); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index eda06d0..02fd20f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -38,11 +38,13 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -57,6 +59,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private List<ApplicationId> runningApplications = null; private Set<NodeLabel> labels = null; + private List<LogAggregationReport> logAggregationReportsForApps = null; + /** Physical resources in the node. */ private Resource physicalResource = null; @@ -100,6 +104,48 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } + if (this.logAggregationReportsForApps != null) { + addLogAggregationStatusForAppsToProto(); + } + } + + private void addLogAggregationStatusForAppsToProto() { + maybeInitBuilder(); + builder.clearLogAggregationReportsForApps(); + if (this.logAggregationReportsForApps == null) { + return; + } + Iterable<LogAggregationReportProto> it = + new Iterable<LogAggregationReportProto>() { + @Override + public Iterator<LogAggregationReportProto> iterator() { + return new Iterator<LogAggregationReportProto>() { + private Iterator<LogAggregationReport> iter = + logAggregationReportsForApps.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public LogAggregationReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + 
builder.addAllLogAggregationReportsForApps(it); + } + + private LogAggregationReportProto convertToProtoFormat( + LogAggregationReport value) { + return ((LogAggregationReportPBImpl) value).getProto(); } private synchronized void addNMContainerStatusesToProto() { @@ -400,4 +446,39 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + + @Override + public synchronized List<LogAggregationReport> + getLogAggregationReportsForApps() { + if (this.logAggregationReportsForApps != null) { + return this.logAggregationReportsForApps; + } + initLogAggregationReportsForApps(); + return logAggregationReportsForApps; + } + + private void initLogAggregationReportsForApps() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List<LogAggregationReportProto> list = + p.getLogAggregationReportsForAppsList(); + this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>(); + for (LogAggregationReportProto c : list) { + this.logAggregationReportsForApps.add(convertFromProtoFormat(c)); + } + } + + private LogAggregationReport convertFromProtoFormat( + LogAggregationReportProto logAggregationReport) { + return new LogAggregationReportPBImpl(logAggregationReport); + } + + @Override + public synchronized void setLogAggregationReportsForApps( + List<LogAggregationReport> logAggregationStatusForApps) { + maybeInitBuilder(); + if (logAggregationStatusForApps == null) { + builder.clearLogAggregationReportsForApps(); + } + this.logAggregationReportsForApps = logAggregationStatusForApps; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index e782cc2..1b090bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -66,6 +66,7 @@ message RegisterNodeManagerRequestProto { repeated ApplicationIdProto runningApplications = 7; optional NodeLabelsProto nodeLabels = 8; optional ResourceProto physicalResource = 9; + repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; } message RegisterNodeManagerResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index d7e3b52..84b3915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; - import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; @@ -121,6 +121,8 @@ public interface Context { NMTimelinePublisher getNMTimelinePublisher(); + NMLogAggregationStatusTracker getNMLogAggregationStatusTracker(); + ContainerExecutor getContainerExecutor(); ContainerStateTransitionListener getContainerStateTransitionListener(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 42b7b5f..2748a8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -59,6 +59,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; @@ -135,6 +136,7 @@ public class NodeManager extends CompositeService private boolean rmWorkPreservingRestartEnabled; private boolean shouldExitOnShutdownEvent = false; + private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; /** * Default Container State transition listener. 
*/ @@ -424,6 +426,12 @@ public class NodeManager extends CompositeService addService(containerManager); ((NMContext) context).setContainerManager(containerManager); + this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker( + context); + addService(nmLogAggregationStatusTracker); + ((NMContext)context).setNMLogAggregationStatusTracker( + this.nmLogAggregationStatusTracker); + WebServer webServer = createWebServer(context, containerManager .getContainersMonitor(), this.aclsManager, dirsHandler); addService(webServer); @@ -621,6 +629,8 @@ public class NodeManager extends CompositeService private ResourcePluginManager resourcePluginManager; + private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -862,6 +872,15 @@ public class NodeManager extends CompositeService public void setDeletionService(DeletionService deletionService) { this.deletionService = deletionService; } + + public void setNMLogAggregationStatusTracker( + NMLogAggregationStatusTracker nmLogAggregationStatusTracker) { + this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker; + } + @Override + public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { + return nmLogAggregationStatusTracker; + } } /** @@ -965,4 +984,9 @@ public class NodeManager extends CompositeService public NodeStatusUpdater getNodeStatusUpdater() { return nodeStatusUpdater; } + + private NMLogAggregationStatusTracker createNMLogAggregationStatusTracker( + Context ctxt) { + return new NMLogAggregationStatusTracker(ctxt); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3d3f573..8154723 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -381,6 +381,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } + if (logAggregationEnabled) { + // pull log aggregation status for application running in this NM + List<LogAggregationReport> logAggregationReports = + context.getNMLogAggregationStatusTracker() + .pullCachedLogAggregationReports(); + if (logAggregationReports != null + && !logAggregationReports.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("The cache log aggregation status size:" + + logAggregationReports.size()); + } + request.setLogAggregationReportsForApps(logAggregationReports); + } + } regNMResponse = resourceTracker.registerNodeManager(request); // Make sure rmIdentifier is set before we release the lock http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 4ac150a..c7e06ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -385,7 +385,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { logAggregationSucceedInThisCycle ? LogAggregationStatus.RUNNING : LogAggregationStatus.RUNNING_WITH_FAILURE; - sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage); + sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage, + false); if (appFinished) { // If the app is finished, one extra final report with log aggregation // status SUCCEEDED/FAILED will be sent to RM to inform the RM @@ -394,18 +395,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator { renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle ? 
LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED; - sendLogAggregationReportInternal(finalLogAggregationStatus, ""); + sendLogAggregationReportInternal(finalLogAggregationStatus, "", true); } } private void sendLogAggregationReportInternal( - LogAggregationStatus logAggregationStatus, String diagnosticMessage) { + LogAggregationStatus logAggregationStatus, String diagnosticMessage, + boolean finalized) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); report.setDiagnosticMessage(diagnosticMessage); report.setLogAggregationStatus(logAggregationStatus); this.context.getLogAggregationStatusForApps().add(report); + this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus( + appId, logAggregationStatus, System.currentTimeMillis(), + diagnosticMessage, finalized); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/NMLogAggregationStatusTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/NMLogAggregationStatusTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/NMLogAggregationStatusTracker.java new file mode 100644 index 0000000..510d6d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/NMLogAggregationStatusTracker.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link NMLogAggregationStatusTracker} is used to cache log aggregation + * status for finished applications. 
It will also delete the old cached + * log aggregation status periodically. + * + */ +public class NMLogAggregationStatusTracker extends CompositeService { + + private static final Logger LOG = + LoggerFactory.getLogger(NMLogAggregationStatusTracker.class); + + private final ReadLock readLocker; + private final WriteLock writeLocker; + private final Context nmContext; + private final long rollingInterval; + private final Timer timer; + private final Map<ApplicationId, AppLogAggregationStatusForRMRecovery> + recoveryStatuses; + private boolean disabled = false; + + public NMLogAggregationStatusTracker(Context context) { + super(NMLogAggregationStatusTracker.class.getName()); + this.nmContext = context; + Configuration conf = context.getConf(); + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + disabled = true; + } + this.recoveryStatuses = new ConcurrentHashMap<>(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.readLocker = lock.readLock(); + this.writeLocker = lock.writeLock(); + this.timer = new Timer(); + long configuredRollingInterval = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); + if (configuredRollingInterval <= 0) { + this.rollingInterval = YarnConfiguration + .DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS; + LOG.warn("The configured log-aggregation-status.time-out.ms is " + + configuredRollingInterval + " which should be larger than 0. " + + "Using the default value:" + this.rollingInterval + " instead."); + } else { + this.rollingInterval = configuredRollingInterval; + } + LOG.info("the rolling interval seconds for the NodeManager Cached Log " + + "aggregation status is " + (rollingInterval/1000)); + } + + @Override + protected void serviceStart() throws Exception { + if (disabled) { + LOG.warn("Log Aggregation is disabled." 
+ " So is the LogAggregationStatusTracker."); + } else { + this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(), + rollingInterval, rollingInterval); + } + } + + @Override + public void serviceStop() throws Exception { + this.timer.cancel(); + } + + public void updateLogAggregationStatus(ApplicationId appId, + LogAggregationStatus logAggregationStatus, long updateTime, + String diagnosis, boolean finalized) { + if (disabled) { + LOG.warn("The log aggregation is disabled. No need to update " + + "the log aggregation status"); + return; + } + // In NM, each application has exactly one appLogAggregator thread + // to handle the log aggregation. So, it is fine for multiple + // appLogAggregator threads to update the log aggregation status of + // their own applications. This is why we are using readLocker here. + this.readLocker.lock(); + try { + AppLogAggregationStatusForRMRecovery tracker = recoveryStatuses + .get(appId); + if (tracker == null) { + Application application = this.nmContext.getApplications().get(appId); + if (application == null) { + LOG.warn("The application:" + appId + " has already finished," + + " and has been removed from NodeManager, we should not " + + "receive the log aggregation status update for " + + "this application."); + return; + } + AppLogAggregationStatusForRMRecovery newTracker = + new AppLogAggregationStatusForRMRecovery(logAggregationStatus, + diagnosis); + newTracker.setLastModifiedTime(updateTime); + newTracker.setFinalized(finalized); + recoveryStatuses.put(appId, newTracker); + } else { + if (tracker.isFinalized()) { + LOG.warn("Ignore the log aggregation status update request " + + "for the application:" + appId + ". The cached log aggregation " + + "status is " + tracker.getLogAggregationStatus() + "."); + } else { + if (tracker.getLastModifiedTime() > updateTime) { + LOG.warn("Ignore the log aggregation status update request " + + "for the application:" + appId + ". The request log "
+ + "aggregation status update is older than the cached " + + "log aggregation status."); + } else { + tracker.setLogAggregationStatus(logAggregationStatus); + tracker.setDiagnosis(diagnosis); + tracker.setLastModifiedTime(updateTime); + tracker.setFinalized(finalized); + } + } + } + } finally { + this.readLocker.unlock(); + } + } + + public List<LogAggregationReport> pullCachedLogAggregationReports() { + List<LogAggregationReport> reports = new ArrayList<>(); + if (disabled) { + LOG.warn("The log aggregation is disabled. " + + "There is no cached log aggregation status."); + return reports; + } + // When we pull cached Log aggregation reports for all application in + // this NM, we should make sure that we need to block all of the + // updateLogAggregationStatus calls. So, the writeLocker is used here. + this.writeLocker.lock(); + try { + for(Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker : + recoveryStatuses.entrySet()) { + AppLogAggregationStatusForRMRecovery current = tracker.getValue(); + LogAggregationReport report = LogAggregationReport.newInstance( + tracker.getKey(), current.getLogAggregationStatus(), + current.getDiagnosis()); + reports.add(report); + } + return reports; + } finally { + this.writeLocker.unlock(); + } + } + + private class LogAggregationStatusRoller extends TimerTask { + @Override + public void run() { + rollLogAggregationStatus(); + } + } + + private void rollLogAggregationStatus() { + // When we call rollLogAggregationStatus, basically fetch all + // cached log aggregation status and delete the out-of-timeout period + // log aggregation status, we should block the updateLogAggregationStatus + // calls as well as pullCachedLogAggregationReports call. So, the + // writeLocker is used here.
+ this.writeLocker.lock(); + try { + long currentTimeStamp = System.currentTimeMillis(); + LOG.info("Rolling over the cached log aggregation status."); + Iterator<Entry<ApplicationId, AppLogAggregationStatusForRMRecovery>> it + = recoveryStatuses.entrySet().iterator(); + while (it.hasNext()) { + Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker = + it.next(); + // the application has finished. + if (nmContext.getApplications().get(tracker.getKey()) == null) { + if (currentTimeStamp - tracker.getValue().getLastModifiedTime() + > rollingInterval) { + it.remove(); + } + } + } + } finally { + this.writeLocker.unlock(); + } + } + + private static class AppLogAggregationStatusForRMRecovery { + private LogAggregationStatus logAggregationStatus; + private long lastModifiedTime; + private boolean finalized; + private String diagnosis; + + AppLogAggregationStatusForRMRecovery( + LogAggregationStatus logAggregationStatus, String diagnosis) { + this.setLogAggregationStatus(logAggregationStatus); + this.setDiagnosis(diagnosis); + } + + public LogAggregationStatus getLogAggregationStatus() { + return logAggregationStatus; + } + + public void setLogAggregationStatus( + LogAggregationStatus logAggregationStatus) { + this.logAggregationStatus = logAggregationStatus; + } + + public long getLastModifiedTime() { + return lastModifiedTime; + } + + public void setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + } + + public boolean isFinalized() { + return finalized; + } + + public void setFinalized(boolean finalized) { + this.finalized = finalized; + } + + public String getDiagnosis() { + return diagnosis; + } + + public void setDiagnosis(String diagnosis) { + this.diagnosis = diagnosis; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 9602142..4b1a887 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -814,5 +815,10 @@ public abstract class BaseAMRMProxyTest { public DeletionService getDeletionService() { return null; } + + @Override + public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { + return null; + } } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index fc9e6c4..93d0afb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -18,7 +18,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import 
org.apache.hadoop.yarn.api.records.Resource; @@ -77,6 +85,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; @@ -130,6 +139,16 @@ public abstract class BaseContainerManagerTest { public ContainerExecutor getContainerExecutor() { return exec; } + + @Override + public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { + NMLogAggregationStatusTracker mock = mock( + NMLogAggregationStatusTracker.class); + doNothing().when(mock).updateLogAggregationStatus( + any(ApplicationId.class), any(LogAggregationStatus.class), + anyLong(), anyString(), anyBoolean()); + return mock; + } }; protected ContainerExecutor exec; protected DeletionService delSrvc; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java new file mode 100644 index 0000000..4efc398 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Supplier;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Functional tests for {@link NMLogAggregationStatusTracker}.
+ */
+public class TestNMLogAggregationStatusTracker {
+
+  /**
+   * Verifies that status updates are ignored while log aggregation is
+   * disabled and for applications absent from the NM context, that updates
+   * for a known application are cached, and that a finalized status is not
+   * overwritten by a later update.
+   */
+  @SuppressWarnings("resource")
+  @Test
+  public void testNMLogAggregationStatusUpdate() {
+    long baseTime = System.currentTimeMillis();
+    Context mockContext = mock(Context.class);
+    // Only appId1 is registered as a running application in the NM context.
+    ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
+    ApplicationId appId1 = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    apps.putIfAbsent(appId1, mock(Application.class));
+    when(mockContext.getApplications()).thenReturn(apps);
+    // the log aggregation is disabled.
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+    when(mockContext.getConf()).thenReturn(conf);
+    NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
+        mockContext);
+    ApplicationId appId0 = ApplicationId.newInstance(0, 0);
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false);
+    List<LogAggregationReport> reports = tracker
+        .pullCachedLogAggregationReports();
+    // we can not get any cached log aggregation status because
+    // the log aggregation is disabled.
+    Assert.assertTrue("No cached log aggregation status because "
+        + "log aggregation is disabled.", reports.isEmpty());
+
+    // enable the log aggregation.
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    when(mockContext.getConf()).thenReturn(conf);
+    tracker = new NMLogAggregationStatusTracker(mockContext);
+    // update the log aggregation status for a non-existent/finished
+    // application (appId0 is not in the NM context); the status update
+    // request should be ignored.
+    appId0 = ApplicationId.newInstance(0, 0);
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.RUNNING, baseTime, "", false);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertTrue("No cached log aggregation status "
+        + "because the application is finished or not existed.",
+        reports.isEmpty());
+
+    // An update for the registered application is cached.
+    tracker.updateLogAggregationStatus(appId1,
+        LogAggregationStatus.RUNNING, baseTime, "", false);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertEquals("Should have one cached log aggregation status.",
+        1, reports.size());
+    Assert.assertEquals("The cached log aggregation status should be RUNNING.",
+        LogAggregationStatus.RUNNING,
+        reports.get(0).getLogAggregationStatus());
+
+    // A newer, finalized update replaces the cached RUNNING status.
+    tracker.updateLogAggregationStatus(appId1,
+        LogAggregationStatus.SUCCEEDED, baseTime + 60 * 1000, "", true);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertEquals(1, reports.size());
+    Assert.assertEquals("Update cached log aggregation status to SUCCEEDED",
+        LogAggregationStatus.SUCCEEDED,
+        reports.get(0).getLogAggregationStatus());
+
+    // the log aggregation status is finalized. So, we would
+    // ignore the following update
+    tracker.updateLogAggregationStatus(appId1,
+        LogAggregationStatus.FAILED, baseTime + 10 * 60 * 1000, "", true);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertEquals(1, reports.size());
+    Assert.assertEquals("The cached log aggregation status "
+        + "should be still SUCCEEDED.", LogAggregationStatus.SUCCEEDED,
+        reports.get(0).getLogAggregationStatus());
+  }
+
+  /**
+   * Apparently intended to verify that a cached status is evicted after the
+   * configured LOG_AGGREGATION_STATUS_TIME_OUT_MS (10 s here).
+   *
+   * NOTE(review): this method has no {@code @Test} annotation, so JUnit
+   * never runs it. It should not simply be annotated as is: appId1 stays in
+   * {@code apps}, and NMLogAggregationStatusTracker's
+   * rollLogAggregationStatus only evicts statuses of applications that are
+   * absent from Context#getApplications(), so the waitFor below would time
+   * out. Also confirm whether constructing the tracker is enough to
+   * schedule its roller timer, and whether a 10 s waitFor timeout leaves
+   * enough margin over a 10 s status timeout. TODO fix and enable.
+   */
+  public void testLogAggregationStatusRoller() throws Exception {
+    Context mockContext = mock(Context.class);
+    Configuration conf = new YarnConfiguration();
+    conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+        10 * 1000);
+    when(mockContext.getConf()).thenReturn(conf);
+    ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
+    ApplicationId appId1 = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    apps.putIfAbsent(appId1, mock(Application.class));
+    when(mockContext.getApplications()).thenReturn(apps);
+    final NMLogAggregationStatusTracker tracker =
+        new NMLogAggregationStatusTracker(mockContext);
+    tracker.updateLogAggregationStatus(appId1,
+        LogAggregationStatus.RUNNING,
+        System.currentTimeMillis(), "", false);
+    // verify that we have cached the log aggregation status for app1
+    List<LogAggregationReport> reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertEquals("Should have one cached log aggregation status.",
+        1, reports.size());
+    Assert.assertEquals("The cached log aggregation status should be RUNNING.",
+        LogAggregationStatus.RUNNING,
+        reports.get(0).getLogAggregationStatus());
+    // Poll every 2 s, for up to 10 s, until the cached status is evicted.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        List<LogAggregationReport>reports = tracker
+            .pullCachedLogAggregationReports();
+        return reports.size() == 0;
+      }
+    }, 2000, 10000);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9d95f63..e997192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -399,9 +399,21 @@ public class ResourceTrackerService extends AbstractService implements RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { + RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, + request.getNMContainerStatuses(), + request.getRunningApplications()); + if (request.getLogAggregationReportsForApps() != null + && !request.getLogAggregationReportsForApps().isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found the number of previous cached log aggregation " + + "status from nodemanager:" + nodeId + " is :" + + request.getLogAggregationReportsForApps().size()); + } + startEvent.setLogAggregationReportsForApps(request + .getLogAggregationReportsForApps()); + } this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications())); + startEvent); } else { 
LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); @@ -426,7 +438,6 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeStartedEvent(nodeId, null, null)); - } else { // Reset heartbeat ID since node just restarted. oldNode.resetLastNodeHeartBeatResponse(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3cbde01..b942afa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -866,6 +866,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); + List<LogAggregationReport> logAggregationReportsForApps = + startEvent.getLogAggregationReportsForApps(); + if (logAggregationReportsForApps != null + && !logAggregationReportsForApps.isEmpty()) { + rmNode.handleLogAggregationStatus(logAggregationReportsForApps); + } } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bf62204/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 4fc983a..3976994 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -22,12 +22,14 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public class RMNodeStartedEvent extends RMNodeEvent { private List<NMContainerStatus> containerStatuses; private List<ApplicationId> runningApplications; + private List<LogAggregationReport> logAggregationReportsForApps; public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports, @@ -44,4 +46,13 @@ public class RMNodeStartedEvent extends RMNodeEvent { public List<ApplicationId> getRunningApplications() { return runningApplications; } + + public List<LogAggregationReport> getLogAggregationReportsForApps() { + return this.logAggregationReportsForApps; + } + + public void 
setLogAggregationReportsForApps( + List<LogAggregationReport> logAggregationReportsForApps) { + this.logAggregationReportsForApps = logAggregationReportsForApps; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
