Repository: tez
Updated Branches:
  refs/heads/branch-0.7 be57068d1 -> d0fb2ca28
TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d0fb2ca2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d0fb2ca2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d0fb2ca2 Branch: refs/heads/branch-0.7 Commit: d0fb2ca2898c9ba826cd5be5f944d359bd542534 Parents: be57068 Author: Hitesh Shah <[email protected]> Authored: Tue Apr 5 08:01:42 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Apr 5 08:01:42 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 5 ++- .../tez/dag/history/HistoryEventType.java | 41 +++++++++++++++++++- .../tez/dag/history/TestHistoryEventType.java | 39 +++++++++++++++++++ .../ats/acls/TestATSHistoryWithACLs.java | 4 +- .../logging/ats/ATSHistoryLoggingService.java | 14 +++++-- 6 files changed, 96 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d0fb2ca2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 15a2ce5..9ae49fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster. TEZ-2967. 
Vertex start time should be that of first task start time in UI http://git-wip-us.apache.org/repos/asf/tez/blob/d0fb2ca2/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 56adf13..e8c1760 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -122,6 +122,7 @@ public class TezClient { private JavaOptsChecker javaOptsChecker = null; private int preWarmDAGCounter = 0; + private int dagCounter = 0; private static final String atsHistoryLoggingServiceClassName = "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"; @@ -427,6 +428,7 @@ public class TezClient { * if submission timed out */ public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException { + ++dagCounter; if (isSession) { return submitDAGSession(dag); } else { @@ -468,7 +470,8 @@ public class TezClient { if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) { try { aclConfigs = historyACLPolicyManager.setupSessionDAGACLs( - amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls()); + amConfig.getTezConfiguration(), sessionAppId, + Integer.toString(dagCounter), dag.getDagAccessControls()); } catch (HistoryACLPolicyException e) { LOG.warn("Disabling history logging for dag " + dag.getName() + " due to error in setting up history acls " + e); http://git-wip-us.apache.org/repos/asf/tez/blob/d0fb2ca2/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index d791d9e..b03ad77 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.history; +import org.apache.tez.dag.api.TezUncheckedException; + public enum HistoryEventType { APP_LAUNCHED, AM_LAUNCHED, @@ -42,5 +44,40 @@ public enum HistoryEventType { VERTEX_COMMIT_STARTED, VERTEX_GROUP_COMMIT_STARTED, VERTEX_GROUP_COMMIT_FINISHED, - DAG_RECOVERED -} + DAG_RECOVERED; + + public static boolean isDAGSpecificEvent(HistoryEventType historyEventType) { + switch (historyEventType) { + case APP_LAUNCHED: + case AM_LAUNCHED: + case AM_STARTED: + case CONTAINER_LAUNCHED: + case CONTAINER_STOPPED: + return false; + case DAG_SUBMITTED: + case DAG_INITIALIZED: + case DAG_STARTED: + case DAG_FINISHED: + case DAG_KILL_REQUEST: + case VERTEX_INITIALIZED: + case VERTEX_STARTED: + case VERTEX_CONFIGURE_DONE: + case VERTEX_FINISHED: + case TASK_STARTED: + case TASK_FINISHED: + case TASK_ATTEMPT_STARTED: + case TASK_ATTEMPT_FINISHED: + case DAG_COMMIT_STARTED: + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_RECOVERED: + return true; + default: + throw new TezUncheckedException("Unhandled history event type: " + historyEventType.name()); + } + } + + + + } http://git-wip-us.apache.org/repos/asf/tez/blob/d0fb2ca2/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java new file mode 100644 index 0000000..c37442e --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventType.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history; + +import org.junit.Assert; +import org.junit.Test; + +public class TestHistoryEventType { + + @Test + public void testDAGSpecificEventCheck() { + for (HistoryEventType eventType : HistoryEventType.values()) { + if (eventType.name().startsWith("AM_") + || eventType.name().startsWith("APP_") + || eventType.name().startsWith("CONTAINER_")) { + Assert.assertFalse(HistoryEventType.isDAGSpecificEvent(eventType)); + } else { + Assert.assertTrue(HistoryEventType.isDAGSpecificEvent(eventType)); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/d0fb2ca2/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index 512913d..2c976f5 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ 
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -298,7 +298,7 @@ public class TestATSHistoryWithACLs { Collections.singleton("nobody"), Collections.singleton("nobody_group")); timelineDomain = getDomain(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX - + applicationId.toString() + "_TezSleepProcessor"); + + applicationId.toString() + "_1"); verifyDomainACLs(timelineDomain, Sets.newHashSet("nobody", "nobody2"), Sets.newHashSet("nobody_group", "nobody_group2")); @@ -670,7 +670,7 @@ public class TestATSHistoryWithACLs { appEntity.getDomainId()); if (!sameDomain) { assertEquals(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString() - + "_TezSleepProcessor", dagEntity.getDomainId()); + + "_1", dagEntity.getDomainId()); } else { assertEquals(appEntity.getDomainId(), dagEntity.getDomainId()); } http://git-wip-us.apache.org/repos/asf/tez/blob/d0fb2ca2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index a66da24..6b8d6e5 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -268,7 +268,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } } - public void handle(DAGHistoryEvent event) { if (historyLoggingEnabled && timelineClient != null) { eventQueue.add(event); @@ -326,6 +325,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { for 
(int i = 0; i < events.size(); ++i) { DAGHistoryEvent event = events.get(i); String domainId = sessionDomainId; + TezDAGID dagId = event.getDagID(); if (historyACLPolicyManager != null && dagId != null) { @@ -335,11 +335,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + if (historyACLPolicyManager != null) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) { + if (domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); + } + } else { + if (sessionDomainId != null && !sessionDomainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entities[i], sessionDomainId); + } } } + } if (LOG.isDebugEnabled()) {
