Repository: tez
Updated Branches:
  refs/heads/master c07ec7b6f -> a23de4982


http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
index f0ec1eb..d28ffe0 100644
--- 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
+++ 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
@@ -159,7 +158,6 @@ public class ATSV15HistoryACLPolicyManager implements 
HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -169,18 +167,13 @@ public class ATSV15HistoryACLPolicyManager implements 
HistoryACLPolicyManager {
       domainId = DOMAIN_ID_PREFIX + applicationId.toString();
       createTimelineDomain(applicationId, domainId, tezConf, 
dagAccessControls);
       LOG.info("Created Timeline Domain for History ACLs, domainId=" + 
domainId);
-      return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
domainId);
     }
+    return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
domainId);
   }
 
   private Map<String, String> createDAGDomain(Configuration tezConf,
       ApplicationId applicationId, String dagName, DAGAccessControls 
dagAccessControls)
       throws IOException, HistoryACLPolicyException {
-    if (dagAccessControls == null) {
-      // No DAG specific ACLs
-      return null;
-    }
-
     String domainId =
         tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
     if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -198,7 +191,6 @@ public class ATSV15HistoryACLPolicyManager implements 
HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -206,14 +198,17 @@ public class ATSV15HistoryACLPolicyManager implements 
HistoryACLPolicyManager {
             + " Domains is disabled");
       }
 
+      // Create a domain only if dagAccessControls has been specified.
+      if (dagAccessControls == null) {
+        return null;
+      }
       domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
       createTimelineDomain(applicationId, domainId, tezConf, 
dagAccessControls);
       LOG.info("Created Timeline Domain for DAG-specific History ACLs, 
domainId=" + domainId);
-      return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
     }
+    return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
   }
 
-
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index dd21d2d..a095cbc 100644
--- 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -36,11 +37,16 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -80,7 +86,9 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
       ATSV15HistoryLoggingService.class.getName();
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
-  private HistoryACLPolicyManager historyACLPolicyManager;
+
+  @VisibleForTesting
+  HistoryACLPolicyManager historyACLPolicyManager;
 
   private int numDagsPerGroup;
 
@@ -133,7 +141,6 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
     if (maxTimeToWaitOnShutdown < 0) {
       waitForeverOnShutdown = true;
     }
-    sessionDomainId = 
conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
     LOG.info("Initializing " + 
ATSV15HistoryLoggingService.class.getSimpleName() + " with "
         + ", maxPollingTime(ms)=" + maxPollingTimeMillis
@@ -165,6 +172,15 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
     }
     timelineClient.start();
 
+    // create a session domain id, if it fails then disable history logging.
+    try {
+      sessionDomainId = createSessionDomain();
+    } catch (HistoryACLPolicyException | IOException e) {
+      LOG.warn("Could not setup history acls, disabling history logging.", e);
+      historyLoggingEnabled = false;
+      return;
+    }
+
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -216,9 +232,6 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
 
   @Override
   public void serviceStop() {
-    if (!historyLoggingEnabled || timelineClient == null) {
-      return;
-    }
     LOG.info("Stopping ATSService"
         + ", eventQueueBacklog=" + eventQueue.size());
     stopped.set(true);
@@ -265,11 +278,12 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
       LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
           + ", eventQueueBacklog=" + eventQueue.size());
     }
-    timelineClient.stop();
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
     if (historyACLPolicyManager != null) {
       historyACLPolicyManager.close();
     }
-
   }
 
   @VisibleForTesting
@@ -331,13 +345,6 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
         skippedDAGs.add(dagId);
         return false;
       }
-      if (historyACLPolicyManager != null) {
-        String dagDomainId = dagSubmittedEvent.getConf().get(
-            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
-        if (dagDomainId != null) {
-          dagDomainIdMap.put(dagId, dagDomainId);
-        }
-      }
     }
     if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
       DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) 
event.getHistoryEvent();
@@ -363,28 +370,17 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
   }
 
   private void handleEvents(DAGHistoryEvent event) {
-    String domainId = sessionDomainId;
-    TezDAGID dagId = event.getDagID();
-
-    if (historyACLPolicyManager != null && dagId != null) {
-      if (dagDomainIdMap.containsKey(dagId)) {
-        domainId = dagDomainIdMap.get(dagId);
-      }
+    String domainId = getDomainForEvent(event);
+    // skippedDags is updated in the above call so check again.
+    if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
+      return;
     }
 
-    TimelineEntity entity =
-        
HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+    TimelineEntity entity = 
HistoryEventTimelineConversion.convertToTimelineEntity(
+        event.getHistoryEvent());
 
-    if (historyACLPolicyManager != null) {
-      if 
(HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) {
-        if (domainId != null && !domainId.isEmpty()) {
-          historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
-        }
-      } else {
-        if (sessionDomainId != null && !sessionDomainId.isEmpty()) {
-          historyACLPolicyManager.updateTimelineEntityDomain(entity, 
sessionDomainId);
-        }
-      }
+    if (historyACLPolicyManager != null && domainId != null && 
!domainId.isEmpty()) {
+      historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
     }
 
     try {
@@ -408,7 +404,99 @@ public class ATSV15HistoryLoggingService extends 
HistoryLoggingService {
     } catch (Exception e) {
       LOG.warn("Could not handle history events", e);
     }
+  }
 
+  private String getDomainForEvent(DAGHistoryEvent event) {
+    String domainId = sessionDomainId;
+    if (historyACLPolicyManager == null) {
+      return domainId;
+    }
+
+    TezDAGID dagId = event.getDagID();
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (dagId == null || 
!HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) {
+      return domainId;
+    }
+
+    if (dagDomainIdMap.containsKey(dagId)) {
+      // If we already have the domain for the dag id return it
+      domainId = dagDomainIdMap.get(dagId);
+      // Cleanup if this is the last event.
+      if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+        dagDomainIdMap.remove(dagId);
+      }
+    } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()
+        || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) {
+      // In case this is the first event for the dag, create and populate dag 
domain.
+      Configuration conf;
+      DAGPlan dagPlan;
+      if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) {
+          conf = ((DAGSubmittedEvent)historyEvent).getConf();
+          dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan();
+      } else {
+         conf = appContext.getCurrentDAG().getConf();
+         dagPlan = appContext.getCurrentDAG().getJobPlan();
+      }
+      domainId = createDagDomain(conf, dagPlan, dagId);
+
+      // createDagDomain updates skippedDAGs so another check here.
+      if (skippedDAGs.contains(dagId)) {
+        return null;
+      }
+
+      dagDomainIdMap.put(dagId, domainId);
+    }
+    return domainId;
+  }
+
+  /**
+   * Creates a domain for the session.
+   * @return domainId to be used. null if acls are disabled.
+   * @throws HistoryACLPolicyException, IOException Forward if 
historyACLPolicyManger exception.
+   */
+  private String createSessionDomain() throws IOException, 
HistoryACLPolicyException {
+    if (historyACLPolicyManager == null) {
+      return null;
+    }
+    Map<String, String> domainInfo = 
historyACLPolicyManager.setupSessionACLs(getConfig(),
+        appContext.getApplicationID());
+    if (domainInfo != null) {
+      return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+    }
+    return null;
   }
 
+  /**
+   * When running in session mode, create a domain for the dag and return it.
+   * @param dagConf The configuration the dag for which domain has to be 
created.
+   * @param dagPlan The dag plan which contains the ACLs.
+   * @param dagId The dagId for which domain has to be created.
+   * @return The created domain id on success.
+   *     sessionDomainId: If there is a failure also disable history logging 
for this dag.
+   *     sessionDomainId: If historyACLPolicyManager returns null.
+   */
+  private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, 
TezDAGID dagId) {
+    // In non session mode dag domain is same as session domain id.
+    if (!appContext.isSession()) {
+      return sessionDomainId;
+    }
+    DAGAccessControls dagAccessControls = dagPlan.hasAclInfo()
+        ? 
DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo())
+        : null;
+    try {
+      Map<String, String> domainInfo = 
historyACLPolicyManager.setupSessionDAGACLs(
+          dagConf, appContext.getApplicationID(), 
Integer.toString(dagId.getId()),
+          dagAccessControls);
+      if (domainInfo != null) {
+        return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+      }
+      // Fallback to session domain, if domainInfo was null
+      return sessionDomainId;
+    } catch (IOException | HistoryACLPolicyException e) {
+      LOG.warn("Could not setup ACLs for DAG, disabling history logging for 
dag.", e);
+      skippedDAGs.add(dagId);
+      // Return value is not used, check for skippedDAG is important.
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
index 6f653ad..a690a19 100644
--- 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
+++ 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -300,6 +301,7 @@ public class TestATSHistoryV15 {
       ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
       AppContext appContext = mock(AppContext.class);
       when(appContext.getApplicationID()).thenReturn(appId);
+      when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
       service.setAppContext(appContext);
 
       TimelineEntityGroupId grpId = service.getGroupId(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index 1869b56..9111195 100644
--- 
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ 
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -22,11 +22,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +51,8 @@ import 
org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import 
org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -66,10 +75,14 @@ public class TestATSV15HistoryLoggingService {
   private static String user = "TEST_USER";
 
   private InMemoryTimelineClient timelineClient;
+  private AppContext appContext;
 
   @Test(timeout=2000)
   public void testDAGGroupingDefault() throws Exception {
     ATSV15HistoryLoggingService service = createService(-1);
+
+    service.start();
+
     TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
     for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
       service.handle(event);
@@ -96,6 +109,8 @@ public class TestATSV15HistoryLoggingService {
   @Test(timeout=2000)
   public void testDAGGroupingDisabled() throws Exception {
     ATSV15HistoryLoggingService service = createService(1);
+    service.start();
+
     TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
     for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
       service.handle(event);
@@ -123,6 +138,8 @@ public class TestATSV15HistoryLoggingService {
   public void testDAGGroupingGroupingEnabled() throws Exception {
     int numDagsPerGroup = 100;
     ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+    service.start();
+
     TezDAGID dagId1 = TezDAGID.getInstance(appId, 1);
     for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
       service.handle(event);
@@ -172,14 +189,243 @@ public class TestATSV15HistoryLoggingService {
     service.stop();
   }
 
+  @Test
+  public void testNonSessionDomains() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    HistoryACLPolicyManager historyACLPolicyManager = 
mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
eq(appId)))
+    .thenReturn(Collections.singletonMap(
+        TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+    service.start();
+
+    verify(historyACLPolicyManager, times(1))
+        .setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, 
times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+    assertTrue(timelineClient.entityLog.size() > 0);
+
+    service.stop();
+  }
+
+  @Test
+  public void testNonSessionDomainsFailed() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    HistoryACLPolicyManager historyACLPolicyManager = 
mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
eq(appId)))
+    .thenThrow(new IOException());
+
+    service.start();
+
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // History logging is disabled.
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), (String)any());
+    assertEquals(0, timelineClient.entityLog.size());
+
+    service.stop();
+  }
+
+  @Test
+  public void testNonSessionDomainsAclNull() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    HistoryACLPolicyManager historyACLPolicyManager = 
mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
eq(appId)))
+    .thenReturn(null);
+
+    service.start();
+
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // No domain updates but history logging happened.
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), (String)any());
+    assertTrue(timelineClient.entityLog.size() > 0);
+
+    service.stop();
+  }
+
+  @Test
+  public void testSessionDomains() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    when(appContext.isSession()).thenReturn(true);
+
+    HistoryACLPolicyManager historyACLPolicyManager = 
mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
eq(appId)))
+    .thenReturn(Collections.singletonMap(
+        TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+    service.start();
+
+    // Verify that the session domain was created.
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Mock dag domain creation.
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, 
"dag-id"));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // Verify dag domain was created.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // calls were made with correct domain ids.
+    verify(historyACLPolicyManager, 
times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
+    verify(historyACLPolicyManager, 
times(4)).updateTimelineEntityDomain(any(), eq("dag-id"));
+
+    service.stop();
+  }
+
+  @Test
+  public void testSessionDomainsFailed() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    when(appContext.isSession()).thenReturn(true);
+
+    HistoryACLPolicyManager historyACLPolicyManager = 
mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
eq(appId)))
+    .thenThrow(new IOException());
+
+    service.start();
+
+    // Verify that the session domain was created.
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Mock dag domain creation.
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, 
"dag-id"));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag creation was done.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // No history logging calls were done
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), (String)any());
+    assertEquals(0, timelineClient.entityLog.size());
+
+    service.stop();
+  }
+
+  @Test
+  public void testSessionDomainsDagFailed() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    when(appContext.isSession()).thenReturn(true);
+
+    HistoryACLPolicyManager historyACLPolicyManager = 
mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
eq(appId)))
+    .thenReturn(
+        
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
"session-id"));
+
+    service.start();
+
+    // Verify that the session domain creation was called.
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Mock dag domain creation.
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+    .thenThrow(new IOException());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // Verify dag domain creation was called.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // AM events sent, dag events are not sent.
+    verify(historyACLPolicyManager, 
times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), eq("dag-id"));
+    assertEquals(1, timelineClient.entityLog.size());
+
+    service.stop();
+  }
+
   private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
     ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
-    AppContext appContext = mock(AppContext.class);
+    appContext = mock(AppContext.class);
     when(appContext.getApplicationID()).thenReturn(appId);
     when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
     service.setAppContext(appContext);
 
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(false);
     if (numDagsPerGroup != -1) {
       
conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
           numDagsPerGroup);
@@ -191,7 +437,6 @@ public class TestATSV15HistoryLoggingService {
     timelineClient.init(conf);
     service.timelineClient = timelineClient;
 
-    service.start();
     return service;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
 
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 6b8d6e5..dc215fd 100644
--- 
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ 
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -36,13 +38,17 @@ import 
org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import 
org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezReflectionException;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -54,8 +60,8 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ATSHistoryLoggingService.class);
 
-  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<DAGHistoryEvent>();
+  @VisibleForTesting
+  LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new 
LinkedBlockingQueue<DAGHistoryEvent>();
 
   private Thread eventHandlingThread;
   private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -80,7 +86,9 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
       "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
-  private HistoryACLPolicyManager historyACLPolicyManager;
+
+  @VisibleForTesting
+  HistoryACLPolicyManager historyACLPolicyManager;
 
   public ATSHistoryLoggingService() {
     super(ATSHistoryLoggingService.class.getName());
@@ -121,7 +129,6 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
     if (maxTimeToWaitOnShutdown < 0) {
       waitForeverOnShutdown = true;
     }
-    sessionDomainId = 
conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
     LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() 
+ " with "
       + "maxEventsPerBatch=" + maxEventsPerBatch
@@ -150,8 +157,17 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
     if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
+
     timelineClient.start();
 
+    try {
+      sessionDomainId = createSessionDomain();
+    } catch (HistoryACLPolicyException | IOException e) {
+      LOG.warn("Could not setup history acls, disabling history logging.", e);
+      historyLoggingEnabled = false;
+      return;
+    }
+
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -200,9 +216,6 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
 
   @Override
   public void serviceStop() {
-    if (!historyLoggingEnabled || timelineClient == null) {
-      return;
-    }
     LOG.info("Stopping ATSService"
         + ", eventQueueBacklog=" + eventQueue.size());
     stopped.set(true);
@@ -242,7 +255,9 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
       LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
           + ", eventQueueBacklog=" + eventQueue.size());
     }
-    timelineClient.stop();
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
     if (historyACLPolicyManager != null) {
       historyACLPolicyManager.close();
     }
@@ -289,13 +304,6 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
         skippedDAGs.add(dagId);
         return false;
       }
-      if (historyACLPolicyManager != null) {
-        String dagDomainId = dagSubmittedEvent.getConf().get(
-            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
-        if (dagDomainId != null) {
-          dagDomainIdMap.put(dagId, dagDomainId);
-        }
-      }
     }
     if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
       DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) 
event.getHistoryEvent();
@@ -321,33 +329,19 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
   }
 
   private void handleEvents(List<DAGHistoryEvent> events) {
-    TimelineEntity[] entities = new TimelineEntity[events.size()];
-    for (int i = 0; i < events.size(); ++i) {
-      DAGHistoryEvent event = events.get(i);
-      String domainId = sessionDomainId;
-
-      TezDAGID dagId = event.getDagID();
-
-      if (historyACLPolicyManager != null && dagId != null) {
-        if (dagDomainIdMap.containsKey(dagId)) {
-          domainId = dagDomainIdMap.get(dagId);
-        }
+    List<TimelineEntity> entities = new ArrayList<>(events.size());
+    for (DAGHistoryEvent event : events) {
+      String domainId = getDomainForEvent(event);
+      // skippedDags is updated in the above call so check again.
+      if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
+        continue;
       }
-
-      entities[i] = 
HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
-
-      if (historyACLPolicyManager != null) {
-        if 
(HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) {
-          if (domainId != null && !domainId.isEmpty()) {
-            historyACLPolicyManager.updateTimelineEntityDomain(entities[i], 
domainId);
-          }
-        } else {
-          if (sessionDomainId != null && !sessionDomainId.isEmpty()) {
-            historyACLPolicyManager.updateTimelineEntityDomain(entities[i], 
sessionDomainId);
-          }
-        }
+      TimelineEntity entity = 
HistoryEventTimelineConversion.convertToTimelineEntity(
+          event.getHistoryEvent());
+      entities.add(entity);
+      if (historyACLPolicyManager != null && domainId != null && 
!domainId.isEmpty()) {
+        historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
       }
-
     }
 
     if (LOG.isDebugEnabled()) {
@@ -355,7 +349,7 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
     }
     try {
       TimelinePutResponse response =
-          timelineClient.putEntities(entities);
+          timelineClient.putEntities(entities.toArray(new 
TimelineEntity[entities.size()]));
       if (response != null
         && !response.getErrors().isEmpty()) {
         int count = response.getErrors().size();
@@ -375,4 +369,97 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService {
     }
   }
 
+  private String getDomainForEvent(DAGHistoryEvent event) {
+    String domainId = sessionDomainId;
+    if (historyACLPolicyManager == null) {
+      return domainId;
+    }
+
+    TezDAGID dagId = event.getDagID();
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (dagId == null || 
!HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) {
+      return domainId;
+    }
+
+    if (dagDomainIdMap.containsKey(dagId)) {
+      // If we already have the domain for the dag id return it
+      domainId = dagDomainIdMap.get(dagId);
+      // Cleanup if this is the last event.
+      if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+        dagDomainIdMap.remove(dagId);
+      }
+    } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()
+        || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) {
+      // In case this is the first event for the dag, create and populate dag 
domain.
+      Configuration conf;
+      DAGPlan dagPlan;
+      if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) {
+          conf = ((DAGSubmittedEvent)historyEvent).getConf();
+          dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan();
+      } else {
+         conf = appContext.getCurrentDAG().getConf();
+         dagPlan = appContext.getCurrentDAG().getJobPlan();
+      }
+      domainId = createDagDomain(conf, dagPlan, dagId);
+
+      // createDagDomain updates skippedDAGs so another check here.
+      if (skippedDAGs.contains(dagId)) {
+        return null;
+      }
+
+      dagDomainIdMap.put(dagId, domainId);
+    }
+    return domainId;
+  }
+
+  /**
+   * Creates a domain for the session.
+   * @return domainId to be used. null if acls are disabled.
+   * @throws HistoryACLPolicyException, IOException Forward if 
historyACLPolicyManger exception.
+   */
+  private String createSessionDomain() throws HistoryACLPolicyException, 
IOException {
+    if (historyACLPolicyManager == null) {
+      return null;
+    }
+    Map<String, String> domainInfo = 
historyACLPolicyManager.setupSessionACLs(getConfig(),
+        appContext.getApplicationID());
+    if (domainInfo != null) {
+      return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+    }
+    return null;
+  }
+
+  /**
+   * When running in session mode, create a domain for the dag and return it.
+   * @param dagConf The configuration the dag for which domain has to be 
created.
+   * @param dagPlan The dag plan which contains the ACLs.
+   * @param dagId The dagId for which domain has to be created.
+   * @return The created domain id on success.
+   *     sessionDomainId: If there is a failure also disable history logging 
for this dag.
+   *     sessionDomainId: If historyACLPolicyManager returns null.
+   */
+  private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, 
TezDAGID dagId) {
+    // In non session mode dag domain is same as session domain id.
+    if (!appContext.isSession()) {
+      return sessionDomainId;
+    }
+    DAGAccessControls dagAccessControls = dagPlan.hasAclInfo()
+        ? 
DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo())
+        : null;
+    try {
+      Map<String, String> domainInfo = 
historyACLPolicyManager.setupSessionDAGACLs(
+          dagConf, appContext.getApplicationID(), 
Integer.toString(dagId.getId()),
+          dagAccessControls);
+      if (domainInfo != null) {
+        return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+      }
+      // Fallback to session domain, if domainInfo was null
+      return sessionDomainId;
+    } catch (IOException | HistoryACLPolicyException e) {
+      LOG.warn("Could not setup ACLs for DAG, disabling history logging for 
dag.", e);
+      skippedDAGs.add(dagId);
+      // Return value is not used, check for skippedDAG is important.
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
 
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index 464864e..da57eb2 100644
--- 
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ 
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -21,16 +21,30 @@ package org.apache.tez.dag.history.logging.ats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,9 +53,19 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class TestATSHistoryLoggingService {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestATSHistoryLoggingService.class);
@@ -51,11 +75,15 @@ public class TestATSHistoryLoggingService {
   private Configuration conf;
   private int atsInvokeCounter;
   private int atsEntitiesCounter;
+  private HistoryACLPolicyManager historyACLPolicyManager;
   private SystemClock clock = new SystemClock();
+  private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+  private static ApplicationAttemptId attemptId = 
ApplicationAttemptId.newInstance(appId, 1);
 
   @Before
   public void setup() throws Exception {
     appContext = mock(AppContext.class);
+    historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
     atsHistoryLoggingService = new ATSHistoryLoggingService();
     atsHistoryLoggingService.setAppContext(appContext);
     conf = new Configuration(false);
@@ -63,13 +91,16 @@ public class TestATSHistoryLoggingService {
         1000l);
     conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
     conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
+    conf.set(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain");
     atsInvokeCounter = 0;
     atsEntitiesCounter = 0;
     atsHistoryLoggingService.init(conf);
+    atsHistoryLoggingService.historyACLPolicyManager = historyACLPolicyManager;
     atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
-    atsHistoryLoggingService.start();
+
     when(appContext.getClock()).thenReturn(clock);
     when(appContext.getCurrentDAGID()).thenReturn(null);
+    when(appContext.getApplicationID()).thenReturn(appId);
     when(atsHistoryLoggingService.timelineClient.putEntities(
         Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
         new Answer<Object>() {
@@ -96,6 +127,7 @@ public class TestATSHistoryLoggingService {
 
   @Test(timeout=20000)
   public void testATSHistoryLoggingServiceShutdown() {
+    atsHistoryLoggingService.start();
     TezDAGID tezDAGID = TezDAGID.getInstance(
         ApplicationId.newInstance(100l, 1), 1);
     DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
@@ -122,6 +154,7 @@ public class TestATSHistoryLoggingService {
 
   @Test(timeout=20000)
   public void testATSEventBatching() {
+    atsHistoryLoggingService.start();
     TezDAGID tezDAGID = TezDAGID.getInstance(
         ApplicationId.newInstance(100l, 1), 1);
     DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
@@ -145,9 +178,8 @@ public class TestATSHistoryLoggingService {
 
   @Test(timeout=20000)
   public void testTimelineServiceDisable() throws Exception {
+    atsHistoryLoggingService.start();
     ATSHistoryLoggingService atsHistoryLoggingService1;
-    AppContext appContext1;
-    appContext1 = mock(AppContext.class);
     atsHistoryLoggingService1 = new ATSHistoryLoggingService();
 
     atsHistoryLoggingService1.setAppContext(appContext);
@@ -160,7 +192,7 @@ public class TestATSHistoryLoggingService {
         ++atsInvokeCounter;
         atsEntitiesCounter += invocation.getArguments().length;
         try {
-          Thread.sleep(500l);
+          Thread.sleep(10l);
         } catch (InterruptedException e) {
           // do nothing
         }
@@ -181,7 +213,7 @@ public class TestATSHistoryLoggingService {
     }
 
     try {
-        Thread.sleep(1000l);
+        Thread.sleep(20l);
     } catch (InterruptedException e) {
         // Do nothing
     }
@@ -190,6 +222,238 @@ public class TestATSHistoryLoggingService {
     Assert.assertEquals(atsInvokeCounter, 0);
     Assert.assertEquals(atsEntitiesCounter, 0);
     Assert.assertNull(atsHistoryLoggingService1.timelineClient);
-    
+    atsHistoryLoggingService1.close();
+  }
+
+  @Test(timeout=10000)
+  public void testNonSessionDomains() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenReturn(
+        
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
"session-id"));
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+        (Configuration)any(), (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, 
times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+  }
+
+  @Test(timeout=10000)
+  public void testNonSessionDomainsFailed() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenThrow(new IOException());
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+        (Configuration)any(), (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(1000);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
+    Assert.assertEquals(0, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testNonSessionDomainsAclNull() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenReturn(null);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+        (Configuration)any(), (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
+    Assert.assertEquals(5, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomains() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenReturn(
+        
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
"test-domain"));
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), 
(DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, 
"dag-domain"));
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, 
times(1)).updateTimelineEntityDomain(any(), eq("test-domain"));
+    verify(historyACLPolicyManager, 
times(4)).updateTimelineEntityDomain(any(), eq("dag-domain"));
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomainsFailed() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenThrow(new IOException());
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), 
(DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, 
"dag-domain"));
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(1000);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // No calls were made for domains.
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), (String)any());
+    Assert.assertEquals(0, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomainsDagFailed() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenReturn(Collections.singletonMap(
+        TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-domain"));
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), 
(DAGAccessControls)any()))
+    .thenThrow(new IOException());
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // DAG domain was called once.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(1))
+        .updateTimelineEntityDomain(any(), eq("session-domain"));
+    verify(historyACLPolicyManager, times(1))
+        .updateTimelineEntityDomain(any(), (String)any());
+    Assert.assertEquals(1, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomainsAclNull() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), 
(ApplicationId)any()))
+    .thenReturn(null);
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), 
(DAGAccessControls)any()))
+    .thenReturn(null);
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, 
times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, 
atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), 
(DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, 
times(0)).updateTimelineEntityDomain(any(), (String)any());
+    Assert.assertEquals(5, atsEntitiesCounter);
+  }
+
+  private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+      ATSHistoryLoggingService service) {
+    List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+    long time = System.currentTimeMillis();
+    Configuration conf = new Configuration(service.getConfig());
+    historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, 
time, "user")));
+    historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, 
time,
+        DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null)));
+    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId, new 
VertexStartedEvent(vertexID, time, time)));
+    TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+    historyEvents
+        .add(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, 
"test", time, time)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 
1), "test", time,
+            ContainerId.newContainerId(attemptId, 1), 
NodeId.newInstance("localhost", 8765), null,
+            null, null)));
+    return historyEvents;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git 
a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java 
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b0bacf4..3be7131 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -663,7 +663,8 @@ public class MRRSleepJob extends Configured implements Tool 
{
           " [-irt intermediateReduceSleepTime]" +
           " [-recordt recordSleepTime (msec)]" +
           " [-generateSplitsInAM (false)/true]" +
-          " [-writeSplitsToDfs (false)/true]");
+          " [-writeSplitsToDfs (false)/true]" +
+          " [-numDags numDagsToSubmit");
       ToolRunner.printGenericCommandUsage(System.err);
       return 2;
     }
@@ -676,6 +677,8 @@ public class MRRSleepJob extends Configured implements Tool 
{
     boolean writeSplitsToDfs = false;
     boolean generateSplitsInAM = false;
     boolean splitsOptionFound = false;
+    boolean isSession = false;
+    int numDags = 1;
 
     for(int i=0; i < args.length; i++ ) {
       if(args[i].equals("-m")) {
@@ -716,6 +719,12 @@ public class MRRSleepJob extends Configured implements 
Tool {
         }
         splitsOptionFound = true;
         writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
+      } else if (args[i].equals("-numDags")) {
+        numDags = Integer.parseInt(args[++i]);
+        if (numDags < 1) {
+          throw new RuntimeException("numDags should be positive");
+        }
+        isSession = numDags > 1;
       }
     }
 
@@ -747,13 +756,20 @@ public class MRRSleepJob extends Configured implements 
Tool {
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
         iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, 
generateSplitsInAM);
 
-    TezClient tezSession = TezClient.create("MRRSleep", conf, false, null, 
credentials);
+    TezClient tezSession = TezClient.create("MRRSleep", conf, isSession, null, 
credentials);
     tezSession.start();
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    dagClient.waitForCompletion();
-    tezSession.stop();
-
-    return 
dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED) ? 0 : 
1;
+    try {
+      for (; numDags > 0; --numDags) {
+        DAGClient dagClient = tezSession.submitDAG(dag);
+        dagClient.waitForCompletion();
+        if 
(!dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED)) {
+          return 1;
+        }
+      }
+    } finally {
+      tezSession.stop();
+    }
+    return 0;
   }
 
 }

Reply via email to