TEZ-3267. Publish queue name to ATS as part of dag summary. Contributed by Harish Jaiprakash.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/16b93de8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/16b93de8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/16b93de8 Branch: refs/heads/TEZ-1190 Commit: 16b93de8f31a815cab63e0be0dc563549a688566 Parents: 11815a7 Author: Siddharth Seth <[email protected]> Authored: Wed Feb 8 18:32:26 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Wed Feb 8 18:32:26 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 11 ++++++++- .../dag/history/events/DAGSubmittedEvent.java | 17 +++++++++++-- .../impl/HistoryEventJsonConversion.java | 3 +++ tez-dag/src/main/proto/HistoryEvents.proto | 1 + .../apache/tez/dag/app/TestRecoveryParser.java | 26 ++++++++++---------- .../dag/history/TestHistoryEventHandler.java | 2 +- .../TestHistoryEventsProtoConversion.java | 4 ++- .../impl/TestHistoryEventJsonConversion.java | 12 +++++++-- .../history/recovery/TestRecoveryService.java | 2 +- .../ats/acls/TestATSHistoryWithACLs.java | 4 +-- .../ats/TestATSV15HistoryLoggingService.java | 2 +- .../ats/HistoryEventTimelineConversion.java | 3 +++ .../ats/TestATSHistoryLoggingService.java | 2 +- .../ats/TestHistoryEventTimelineConversion.java | 10 +++++--- 16 files changed, 72 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0949339..215cb08 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3267. Publish queue name to ATS as part of dag summary. TEZ-3609. Improve ATSv15 performance for DAG entities read calls. TEZ-3244. Allow overlap of input and output memory when they are not concurrent TEZ-3581. Add different logger to enable suppressing logs for specific lines. http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index c56582c..03c9fa1 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -55,6 +55,7 @@ public class ATSConstants { public static final String DAG_PLAN = "dagPlan"; public static final String DAG_NAME = "dagName"; public static final String DAG_STATE = "dagState"; + public static final String DAG_SUBMITTED_QUEUE_NAME = "submittedQueueName"; public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion"; public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason"; public static final String VERTEX_NAME = "vertexName"; http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index eaaf18b..7f27064 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2578,7 +2578,7 @@ public class DAGAppMaster extends AbstractService { // for an app later final DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(), submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources, - newDAG.getUserName(), newDAG.getConf(), containerLogs); + newDAG.getUserName(), newDAG.getConf(), containerLogs, getSubmittedQueueName()); boolean dagLoggingEnabled = newDAG.getConf().getBoolean( TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT); @@ -2671,6 +2671,15 @@ public class DAGAppMaster extends AbstractService { }); } + private String getSubmittedQueueName() { + // TODO: Replace this with constant once the yarn patch is backported. (JIRA: TEZ-3279) + String submittedQueueName = System.getenv("YARN_RESOURCEMANAGER_APPLICATION_QUEUE"); + if (submittedQueueName == null) { + submittedQueueName = amConf.get(TezConfiguration.TEZ_QUEUE_NAME); + } + return submittedQueueName; + } + @SuppressWarnings("unchecked") private void sendEvent(Event<?> event) { dispatcher.getEventHandler().handle(event); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index 07d7c07..1b1fdf3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -57,6 +57,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { private boolean historyLoggingEnabled = true; private Configuration conf; private String containerLogs; + private String queueName; public DAGSubmittedEvent() { } @@ -64,7 +65,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { public DAGSubmittedEvent(TezDAGID dagID, long submitTime, DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId, Map<String, LocalResource> cumulativeAdditionalLocalResources, - String user, Configuration conf, String containerLogs) { + String user, Configuration conf, String containerLogs, String queueName) { this.dagID = dagID; this.dagName = dagPlan.getName(); this.submitTime = submitTime; @@ -74,6 +75,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { this.user = user; this.conf = conf; this.containerLogs = containerLogs; + this.queueName = queueName; } @Override @@ -97,6 +99,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { .setApplicationAttemptId(applicationAttemptId.toString()) .setDagPlan(dagPlan) .setSubmitTime(submitTime); + if (queueName != null) { + builder.setQueueName(queueName); + } if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) { builder.setCumulativeAdditionalAmResources(DagTypeConverters .convertFromLocalResources(cumulativeAdditionalLocalResources)); @@ -111,6 +116,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { this.submitTime = proto.getSubmitTime(); this.applicationAttemptId = ConverterUtils.toApplicationAttemptId( proto.getApplicationAttemptId()); + if (proto.hasQueueName()) { + this.queueName = proto.getQueueName(); + } if (proto.hasCumulativeAdditionalAmResources()) { this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto .getCumulativeAdditionalAmResources()); @@ -134,7 +142,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { @Override public String toString() { return "dagID=" + dagID - + ", submitTime=" + submitTime; + + ", submitTime=" + submitTime + + ", queueName=" + queueName; } @Override @@ -203,4 +212,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { public String getContainerLogs() { return containerLogs; } + + public String getQueueName() { + return queueName; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index a767fbf..69c40e4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -515,6 +515,9 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE, event.getDAGPlan().getCallerContext().getCallerType()); } + if (event.getQueueName() != null) { + otherInfo.put(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index ff3707d..7671469 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -54,6 +54,7 @@ message DAGSubmittedProto { optional int64 submit_time = 3; optional string application_attempt_id = 4; optional PlanLocalResourcesProto cumulative_additional_am_resources = 5; + optional string queue_name = 6; } message DAGInitializedProto { http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index f4edf9e..6673b39 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -169,7 +169,7 @@ public class TestRecoveryParser { rService.start(); rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); // only for testing, DAGCommitStartedEvent is not supposed to happen at this time. @@ -215,7 +215,7 @@ public class TestRecoveryParser { rService.start(); rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); rService.handle(new DAGHistoryEvent(dagID, @@ -264,7 +264,7 @@ public class TestRecoveryParser { rService.start(); rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread rService.await(); rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); @@ -310,7 +310,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // write an corrupted SummaryEvent rService.summaryStream.writeChars("INVALID_DATA"); rService.stop(); @@ -344,7 +344,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new DAGCommitStartedEvent(dagID, 0L))); @@ -376,7 +376,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new DAGCommitStartedEvent(dagID, 0L))); @@ -412,7 +412,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L))); @@ -445,7 +445,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); rService.handle(new DAGHistoryEvent(dagID, @@ -482,7 +482,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new VertexGroupCommitStartedEvent(dagID, "group_1", @@ -516,7 +516,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID v0 = TezVertexID.getInstance(dagID, 0); TezVertexID v1 = TezVertexID.getInstance(dagID, 1); @@ -565,7 +565,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); rService.handle(new DAGHistoryEvent(dagID, @@ -601,7 +601,7 @@ public class TestRecoveryParser { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); rService.handle(new DAGHistoryEvent(dagID, @@ -640,7 +640,7 @@ public class TestRecoveryParser { // DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, "user", "dagName", null); DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName"); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java index 4c0fe3f..5a71a42 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java @@ -219,7 +219,7 @@ public class TestHistoryEventHandler { new AMStartedEvent(attemptId, time, user))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user, - conf, null))); + conf, null, "default"))); TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 3d29a5d..47d8389 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -171,12 +171,13 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } + private final String QUEUE_NAME = "TEST_QUEUE_NAME"; private void testDAGSubmittedEvent() throws Exception { DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance( ApplicationId.newInstance(0, 1), 1), 1001l, DAGPlan.newBuilder().setName("foo").build(), ApplicationAttemptId.newInstance( - ApplicationId.newInstance(0, 1), 1), null, "", null, null); + ApplicationId.newInstance(0, 1), 1), null, "", null, null, QUEUE_NAME); DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent) testProtoConversion(event); Assert.assertEquals(event.getApplicationAttemptId(), @@ -189,6 +190,7 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getSubmitTime()); Assert.assertEquals(event.getDAGPlan(), deserializedEvent.getDAGPlan()); + Assert.assertEquals(event.getQueueName(), deserializedEvent.getQueueName()); logEvents(event, deserializedEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 9477118..1bbecd3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -124,7 +124,7 @@ public class TestHistoryEventJsonConversion { break; case DAG_SUBMITTED: event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user, null, null); + null, user, null, null, "Q_" + eventType.name()); break; case DAG_INITIALIZED: event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); @@ -200,7 +200,15 @@ public class TestHistoryEventJsonConversion { if (event == null || !event.isHistoryEvent()) { continue; } - HistoryEventJsonConversion.convertToJson(event); + JSONObject json = HistoryEventJsonConversion.convertToJson(event); + if (eventType == HistoryEventType.DAG_SUBMITTED) { + try { + Assert.assertEquals("Q_" + eventType.name(), json.getJSONObject(ATSConstants.OTHER_INFO) + .getString(ATSConstants.DAG_SUBMITTED_QUEUE_NAME)); + } catch (JSONException ex) { + Assert.fail("Exception: " + ex.getMessage() + " for type: " + eventType); + } + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java index 3dec1d7..790e2d8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java @@ -333,7 +333,7 @@ public class TestRecoveryService { DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build(); // This writes to recovery immediately. recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent( - dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null))); + dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null, "default"))); waitForDrain(-1); verify(summaryFos, times(1)).hflush(); verify(dagFos, times(1)).hflush(); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index 6b3ebd7..8e5c95c 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -401,7 +401,7 @@ public class TestATSHistoryWithACLs { DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, 1, dagPlan, appAttemptId, null, - "usr", tezConf, null); + "usr", tezConf, null, null); submittedEvent.setHistoryLoggingEnabled(false); DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent); historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent)); @@ -446,7 +446,7 @@ public class TestATSHistoryWithACLs { DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, 1, dagPlan, appAttemptId, null, - "usr", tezConf, null); + "usr", tezConf, null, null); submittedEvent.setHistoryLoggingEnabled(true); DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent); historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent)); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java index 9111195..cbded35 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java @@ -450,7 +450,7 @@ public class TestATSV15HistoryLoggingService { new AMStartedEvent(attemptId, time, user))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user, - conf, null))); + conf, null, "default"))); TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 96239c3..8d0c547 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -423,6 +423,9 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE, event.getDAGPlan().getCallerContext().getCallerType()); } + if (event.getQueueName() != null) { + atsEntity.addOtherInfo(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName()); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java index da57eb2..a641cda 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -444,7 +444,7 @@ public class TestATSHistoryLoggingService { Configuration conf = new Configuration(service.getConfig()); historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user"))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, - DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null))); + DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null, "default"))); TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 62fb335..bb189d3 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -145,7 +145,7 @@ public class TestHistoryEventTimelineConversion { break; case DAG_SUBMITTED: event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user, null, containerLogs); + null, user, null, containerLogs, null); break; case DAG_INITIALIZED: event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); @@ -473,8 +473,9 @@ public class TestHistoryEventTimelineConversion { public void testConvertDAGSubmittedEvent() { long submitTime = random.nextLong(); + final String queueName = "TEST_DAG_SUBMITTED"; DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan, - applicationAttemptId, null, user, null, containerLogs); + applicationAttemptId, null, user, null, containerLogs, queueName); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); @@ -509,7 +510,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - Assert.assertEquals(8, timelineEntity.getOtherInfo().size()); + Assert.assertEquals(9, timelineEntity.getOtherInfo().size()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); @@ -530,7 +531,8 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals( timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE), dagPlan.getCallerContext().getCallerType()); - + Assert.assertEquals( + queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_SUBMITTED_QUEUE_NAME)); }
