YARN-6339. Improve performance for createAndGetApplicationReport. (Yunjiong Zhao via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd014d57 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd014d57 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd014d57 Branch: refs/heads/HDFS-10467 Commit: cd014d57aa8b896da02b5bcadafbd404bca2bc12 Parents: db2adf3 Author: Wangda Tan <wan...@apache.org> Authored: Mon Mar 27 13:29:09 2017 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Mon Mar 27 13:29:09 2017 -0700 ---------------------------------------------------------------------- .../yarn/api/records/impl/pb/ProtoUtils.java | 6 ++-- .../server/resourcemanager/rmapp/RMAppImpl.java | 32 ++++++++++++-------- .../TestRMAppLogAggregationStatus.java | 2 ++ 3 files changed, 25 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd014d57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index ab283e7..926c757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -296,6 +296,8 @@ public class ProtoUtils { * Log Aggregation Status */ private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_"; + private static final int LOG_AGGREGATION_STATUS_PREFIX_LEN = + LOG_AGGREGATION_STATUS_PREFIX.length(); public static LogAggregationStatusProto convertToProtoFormat( LogAggregationStatus e) { return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX @@ -304,8 +306,8 @@ public class ProtoUtils { public static LogAggregationStatus convertFromProtoFormat( LogAggregationStatusProto e) { - return LogAggregationStatus.valueOf(e.name().replace( - LOG_AGGREGATION_STATUS_PREFIX, "")); + return LogAggregationStatus.valueOf(e.name().substring( + LOG_AGGREGATION_STATUS_PREFIX_LEN)); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd014d57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9f00b2e..f24908b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -177,8 +178,8 @@ public class RMAppImpl implements RMApp, Recoverable { private long logAggregationStartTime = 0; private final long logAggregationStatusTimeout; private final Map<NodeId, LogAggregationReport> logAggregationStatus = - new HashMap<NodeId, LogAggregationReport>(); - private LogAggregationStatus logAggregationStatusForAppReport; + new ConcurrentHashMap<NodeId, LogAggregationReport>(); + private volatile LogAggregationStatus logAggregationStatusForAppReport; private int logAggregationSucceed = 0; private int logAggregationFailed = 0; private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs = @@ -1697,26 +1698,23 @@ public class RMAppImpl implements RMApp, Recoverable { public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() { try { this.readLock.lock(); - Map<NodeId, LogAggregationReport> outputs = - new HashMap<NodeId, LogAggregationReport>(); - outputs.putAll(logAggregationStatus); - if (!isLogAggregationFinished()) { - for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) { + if (!isLogAggregationFinished() && isAppInFinalState(this) && + System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + for (Entry<NodeId, LogAggregationReport> output : + logAggregationStatus.entrySet()) { if (!output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.TIME_OUT) && !output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.SUCCEEDED) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { + .equals(LogAggregationStatus.FAILED)) { output.getValue().setLogAggregationStatus( LogAggregationStatus.TIME_OUT); } } } - return outputs; + return Collections.unmodifiableMap(logAggregationStatus); } finally { this.readLock.unlock(); } @@ -1824,11 +1822,17 @@ public class RMAppImpl implements RMApp, Recoverable { // the log aggregation is finished. And the log aggregation status will // not be updated anymore. if (logFailedCount > 0 && isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED; } else if (logTimeOutCount > 0) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.TIME_OUT; return LogAggregationStatus.TIME_OUT; } if (isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED; } } else if (logRunningWithFailure > 0) { @@ -1844,7 +1848,9 @@ public class RMAppImpl implements RMApp, Recoverable { return this.logAggregationStatusForAppReport .equals(LogAggregationStatus.SUCCEEDED) || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.FAILED); + .equals(LogAggregationStatus.FAILED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.TIME_OUT); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd014d57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 55a4eac..677990b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -413,6 +413,8 @@ public class TestRMAppLogAggregationStatus { Assert.assertEquals(LogAggregationStatus.TIME_OUT, rmApp.getLogAggregationStatusForAppReport()); + rmApp = (RMAppImpl)createRMApp(conf); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); // If the log aggregation status for all NMs are SUCCEEDED and Application // is at the final state, the log aggregation status for this app will // return SUCCEEDED --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org