Repository: tez Updated Branches: refs/heads/master 8e9e5ae7f -> 5ce07f89f
TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ce07f89 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ce07f89 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ce07f89 Branch: refs/heads/master Commit: 5ce07f89fb8adc04569184174a841843d4c31199 Parents: 8e9e5ae Author: Hitesh Shah <[email protected]> Authored: Tue Apr 5 07:56:32 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Apr 5 07:56:32 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/tez/client/TezClient.java | 5 ++- .../tez/dag/history/HistoryEventType.java | 41 +++++++++++++++++++- .../tez/dag/history/TestHistoryEventType.java | 39 +++++++++++++++++++ .../ats/acls/TestATSHistoryWithACLs.java | 4 +- .../ats/ATSV15HistoryLoggingService.java | 11 +++++- .../logging/ats/ATSHistoryLoggingService.java | 14 +++++-- 7 files changed, 106 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d64388e..41752a9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing TEZ-3173. Update Tez AM REST APIs for more information for each vertex. TEZ-3108. Add support for external services to local mode. @@ -412,6 +413,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster. TEZ-2967. Vertex start time should be that of first task start time in UI http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 43cdfb8..639d961 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -135,6 +135,7 @@ public class TezClient { private JavaOptsChecker javaOptsChecker = null; private int preWarmDAGCounter = 0; + private int dagCounter = 0; /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */ private int maxSubmitDAGRequestSizeThroughIPC; @@ -494,6 +495,7 @@ public class TezClient { * if submission timed out */ public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException { + ++dagCounter; if (isSession) { return submitDAGSession(dag); } else { @@ -536,7 +538,8 @@ public class TezClient { if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) { try { aclConfigs = historyACLPolicyManager.setupSessionDAGACLs( - amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls()); + amConfig.getTezConfiguration(), sessionAppId, + Integer.toString(dagCounter), dag.getDagAccessControls()); } catch (HistoryACLPolicyException e) { LOG.warn("Disabling history logging for dag " + dag.getName() + " due to error in setting up history acls " + e); http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index 4e56e9f..9bf98df 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.history; +import org.apache.tez.dag.api.TezUncheckedException; + public enum HistoryEventType { APP_LAUNCHED, AM_LAUNCHED, @@ -41,5 +43,40 @@ public enum HistoryEventType { VERTEX_COMMIT_STARTED, VERTEX_GROUP_COMMIT_STARTED, VERTEX_GROUP_COMMIT_FINISHED, - DAG_RECOVERED -} + DAG_RECOVERED; + + public static boolean isDAGSpecificEvent(HistoryEventType historyEventType) { + switch (historyEventType) { + case APP_LAUNCHED: + case AM_LAUNCHED: + case AM_STARTED: + case CONTAINER_LAUNCHED: + case CONTAINER_STOPPED: + return false; + case DAG_SUBMITTED: + case DAG_INITIALIZED: + case DAG_STARTED: + case DAG_FINISHED: + case DAG_KILL_REQUEST: + case VERTEX_INITIALIZED: + case VERTEX_STARTED: + case VERTEX_CONFIGURE_DONE: + case VERTEX_FINISHED: + case TASK_STARTED: + case TASK_FINISHED: + case TASK_ATTEMPT_STARTED: + case TASK_ATTEMPT_FINISHED: + case DAG_COMMIT_STARTED: + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_RECOVERED: + return true; + default: + throw new TezUncheckedException("Unhandled history event type: " + historyEventType.name()); + } + } + + + + } http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java new file mode 100644 index 0000000..c37442e --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history; + +import org.junit.Assert; +import org.junit.Test; + +public class TestHistoryEventType { + + @Test + public void testDAGSpecificEventCheck() { + for (HistoryEventType eventType : HistoryEventType.values()) { + if (eventType.name().startsWith("AM_") + || eventType.name().startsWith("APP_") + || eventType.name().startsWith("CONTAINER_")) { + Assert.assertFalse(HistoryEventType.isDAGSpecificEvent(eventType)); + } else { + Assert.assertTrue(HistoryEventType.isDAGSpecificEvent(eventType)); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index 512913d..2c976f5 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -298,7 +298,7 @@ public class TestATSHistoryWithACLs { Collections.singleton("nobody"), Collections.singleton("nobody_group")); timelineDomain = getDomain(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX - + applicationId.toString() + "_TezSleepProcessor"); + + applicationId.toString() + "_1"); verifyDomainACLs(timelineDomain, Sets.newHashSet("nobody", "nobody2"), Sets.newHashSet("nobody_group", "nobody_group2")); @@ -670,7 +670,7 @@ public class TestATSHistoryWithACLs { appEntity.getDomainId()); if (!sameDomain) { assertEquals(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString() - + "_TezSleepProcessor", dagEntity.getDomainId()); + + "_1", dagEntity.getDomainId()); } else { assertEquals(appEntity.getDomainId(), dagEntity.getDomainId()); } http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java index 0c5b8d6..fc7e97a 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java @@ -367,9 +367,16 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + if (historyACLPolicyManager != null) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); + if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) { + if (domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); + } + } else { + if (sessionDomainId != null && !sessionDomainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entity, sessionDomainId); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/5ce07f89/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index a66da24..6b8d6e5 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -268,7 +268,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } } - public void handle(DAGHistoryEvent event) { if (historyLoggingEnabled && timelineClient != null) { eventQueue.add(event); @@ -326,6 +325,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { for (int i = 0; i < events.size(); ++i) { DAGHistoryEvent event = events.get(i); String domainId = sessionDomainId; + TezDAGID dagId = event.getDagID(); if (historyACLPolicyManager != null && dagId != null) { @@ -335,11 +335,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + if (historyACLPolicyManager != null) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) { + if (domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + } + } else { + if (sessionDomainId != null && !sessionDomainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], sessionDomainId); + } } } + } if (LOG.isDebugEnabled()) {
