TEZ-3627. Use queue name available in RegisterApplicationMasterResponse for publishing to ATS. Contributed by Harish Jaiprakash.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8c311e41 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8c311e41 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8c311e41 Branch: refs/heads/TEZ-1190 Commit: 8c311e4127f18e6d468e361a7805f87e84544c25 Parents: 10ded7c Author: Siddharth Seth <[email protected]> Authored: Tue Feb 21 21:16:51 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Tue Feb 21 21:16:51 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 2 +- .../api/TaskSchedulerContext.java | 3 ++- .../java/org/apache/tez/dag/app/AppContext.java | 5 ++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 24 +++++++++++--------- .../dag/app/rm/TaskSchedulerContextImpl.java | 7 +++--- .../app/rm/TaskSchedulerContextImplWrapper.java | 11 +++++---- .../tez/dag/app/rm/TaskSchedulerManager.java | 4 +++- .../dag/app/rm/YarnTaskSchedulerService.java | 3 ++- .../impl/HistoryEventJsonConversion.java | 5 +++- .../tez/dag/app/rm/TestTaskScheduler.java | 3 ++- .../dag/app/rm/TestTaskSchedulerHelpers.java | 4 ++-- .../impl/TestHistoryEventJsonConversion.java | 4 +++- .../ats/HistoryEventTimelineConversion.java | 5 +++- .../ats/TestHistoryEventTimelineConversion.java | 7 ++++-- 15 files changed, 58 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f34252b..a5c59ca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3627. Use queue name available in RegisterApplicationMasterResponse for publishing to ATS. TEZ-3610. TEZ UI 0.7 0.9 compatibility for url query params and tez-app sub-routes TEZ-3625. Dag.getVertex should obtain a readlock. TEZ-3624. Split multiple calls on the same line in TaskCommunicatorContextImpl. http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 03c9fa1..25c802e 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -55,7 +55,7 @@ public class ATSConstants { public static final String DAG_PLAN = "dagPlan"; public static final String DAG_NAME = "dagName"; public static final String DAG_STATE = "dagState"; - public static final String DAG_SUBMITTED_QUEUE_NAME = "submittedQueueName"; + public static final String DAG_QUEUE_NAME = "queueName"; public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion"; public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason"; public static final String VERTEX_NAME = "vertexName"; http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index d30ada3..97fe7ae 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -131,7 +131,8 @@ public interface TaskSchedulerContext extends ServicePluginContextBase { void setApplicationRegistrationData( Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, - ByteBuffer clientAMSecretKey + ByteBuffer clientAMSecretKey, + String queueName ); /** http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 45ce8c1..b3d561a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -116,6 +117,10 @@ public interface AppContext { String getAMUser(); + String getQueueName(); + + void setQueueName(String queueName); + /** Whether the AM is in the process of shutting down/completing */ boolean isAMInCompletionState(); http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 5a43358..2085789 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1528,7 +1528,6 @@ public class DAGAppMaster extends AbstractService { } private class RunningAppContext implements AppContext { - private DAG dag; private DAGRecoveryData dagRecoveryData; private final Configuration conf; @@ -1537,6 +1536,8 @@ public class DAGAppMaster extends AbstractService { private final Lock rLock = rwLock.readLock(); private final Lock wLock = rwLock.writeLock(); private final EventHandler eventHandler; + private volatile String queueName; + public RunningAppContext(Configuration config) { checkNotNull(config, "config is null"); this.conf = config; @@ -1793,6 +1794,16 @@ public class DAGAppMaster extends AbstractService { public DAGRecoveryData getDAGRecoveryData() { return dagRecoveryData; } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public void setQueueName(String queueName) { + this.queueName = queueName; + } } private static class ServiceWithDependency implements ServiceStateChangeListener { @@ -2578,7 +2589,7 @@ public class DAGAppMaster extends AbstractService { // for an app later final DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(), submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources, - newDAG.getUserName(), newDAG.getConf(), containerLogs, getSubmittedQueueName()); + newDAG.getUserName(), newDAG.getConf(), containerLogs, getContext().getQueueName()); boolean dagLoggingEnabled = newDAG.getConf().getBoolean( TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT); @@ -2672,15 +2683,6 @@ public class DAGAppMaster extends AbstractService { }); } - private String getSubmittedQueueName() { - // TODO: Replace this with constant once the yarn patch is backported. (JIRA: TEZ-3279) - String submittedQueueName = System.getenv("YARN_RESOURCEMANAGER_APPLICATION_QUEUE"); - if (submittedQueueName == null) { - submittedQueueName = amConf.get(TezConfiguration.TEZ_QUEUE_NAME); - } - return submittedQueueName; - } - @SuppressWarnings("unchecked") private void sendEvent(Event<?> event) { dispatcher.getEventHandler().handle(event); http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index fb4198b..39000d6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -92,9 +92,10 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { @Override public void setApplicationRegistrationData(Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, - ByteBuffer clientAMSecretKey) { - taskSchedulerManager.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, - clientAMSecretKey); + ByteBuffer clientAMSecretKey, + String queueName) { + taskSchedulerManager.setApplicationRegistrationData(schedulerId, maxContainerCapability, + appAcls, clientAMSecretKey, queueName); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java index 7e1988b..49ab77d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java @@ -95,9 +95,9 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { @Override public void setApplicationRegistrationData(Resource maxContainerCapability, - Map<ApplicationAccessType, String> appAcls, ByteBuffer key) { + Map<ApplicationAccessType, String> appAcls, ByteBuffer key, String queueName) { executorService.submit(new SetApplicationRegistrationDataCallable(real, - maxContainerCapability, appAcls, key)); + maxContainerCapability, appAcls, key, queueName)); } @Override @@ -295,20 +295,23 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { private final Resource maxContainerCapability; private final Map<ApplicationAccessType, String> appAcls; private final ByteBuffer key; + private final String queueName; public SetApplicationRegistrationDataCallable(TaskSchedulerContext app, Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, - ByteBuffer key) { + ByteBuffer key, + String queueName) { super(app); this.maxContainerCapability = maxContainerCapability; this.appAcls = appAcls; this.key = key; + this.queueName = queueName; } @Override public Void call() throws Exception { - app.setApplicationRegistrationData(maxContainerCapability, appAcls, key); + app.setApplicationRegistrationData(maxContainerCapability, appAcls, key, queueName); return null; } } http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 7989e5f..d32261f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -768,9 +768,11 @@ public class TaskSchedulerManager extends AbstractService implements int schedulerId, Resource maxContainerCapability, Map<ApplicationAccessType, String> appAcls, - ByteBuffer clientAMSecretKey) { + ByteBuffer clientAMSecretKey, + String queueName) { this.appContext.getClusterInfo().setMaxContainerCapability( maxContainerCapability); + this.appContext.setQueueName(queueName); this.appAcls = appAcls; this.clientService.setClientAMSecretKey(clientAMSecretKey); } http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 41d380a..95cd85b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -371,7 +371,8 @@ public class YarnTaskSchedulerService extends TaskScheduler getContext().setApplicationRegistrationData( response.getMaximumResourceCapability(), response.getApplicationACLs(), - response.getClientToAMTokenMasterKey()); + response.getClientToAMTokenMasterKey(), + response.getQueue()); delayedContainerManager.start(); } catch (YarnException e) { http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 69c40e4..e60575f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -490,6 +490,9 @@ public class HistoryEventJsonConversion { primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE, event.getDAGPlan().getCallerContext().getCallerType()); } + if (event.getQueueName() != null) { + primaryFilters.put(ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); + } jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters); @@ -516,7 +519,7 @@ public class HistoryEventJsonConversion { event.getDAGPlan().getCallerContext().getCallerType()); } if (event.getQueueName() != null) { - otherInfo.put(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName()); + otherInfo.put(ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index b3511e8..16c560e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -150,7 +150,8 @@ public class TestTaskScheduler { RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse(); verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(), regResponse.getApplicationACLs(), - regResponse.getClientToAMTokenMasterKey()); + regResponse.getClientToAMTokenMasterKey(), + regResponse.getQueue()); Assert.assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount()); http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 9a845a1..35ab30b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -287,9 +287,9 @@ class TestTaskSchedulerHelpers { @Override public void setApplicationRegistrationData(Resource maxContainerCapability, - Map<ApplicationAccessType, String> appAcls, ByteBuffer key) { + Map<ApplicationAccessType, String> appAcls, ByteBuffer key, String queueName) { invocations++; - real.setApplicationRegistrationData(maxContainerCapability, appAcls, key); + real.setApplicationRegistrationData(maxContainerCapability, appAcls, key, queueName); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 1bbecd3..081e47f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -204,7 +204,9 @@ public class TestHistoryEventJsonConversion { if (eventType == HistoryEventType.DAG_SUBMITTED) { try { Assert.assertEquals("Q_" + eventType.name(), json.getJSONObject(ATSConstants.OTHER_INFO) - .getString(ATSConstants.DAG_SUBMITTED_QUEUE_NAME)); + .getString(ATSConstants.DAG_QUEUE_NAME)); + Assert.assertEquals("Q_" + eventType.name(), json + .getJSONObject(ATSConstants.PRIMARY_FILTERS).getString(ATSConstants.DAG_QUEUE_NAME)); } catch (JSONException ex) { Assert.fail("Exception: " + ex.getMessage() + " for type: " + eventType); } http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 8d0c547..faccc98 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -400,6 +400,9 @@ public class HistoryEventTimelineConversion { atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, event.getDAGPlan().getCallerContext().getCallerId()); } + if (event.getQueueName() != null) { + atsEntity.addPrimaryFilter(ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); + } try { atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, @@ -424,7 +427,7 @@ public class HistoryEventTimelineConversion { event.getDAGPlan().getCallerContext().getCallerType()); } if (event.getQueueName() != null) { - atsEntity.addOtherInfo(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName()); + atsEntity.addOtherInfo(ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); } return atsEntity; http://git-wip-us.apache.org/repos/asf/tez/blob/8c311e41/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index bb189d3..28fd5da 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -496,7 +496,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue()); - Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); + Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size()); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains( @@ -509,6 +509,9 @@ public class TestHistoryEventTimelineConversion { applicationAttemptId.getApplicationId().toString())); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_QUEUE_NAME) + .contains(queueName)); Assert.assertEquals(9, timelineEntity.getOtherInfo().size()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); @@ -532,7 +535,7 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE), dagPlan.getCallerContext().getCallerType()); Assert.assertEquals( - queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_SUBMITTED_QUEUE_NAME)); + queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME)); }
