TEZ-3404. Move blocking call for YARN Timeline domain creation from client side 
to AM. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a23de498
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a23de498
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a23de498

Branch: refs/heads/master
Commit: a23de4982e4ed0d55fb711745e7670b3be4b266e
Parents: c07ec7b
Author: Hitesh Shah <[email protected]>
Authored: Mon Sep 12 13:29:22 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Mon Sep 12 13:29:22 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/client/TezClient.java   |  89 +-----
 .../org/apache/tez/client/TezClientUtils.java   |  69 +----
 .../apache/tez/common/security/ACLManager.java  |  27 +-
 .../tez/common/security/DAGAccessControls.java  |  43 ++-
 .../security/HistoryACLPolicyManager.java       |  30 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  32 +--
 .../apache/tez/dag/api/DagTypeConverters.java   |  25 ++
 tez-api/src/main/proto/DAGApiRecords.proto      |   8 +
 .../org/apache/tez/client/TestTezClient.java    |   6 +-
 .../apache/tez/client/TestTezClientUtils.java   |  18 +-
 .../tez/common/security/TestACLManager.java     |  24 +-
 .../common/security/TestDAGAccessControls.java  | 167 ++++++-----
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  25 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  24 +-
 .../tez/dag/api/TestDagTypeConverters.java      |  38 +++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   2 +-
 .../ats/acls/ATSHistoryACLPolicyManager.java    |  16 +-
 .../ats/acls/TestATSHistoryWithACLs.java        | 202 +-------------
 .../ats/acls/ATSV15HistoryACLPolicyManager.java |  17 +-
 .../ats/ATSV15HistoryLoggingService.java        | 154 ++++++++---
 .../dag/history/ats/acls/TestATSHistoryV15.java |   2 +
 .../ats/TestATSV15HistoryLoggingService.java    | 251 ++++++++++++++++-
 .../logging/ats/ATSHistoryLoggingService.java   | 171 +++++++++---
 .../ats/TestATSHistoryLoggingService.java       | 276 ++++++++++++++++++-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  30 +-
 26 files changed, 1127 insertions(+), 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd6ab68..99bae83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3404. Move blocking call for YARN Timeline domain creation from client 
side to AM.
   TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization 
list.
   TEZ-3284. Synchronization for every write in UnorderdKVWriter
   TEZ-3426. Second AM attempt launched for session mode and recovery disabled 
for certain cases

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 71ba6b2..780fcb7 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -57,7 +57,6 @@ import 
org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.util.Time;
 import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -69,7 +68,6 @@ import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
@@ -79,7 +77,6 @@ import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
 import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.common.security.HistoryACLPolicyException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -136,11 +133,9 @@ public class TezClient {
   final TezApiVersionInfo apiVersionInfo;
   @VisibleForTesting
   final ServicePluginsDescriptor servicePluginsDescriptor;
-  private HistoryACLPolicyManager historyACLPolicyManager;
   private JavaOptsChecker javaOptsChecker = null;
 
   private int preWarmDAGCounter = 0;
-  private int dagCounter = 0;
 
   /* max submitDAG request size through IPC; beyond this we transfer them in 
the same way we transfer local resource */
   private int maxSubmitDAGRequestSizeThroughIPC;
@@ -148,15 +143,6 @@ public class TezClient {
   private AtomicInteger serializedSubmitDAGPlanRequestCounter = new 
AtomicInteger(0);
   private FileSystem stagingFs = null;
 
-  private static final String atsHistoryLoggingServiceClassName =
-      "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
-  private static final String atsHistoryACLManagerClassName =
-      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
-  private static final String atsv15HistoryLoggingServiceClassName =
-      "org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService";
-  private static final String atsV15HistoryACLManagerClassName =
-      "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
-
   private TezClient(String name, TezConfiguration tezConf) {
     this(name, tezConf, tezConf.getBoolean(
         TezConfiguration.TEZ_AM_SESSION_MODE, 
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
@@ -375,12 +361,6 @@ public class TezClient {
         historyLogLevel);
   }
 
-  @Private
-  @VisibleForTesting
-  public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager 
myAclPolicyManager) {
-    historyACLPolicyManager =  myAclPolicyManager;
-  }
-
   /**
    * Start the client. This establishes a connection to the YARN cluster.
    * In session mode, this start the App Master thats runs all the DAGs in the
@@ -395,40 +375,6 @@ public class TezClient {
     frameworkClient.init(amConfig.getTezConfiguration(), 
amConfig.getYarnConfiguration());
     frameworkClient.start();
 
-    ///need additional check for historyACLPolicyManager because tests could 
stub historyACLPolicyManager
-    ///before tezclient start. If there is already a stubbed 
historyACLPolicyManager, we don't overwrite it
-    if (historyACLPolicyManager == null) {
-      //TODO: FIXME: The ACL manager should be retrieved either from the
-      //logging service directly or via a pluggable factory that can
-      //instantiate ACL managers and logging services
-      String logSvcClassName = amConfig.getTezConfiguration().get(
-          TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
-      String aclMgrClassName = null;
-      if (logSvcClassName.equals(atsHistoryLoggingServiceClassName)) {
-        aclMgrClassName = atsHistoryACLManagerClassName;
-      } else if (logSvcClassName.equals(
-        atsv15HistoryLoggingServiceClassName)) {
-        aclMgrClassName = atsV15HistoryACLManagerClassName;
-      }
-      if (aclMgrClassName != null) {
-        LOG.info("Using " + aclMgrClassName + " to manage Timeline ACLs");
-        try {
-          historyACLPolicyManager = ReflectionUtils.createClazzInstance(
-              aclMgrClassName);
-          
historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
-        } catch (TezReflectionException e) {
-          if (!amConfig.getTezConfiguration().getBoolean(
-              TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
-              
TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
-            LOG.warn("Could not instantiate object for " + aclMgrClassName
-                + ". ACLs cannot be enforced correctly for history data in 
Timeline", e);
-            throw e;
-          }
-          historyACLPolicyManager = null;
-        }
-      }
-    }
-
     if (this.amConfig.getTezConfiguration().getBoolean(
         TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED,
         TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) {
@@ -476,7 +422,7 @@ public class TezClient {
                 sessionAppId,
                 null, clientName, amConfig,
                 tezJarResources, sessionCredentials, usingTezArchiveDeploy, 
apiVersionInfo,
-                historyACLPolicyManager, servicePluginsDescriptor, 
javaOptsChecker);
+                servicePluginsDescriptor, javaOptsChecker);
   
         // Set Tez Sessions to not retry on AM crashes if recovery is disabled
         if (!amConfig.getTezConfiguration().getBoolean(
@@ -511,7 +457,6 @@ public class TezClient {
    *           if submission timed out
    */  
   public synchronized DAGClient submitDAG(DAG dag) throws TezException, 
IOException {
-    ++dagCounter;
     if (isSession) {
       return submitDAGSession(dag);
     } else {
@@ -545,32 +490,9 @@ public class TezClient {
     }
 
     TezConfiguration dagClientConf = new 
TezConfiguration(amConfig.getTezConfiguration());
-    Map<String, String> aclConfigs = null;
-    // TEZ_AM_HISTORY_LOGGING_ENABLED is a config setting enable/disable 
logging of all
-    // dags within a session
-    boolean sessionHistoryLoggingEnabled = 
amConfig.getTezConfiguration().getBoolean(
-        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
-    if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) {
-      try {
-        aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
-            amConfig.getTezConfiguration(), sessionAppId,
-            Integer.toString(dagCounter), dag.getDagAccessControls());
-      } catch (HistoryACLPolicyException e) {
-        LOG.warn("Disabling history logging for dag " +
-          dag.getName() + " due to error in setting up history acls " + e);
-        dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
-        
dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, 
false);
-      }
-    } else if (!sessionHistoryLoggingEnabled) {
-      dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
-      
dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, 
false);
-    }
-
     Map<String, LocalResource> tezJarResources = 
getTezJarResources(sessionCredentials);
     DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, 
tezJarResources,
-        usingTezArchiveDeploy, sessionCredentials, aclConfigs, 
servicePluginsDescriptor,
-        javaOptsChecker);
+        usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, 
javaOptsChecker);
 
     SubmitDAGRequestProto.Builder requestBuilder = 
SubmitDAGRequestProto.newBuilder();
     requestBuilder.setDAGPlan(dagPlan);
@@ -644,10 +566,6 @@ public class TezClient {
    */
   public synchronized void stop() throws TezException, IOException {
     try {
-      if (historyACLPolicyManager != null) {
-        historyACLPolicyManager.close();
-      }
-
       if (sessionStarted) {
         LOG.info("Shutting down Tez Session"
             + ", sessionName=" + clientName
@@ -1032,8 +950,7 @@ public class TezClient {
       ApplicationSubmissionContext appContext = TezClientUtils
           .createApplicationSubmissionContext(
               appId, dag, dag.getName(), amConfig, tezJarResources, 
credentials,
-              usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager,
-              servicePluginsDescriptor, javaOptsChecker);
+              usingTezArchiveDeploy, apiVersionInfo, servicePluginsDescriptor, 
javaOptsChecker);
       String callerContextStr = "";
       if (dag.getCallerContext() != null) {
         callerContextStr = ", callerContext=" + 
dag.getCallerContext().contextAsSimpleString();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index eb1a95e..f440b6f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -86,8 +86,6 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.security.ACLManager;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
-import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -445,7 +443,6 @@ public class TezClientUtils {
    * @param amConfig AM Configuration
    * @param tezJarResources Resources to be used by the AM
    * @param sessionCreds the credential object which will be populated with 
session specific
-   * @param historyACLPolicyManager
    * @param servicePluginsDescriptor descriptor for services which may be 
running in the AM
    * @return an ApplicationSubmissionContext to launch a Tez AM
    * @throws IOException
@@ -457,7 +454,7 @@ public class TezClientUtils {
       ApplicationId appId, DAG dag, String amName,
       AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
       Credentials sessionCreds, boolean tezLrsAsArchive,
-      TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager 
historyACLPolicyManager,
+      TezApiVersionInfo apiVersionInfo,
       ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker 
javaOptsChecker)
       throws IOException, YarnException {
 
@@ -565,41 +562,19 @@ public class TezClientUtils {
     }
     amLocalResources.putAll(tezJarResources);
 
-    // Setup Session ACLs and update conf as needed
-    Map<String, String> aclConfigs = null;
-    if (historyACLPolicyManager != null) {
-      if (dag == null) {
-        try{
-          aclConfigs = 
historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
-              appId);
-        } catch (HistoryACLPolicyException e) {
-          LOG.warn("Disabling history logging for session " + strAppId +
-                   " due to error in setting up history acls " + e);
-          
amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-              false);
-        }
-      } else {
-        try{
-          // Non-session mode
-          // As only a single DAG is support, we should combine AM and DAG 
ACLs under the same
-          // acl management layer
-          aclConfigs = 
historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
-              appId, dag.getDagAccessControls());
-        } catch (HistoryACLPolicyException e) {
-          LOG.warn("Disabling history logging for dag " +
-              dag.getName() + " due to error in setting up history acls " + e);
-          dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, 
"false");
-          // This is non-session mode so disable logging for whole AM
-          
amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-              false);
-        }
-      }
+    TezConfiguration tezConf = amConfig.getTezConfiguration();
+    // Merge the dag access controls into tez am config.
+    if (dag != null && dag.getDagAccessControls() != null) {
+      // Merge updates the conf object passed. In non session mode, same 
client object can be used
+      // to submit multiple dags, copying this prevents ACL of one DAG from 
being used in another.
+      tezConf = new TezConfiguration(amConfig.getTezConfiguration());
+      dag.getDagAccessControls().mergeIntoAmAcls(tezConf);
     }
 
     // emit conf as PB file
-    ConfigurationProto finalConfProto = 
createFinalConfProtoForApp(amConfig.getTezConfiguration(),
-        aclConfigs, servicePluginsDescriptor);
-    
+    ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
+        servicePluginsDescriptor);
+
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
       amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, 
binaryConfPath);
@@ -737,20 +712,10 @@ public class TezClientUtils {
       Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
       Credentials credentials, ServicePluginsDescriptor 
servicePluginsDescriptor,
       JavaOptsChecker javaOptsChecker) throws IOException {
-    return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, 
tezLrsAsArchive, credentials,
-        null, servicePluginsDescriptor, javaOptsChecker);
-  }
-
-  static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
-      Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
-      Credentials credentials, Map<String, String> additionalDAGConfigs,
-      ServicePluginsDescriptor servicePluginsDescriptor,
-      JavaOptsChecker javaOptsChecker) throws IOException {
     Credentials dagCredentials = setupDAGCredentials(dag, credentials,
         amConfig.getTezConfiguration());
     return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, 
tezJarResources,
-        amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, 
servicePluginsDescriptor,
-        javaOptsChecker);
+        amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, 
javaOptsChecker);
   }
   
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> 
vargs) {
@@ -829,7 +794,7 @@ public class TezClientUtils {
   }
 
   static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
-    Map<String, String> additionalConfigs, ServicePluginsDescriptor 
servicePluginsDescriptor) {
+    ServicePluginsDescriptor servicePluginsDescriptor) {
     assert amConf != null;
     ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
     for (Entry<String, String> entry : amConf) {
@@ -838,14 +803,6 @@ public class TezClientUtils {
       kvp.setValue(amConf.get(entry.getKey()));
       builder.addConfKeyValues(kvp);
     }
-    if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
-      for (Entry<String, String> entry : additionalConfigs.entrySet()) {
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        builder.addConfKeyValues(kvp);
-      }
-    }
 
     AMPluginDescriptorProto pluginDescriptorProto =
         
DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java 
b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
index e1c7314..08680a5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
@@ -26,15 +26,15 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 /**
  * Class to manage ACLs for the Tez AM and DAGs and provides functionality to 
check whether
@@ -42,8 +42,6 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @Private
 public class ACLManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ACLManager.class);
   public static final String WILDCARD_ACL_VALUE = "*";
 
   private final String dagUser;
@@ -75,7 +73,7 @@ public class ACLManager {
     }
   }
 
-  public ACLManager(ACLManager amACLManager, String dagUser, Configuration 
dagConf) {
+  public ACLManager(ACLManager amACLManager, String dagUser, ACLInfo aclInfo) {
     this.amUser = amACLManager.amUser;
     this.dagUser = dagUser;
     this.users = amACLManager.users;
@@ -84,12 +82,21 @@ public class ACLManager {
     if (!aclsEnabled) {
       return;
     }
-    ACLConfigurationParser parser = new ACLConfigurationParser(dagConf, true);
-    if (parser.getAllowedUsers() != null) {
-      this.users.putAll(parser.getAllowedUsers());
+    if (aclInfo.getUsersWithViewAccessCount() > 0) {
+      this.users.put(ACLType.DAG_VIEW_ACL,
+          Sets.newLinkedHashSet(aclInfo.getUsersWithViewAccessList()));
     }
-    if (parser.getAllowedGroups() != null) {
-      this.groups.putAll(parser.getAllowedGroups());
+    if (aclInfo.getUsersWithModifyAccessCount() > 0) {
+      this.users.put(ACLType.DAG_MODIFY_ACL,
+          Sets.newLinkedHashSet(aclInfo.getUsersWithModifyAccessList()));
+    }
+    if (aclInfo.getGroupsWithViewAccessCount() > 0) {
+      this.groups.put(ACLType.DAG_VIEW_ACL,
+          Sets.newLinkedHashSet(aclInfo.getGroupsWithViewAccessList()));
+    }
+    if (aclInfo.getGroupsWithModifyAccessCount() > 0) {
+      this.groups.put(ACLType.DAG_MODIFY_ACL,
+          Sets.newLinkedHashSet(aclInfo.getGroupsWithModifyAccessList()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java 
b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
index 5fe352a..520416d 100644
--- 
a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
+++ 
b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
@@ -27,8 +27,11 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 
+import com.google.common.collect.ImmutableMap;
+
 /**
  * Access controls for the DAG
  */
@@ -150,23 +153,37 @@ public class DAGAccessControls {
     return Collections.unmodifiableSet(groupsWithModifyACLs);
   }
 
+  /**
+   * Merge the dag acls with the AM acls in the configuration object. The 
config object will contain
+   * the updated acls.
+   * @param conf The AM config.
+   */
   @Private
-  public synchronized void serializeToConfiguration(Configuration conf) {
-    if (usersWithViewACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) {
-      conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE);
+  public synchronized void mergeIntoAmAcls(Configuration conf) {
+    ACLConfigurationParser parser = new ACLConfigurationParser(conf, false);
+    parser.addAllowedGroups(ImmutableMap.of(
+        ACLType.AM_VIEW_ACL, groupsWithViewACLs, ACLType.AM_MODIFY_ACL, 
groupsWithModifyACLs));
+    parser.addAllowedUsers(ImmutableMap.of(
+        ACLType.AM_VIEW_ACL, usersWithViewACLs, ACLType.AM_MODIFY_ACL, 
usersWithModifyACLs));
+
+    Set<String> viewUsers = parser.getAllowedUsers().get(ACLType.AM_VIEW_ACL);
+    Set<String> viewGroups = 
parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL);
+    if (viewUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+      conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, 
ACLManager.WILDCARD_ACL_VALUE);
     } else {
-      String userList = ACLManager.toCommaSeparatedString(usersWithViewACLs);
-      String groupList = ACLManager.toCommaSeparatedString(groupsWithViewACLs);
-      conf.set(TezConstants.TEZ_DAG_VIEW_ACLS,
-          userList + " " + groupList);
+      String userList = ACLManager.toCommaSeparatedString(viewUsers);
+      String groupList = ACLManager.toCommaSeparatedString(viewGroups);
+      conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, userList + " " + groupList);
     }
-    if (usersWithModifyACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) {
-      conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, 
ACLManager.WILDCARD_ACL_VALUE);
+
+    Set<String> modifyUsers = 
parser.getAllowedUsers().get(ACLType.AM_MODIFY_ACL);
+    Set<String> modifyGroups = 
parser.getAllowedGroups().get(ACLType.AM_MODIFY_ACL);
+    if (modifyUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+      conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, 
ACLManager.WILDCARD_ACL_VALUE);
     } else {
-      String userList = ACLManager.toCommaSeparatedString(usersWithModifyACLs);
-      String groupList = 
ACLManager.toCommaSeparatedString(groupsWithModifyACLs);
-      conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, userList + " " + groupList);
+      String userList = ACLManager.toCommaSeparatedString(modifyUsers);
+      String groupList = ACLManager.toCommaSeparatedString(modifyGroups);
+      conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, userList + " " + 
groupList);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
 
b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
index fc0f57c..92eea67 100644
--- 
a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
+++ 
b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
@@ -30,15 +30,21 @@ import 
org.apache.tez.common.security.HistoryACLPolicyException;
 
 /**
  * ACL Policy Manager
- * An instance of this implements any ACL related activity when starting a 
session or
- * submitting a DAG
+ * An instance of this implements any ACL related activity when starting a 
session or submitting a 
+ * DAG. It is used in the HistoryLoggingService to create domain ids and 
populate entities with
+ * domain id.
  */
 @Unstable
 @Private
 public interface HistoryACLPolicyManager extends Configurable {
 
   /**
-   * Take any necessary steps for setting up Session ACLs
+   * Take any necessary steps for setting up both Session ACLs and non session 
acls. This is called
+   * with the am configuration which contains the ACL information to be used 
to create a domain.
+   * If the method returns a value, then its assumed to be a valid domain and 
used as domainId.
+   * If the method returns null, acls are disabled at session level, i.e use 
default acls at session
+   * level.
+   * If the method throws an Exception, history logging is disabled for the 
entire session.
    * @param conf Configuration
    * @param applicationId Application ID for the session
    * @throws Exception
@@ -47,7 +53,7 @@ public interface HistoryACLPolicyManager extends Configurable 
{
       throws IOException, HistoryACLPolicyException;
 
   /**
-   * Take any necessary steps for setting up ACLs for an AM which is running 
in non-session mode
+   * Not used currently.
    * @param conf Configuration
    * @param applicationId Application ID for the AM
    * @param dagAccessControls ACLs defined for the DAG being submitted
@@ -57,16 +63,26 @@ public interface HistoryACLPolicyManager extends 
Configurable {
       DAGAccessControls dagAccessControls) throws IOException, 
HistoryACLPolicyException;
 
   /**
-   * Take any necessary steps for setting up ACLs for a DAG that is submitted 
to a Session
+   * Take any necessary steps for setting up ACLs for a DAG that is submitted 
to a Session. This is
+   * called with dag configuration.
+   * If the method returns a value, then it is assumed to be valid domain and 
is used as a domainId
+   * for all of the dag events.
+   * If the method returns null, it falls back to session level acls.
+   * If the method throws Exception: it disables history logging for the dag 
events.
    * @param conf Configuration
    * @param applicationId Application ID for the AM
    * @param dagAccessControls ACLs defined for the DAG being submitted
    * @throws Exception
    */
   public Map<String, String> setupSessionDAGACLs(Configuration conf, 
ApplicationId applicationId,
-      String dagName, DAGAccessControls dagAccessControls) throws IOException, 
HistoryACLPolicyException;
-
+      String dagName, DAGAccessControls dagAccessControls)
+          throws IOException, HistoryACLPolicyException;
 
+  /**
+   * Called with a timeline entity which has to be updated with a domain id.
+   * @param timelineEntity The timeline entity which will be published.
+   * @param domainId The domainId returned by one of the setup*ACL calls.
+   */
   public void updateTimelineEntityDomain(Object timelineEntity, String 
domainId);
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 65321a8..f15c1fb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -770,15 +769,15 @@ public class DAG {
                            Map<String, LocalResource> tezJarResources, 
LocalResource binaryConfig,
                            boolean tezLrsAsArchive) {
     return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, 
tezLrsAsArchive,
-        null, null, null);
+        null, null);
   }
 
   // create protobuf message describing DAG
   @Private
   public synchronized DAGPlan createDag(Configuration tezConf, Credentials 
extraCredentials,
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
-      boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
-      ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker 
javaOptsChecker) {
+      boolean tezLrsAsArchive, ServicePluginsDescriptor 
servicePluginsDescriptor,
+      JavaOptsChecker javaOptsChecker) {
     Deque<String> topologicalVertexStack = verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -1017,30 +1016,11 @@ public class DAG {
       dagBuilder.addEdge(edgeBuilder);
     }
 
-    ConfigurationProto.Builder confProtoBuilder =
-        ConfigurationProto.newBuilder();
     if (dagAccessControls != null) {
-      Configuration aclConf = new Configuration(false);
-      dagAccessControls.serializeToConfiguration(aclConf);
-      Iterator<Entry<String, String>> aclConfIter = aclConf.iterator();
-      while (aclConfIter.hasNext()) {
-        Entry<String, String> entry = aclConfIter.next();
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
-        confProtoBuilder.addConfKeyValues(kvp);
-      }
-    }
-    if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
-      for (Entry<String, String> entry : additionalConfigs.entrySet()) {
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
-        confProtoBuilder.addConfKeyValues(kvp);
-      }
+      
dagBuilder.setAclInfo(DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls));
     }
+
+    ConfigurationProto.Builder confProtoBuilder = 
ConfigurationProto.newBuilder();
     if (!this.dagConf.isEmpty()) {
       for (Entry<String, String> entry : this.dagConf.entrySet()) {
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 5733da8..cefe026 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -50,6 +50,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -57,6 +58,7 @@ import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -863,4 +865,27 @@ public class DagTypeConverters {
     return callerContext;
   }
 
+  public static ACLInfo convertDAGAccessControlsToProto(DAGAccessControls 
dagAccessControls) {
+    if (dagAccessControls == null) {
+      return null;
+    }
+    ACLInfo.Builder builder = ACLInfo.newBuilder();
+    
builder.addAllUsersWithViewAccess(dagAccessControls.getUsersWithViewACLs());
+    
builder.addAllUsersWithModifyAccess(dagAccessControls.getUsersWithModifyACLs());
+    
builder.addAllGroupsWithViewAccess(dagAccessControls.getGroupsWithViewACLs());
+    
builder.addAllGroupsWithModifyAccess(dagAccessControls.getGroupsWithModifyACLs());
+    return builder.build();
+  }
+
+  public static DAGAccessControls convertDAGAccessControlsFromProto(ACLInfo 
aclInfo) {
+    if (aclInfo == null) {
+      return null;
+    }
+    DAGAccessControls dagAccessControls = new DAGAccessControls();
+    
dagAccessControls.setUsersWithViewACLs(aclInfo.getUsersWithViewAccessList());
+    
dagAccessControls.setUsersWithModifyACLs(aclInfo.getUsersWithModifyAccessList());
+    
dagAccessControls.setGroupsWithViewACLs(aclInfo.getGroupsWithViewAccessList());
+    
dagAccessControls.setGroupsWithModifyACLs(aclInfo.getGroupsWithModifyAccessList());
+    return dagAccessControls;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto 
b/tez-api/src/main/proto/DAGApiRecords.proto
index d016d60..c84094b 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -197,6 +197,13 @@ message CallerContextProto {
   optional string blob = 4;
 }
 
+message ACLInfo {
+  repeated string usersWithViewAccess = 1;
+  repeated string usersWithModifyAccess = 2;
+  repeated string groupsWithViewAccess = 3;
+  repeated string groupsWithModifyAccess = 4;
+}
+
 message DAGPlan {
   required string name = 1;
   repeated VertexPlan vertex = 2;
@@ -208,6 +215,7 @@ message DAGPlan {
   optional string dag_info = 8;
   optional VertexExecutionContextProto default_execution_context = 9;
   optional CallerContextProto caller_context = 10;
+  optional ACLInfo aclInfo = 11;
 }
 
 // DAG monitoring messages

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java 
b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index d49ba48..51f36a3 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -241,12 +240,10 @@ public class TestTezClient {
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
     
     TezClientForTest client = configureAndCreateTezClient(lrs, isSession, 
null);
-    HistoryACLPolicyManager mockAcl = mock(HistoryACLPolicyManager.class);
 
     ArgumentCaptor<ApplicationSubmissionContext> captor = 
ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
     
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
     .thenReturn(YarnApplicationState.RUNNING);
-    client.setUpHistoryAclManager(mockAcl);
     client.start();
     verify(client.mockYarnClient, times(1)).init((Configuration)any());
     verify(client.mockYarnClient, times(1)).start();
@@ -344,9 +341,8 @@ public class TestTezClient {
           (ShutdownSessionRequestProto) any());
     }
     verify(client.mockYarnClient, times(1)).stop();
-    verify(mockAcl, times(1)).close();
   }
-  
+
   @Test (timeout=5000)
   public void testPreWarm() throws Exception {
     TezClientForTest client = configureAndCreateTezClient();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java 
b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 2c69d77..49aae20 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -133,7 +131,7 @@ public class TestTezClientUtils {
   /**
    *
    */
-  @Test (timeout=5000)
+  @Test (timeout=10000)
   public void validateSetTezJarLocalResourcesDefinedExistingDirectory() throws 
Exception {
     URL[] cp = ((URLClassLoader)ClassLoader.getSystemClassLoader()).getURLs();
     StringBuffer buffer = new StringBuffer();
@@ -314,7 +312,7 @@ public class TestTezClientUtils {
         appId, null, "dagname",
         amConf, m,
         credentials, false,
-        new TezApiVersionInfo(), null, null, null);
+        new TezApiVersionInfo(), null, null);
     assertEquals(testpriority, appcontext.getPriority().getPriority());
   }
 
@@ -366,7 +364,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, 
"amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new 
TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class), null, null);
+            null, null);
 
     ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
     Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
@@ -399,7 +397,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, 
"amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new 
TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class), null, null);
+            null, null);
 
     List<String> expectedCommands = new LinkedList<String>();
     
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -439,7 +437,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, 
"amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new 
TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class), null, null);
+            null, null);
 
     List<String> expectedCommands = new LinkedList<String>();
     
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -626,7 +624,7 @@ public class TestTezClientUtils {
     expected.put("property1", val1);
     expected.put("property2", expVal2);
 
-    ConfigurationProto confProto = 
TezClientUtils.createFinalConfProtoForApp(conf, null, null);
+    ConfigurationProto confProto = 
TezClientUtils.createFinalConfProtoForApp(conf, null);
 
     for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
       String v = expected.remove(kvPair.getKey());
@@ -730,7 +728,7 @@ public class TestTezClientUtils {
       srcConf.set(entry.getKey(), entry.getValue());
     }
 
-    ConfigurationProto confProto = 
TezClientUtils.createFinalConfProtoForApp(srcConf, null, null);
+    ConfigurationProto confProto = 
TezClientUtils.createFinalConfProtoForApp(srcConf, null);
 
     for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
       String val = confMap.remove(kvPair.getKey());
@@ -792,7 +790,7 @@ public class TestTezClientUtils {
     Configuration conf = new Configuration(false);
     ServicePluginsDescriptor servicePluginsDescriptor = 
ServicePluginsDescriptor.create(true);
 
-    ConfigurationProto confProto = 
TezClientUtils.createFinalConfProtoForApp(conf, null,
+    ConfigurationProto confProto = 
TezClientUtils.createFinalConfProtoForApp(conf,
         servicePluginsDescriptor);
 
     assertTrue(confProto.hasAmPluginDescriptor());

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java 
b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
index a88e801..fc9c24e 100644
--- a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
+++ b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
@@ -20,18 +20,16 @@ package org.apache.tez.common.security;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
 
 public class TestACLManager {
 
@@ -64,7 +62,8 @@ public class TestACLManager {
     Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_VIEW_ACL));
     Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_MODIFY_ACL));
 
-    ACLManager dagAclManager = new ACLManager(aclManager, 
dagUser.getShortUserName(), new Configuration(false));
+    ACLManager dagAclManager = new ACLManager(aclManager, 
dagUser.getShortUserName(),
+        ACLInfo.getDefaultInstance());
     user = dagUser;
     Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_VIEW_ACL));
     Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_MODIFY_ACL));
@@ -256,17 +255,20 @@ public class TestACLManager {
     conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyACLs);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, yarnAdminACLs);
 
-    // DAG View ACLs: user1, user4, grp3, grp4.
-    String dagViewACLs = "user6,   grp5  ";
-    // DAG Modify ACLs: user3, grp6, grp7
-    String dagModifyACLs = "user6,user5 ";
-    conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, dagViewACLs);
-    conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, dagModifyACLs);
+    ACLInfo.Builder builder = ACLInfo.newBuilder();
+    // DAG View ACLs: user6, grp5
+    builder.addUsersWithViewAccess("user6");
+    builder.addGroupsWithViewAccess("grp5");
+
+    // DAG Modify ACLs: user6,user5
+    builder.addUsersWithModifyAccess("user6");
+    builder.addUsersWithModifyAccess("user7");
 
     UserGroupInformation dagUser = 
UserGroupInformation.createUserForTesting("dagUser", noGroups);
 
     ACLManager amAclManager = new ACLManager(currentUser.getShortUserName(), 
conf);
-    ACLManager aclManager = new ACLManager(amAclManager, 
dagUser.getShortUserName(), conf);
+    ACLManager aclManager = new ACLManager(amAclManager, 
dagUser.getShortUserName(),
+        builder.build());
 
     Assert.assertTrue(aclManager.checkAMViewAccess(currentUser));
     Assert.assertFalse(aclManager.checkAMViewAccess(dagUser));

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
 
b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
index 6afc83b..4335a20 100644
--- 
a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
+++ 
b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
@@ -18,76 +18,14 @@
 
 package org.apache.tez.common.security;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestDAGAccessControls {
-
-  @Test(timeout = 5000)
-  public void testBasicSerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-    dagAccessControls.setUsersWithViewACLs(Arrays.asList("u1"))
-        .setUsersWithModifyACLs(Arrays.asList("u2"))
-        .setGroupsWithViewACLs(Arrays.asList("g1"))
-        .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
-    Assert.assertEquals("u1 g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals("u2 g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
-
-  @Test(timeout = 5000)
-  public void testWildCardSerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-    dagAccessControls.setUsersWithViewACLs(Arrays.asList("*"))
-        .setUsersWithModifyACLs(Arrays.asList("*"))
-        .setGroupsWithViewACLs(Arrays.asList("g1"))
-        .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
-    Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
-
-  @Test(timeout = 5000)
-  public void testGroupsOnlySerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-    dagAccessControls.setGroupsWithViewACLs(Arrays.asList("g1"))
-        .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
+import com.google.common.collect.Sets;
 
-    Assert.assertEquals(" g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals(" g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
-
-  @Test(timeout = 5000)
-  public void testEmptySerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
-    Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
+public class TestDAGAccessControls {
 
   @Test(timeout = 5000)
   public void testStringBasedConstructor() {
@@ -102,8 +40,107 @@ public class TestDAGAccessControls {
     
Assert.assertTrue(dagAccessControls.getUsersWithModifyACLs().contains("u2"));
     
Assert.assertTrue(dagAccessControls.getGroupsWithViewACLs().contains("g1"));
     
Assert.assertTrue(dagAccessControls.getGroupsWithModifyACLs().contains("g2"));
+  }
 
+
+  @Test(timeout=5000)
+  public void testMergeIntoAmAcls() {
+    DAGAccessControls dagAccessControls = new DAGAccessControls("u1 g1", "u2 
g2");
+    Configuration conf = new Configuration(false);
+
+    // default conf should have ACLs copied over.
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // both have unique users merged should have all
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u1 g1");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u2 g2");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // both have unique users merged should have all
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u3,u1 g3,g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u4,u2 g4,g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // one of the user is *, merged is always *
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // only * in the config, merged is *
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // DAG access with *, all operation yeild *
+    dagAccessControls = new DAGAccessControls("*", "*");
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // DAG access is empty, conf should be same.
+    dagAccessControls = new DAGAccessControls("", "");
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u3 g3", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u4 g4", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
   }
 
+  public void assertACLS(String expected, String obtained) {
+    if (expected.equals(obtained)) {
+      return;
+    }
 
+    String [] parts1 = expected.split(" ");
+    String [] parts2 = obtained.split(" ");
+
+    Assert.assertEquals(parts1.length, parts2.length);
+
+    Assert.assertEquals(
+        Sets.newHashSet(parts1[0].split(",")), 
Sets.newHashSet(parts2[0].split(",")));
+
+    if (parts1.length < 2) {
+      return;
+    }
+    Assert.assertEquals(
+        Sets.newHashSet(parts1[1].split(",")), 
Sets.newHashSet(parts2[1].split(",")));
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java 
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 005c027..8e1011f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -370,14 +370,14 @@ public class TestDAGPlan {
     dag.addVertex(v1);
 
     // Should succeed. Default context is containers.
-    dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+    dag.createDag(new TezConfiguration(false), null, null, null, true,
         servicePluginsDescriptor, null);
 
 
     // Set execute in AM should fail
     v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid 
ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -386,13 +386,13 @@ public class TestDAGPlan {
 
     // Valid context
     v1.setExecutionContext(validExecContext);
-    dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+    dag.createDag(new TezConfiguration(false), null, null, null, true,
         servicePluginsDescriptor, null);
 
     // Invalid task scheduler
     v1.setExecutionContext(invalidExecContext1);
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid 
ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -404,7 +404,7 @@ public class TestDAGPlan {
     // Invalid ContainerLauncher
     v1.setExecutionContext(invalidExecContext2);
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid 
ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -416,7 +416,7 @@ public class TestDAGPlan {
     // Invalid task comm
     v1.setExecutionContext(invalidExecContext3);
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid 
ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -444,7 +444,8 @@ public class TestDAGPlan {
             new 
ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", 
null)},
             new 
TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", 
null)});
 
-    Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 
1)).setExecutionContext(v1Context);
+    Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1))
+        .setExecutionContext(v1Context);
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, 
String>())
         .addTaskLocalFiles(new HashMap<String, LocalResource>());
@@ -462,7 +463,7 @@ public class TestDAGPlan {
     dag.addVertex(v1).addVertex(v2).addEdge(edge);
     dag.setExecutionContext(defaultExecutionContext);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, 
true, null,
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, 
true,
         servicePluginsDescriptor, null);
 
     assertEquals(2, dagProto.getVertexCount());
@@ -502,8 +503,7 @@ public class TestDAGPlan {
     TezConfiguration conf = new TezConfiguration(false);
     conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, "  -XX:+UseParallelGC 
");
     try {
-      DAGPlan dagProto = dag.createDag(conf, null, null, null, true, null, 
null,
-          new JavaOptsChecker());
+      dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker());
       fail("Expected dag creation to fail for invalid java opts");
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains("Invalid/conflicting GC 
options"));
@@ -511,12 +511,11 @@ public class TestDAGPlan {
 
     // Should not fail as java opts valid
     conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, "  -XX:-UseParallelGC 
");
-    DAGPlan dagProto1 = dag.createDag(conf, null, null, null, true, null, null,
-        new JavaOptsChecker());
+    dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker());
 
     // Should not fail as no checker enabled
     conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, "  -XX:+UseParallelGC 
");
-    DAGPlan dagProto2 = dag.createDag(conf, null, null, null, true, null, 
null, null);
+    dag.createDag(conf, null, null, null, true, null, null);
 
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java 
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index c566b1a..794a597 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.api;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,9 +35,8 @@ import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.junit.Assert;
@@ -1044,21 +1044,11 @@ public class TestDAGVerify {
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
 
-    ConfigurationProto confProto = dagPlan.getDagConf();
-    boolean foundViewAcls = false;
-    boolean foundModifyAcls = false;
-
-    for (PlanKeyValuePair pair : confProto.getConfKeyValuesList()) {
-      if (pair.getKey().equals(TezConstants.TEZ_DAG_VIEW_ACLS)) {
-        foundViewAcls = true;
-        Assert.assertEquals("u1 g1", pair.getValue());
-      } else if (pair.getKey().equals(TezConstants.TEZ_DAG_MODIFY_ACLS)) {
-        foundModifyAcls = true;
-        Assert.assertEquals("*", pair.getValue());
-      }
-    }
-    Assert.assertTrue(foundViewAcls);
-    Assert.assertTrue(foundModifyAcls);
+    ACLInfo aclInfo = dagPlan.getAclInfo();
+    Assert.assertEquals(Collections.singletonList("u1"), 
aclInfo.getUsersWithViewAccessList());
+    Assert.assertEquals(Collections.singletonList("g1"), 
aclInfo.getGroupsWithViewAccessList());
+    Assert.assertEquals(Collections.singletonList("*"), 
aclInfo.getUsersWithModifyAccessList());
+    Assert.assertEquals(Collections.singletonList("g2"), 
aclInfo.getGroupsWithModifyAccessList());
   }
 
   // v1 has input initializer

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java 
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 6f795fc..dc04f2d 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -44,6 +46,8 @@ import 
org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TestDagTypeConverters {
 
   @Test(timeout = 5000)
@@ -208,6 +212,40 @@ public class TestDagTypeConverters {
     verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true);
   }
 
+  @Test
+  public void testAclConversions() {
+    DAGAccessControls dagAccessControls = new DAGAccessControls("u1,u2 g1,g2", 
"u3,u4 g3,g4");
+    ACLInfo aclInfo = 
DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), 
aclInfo);
+
+    dagAccessControls = new DAGAccessControls("u1 ", "u2 ");
+    aclInfo = 
DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), 
aclInfo);
+
+    dagAccessControls = new DAGAccessControls(" g1", " g3,g4");
+    aclInfo = 
DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), 
aclInfo);
+
+    dagAccessControls = new DAGAccessControls("*", "*");
+    aclInfo = 
DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), 
aclInfo);
+  }
+
+  private void assertSame(DAGAccessControls dagAccessControls, ACLInfo 
aclInfo) {
+    assertEquals(dagAccessControls.getUsersWithViewACLs(),
+        Sets.newHashSet(aclInfo.getUsersWithViewAccessList()));
+    assertEquals(dagAccessControls.getUsersWithModifyACLs(),
+        Sets.newHashSet(aclInfo.getUsersWithModifyAccessList()));
+    assertEquals(dagAccessControls.getGroupsWithViewACLs(),
+        Sets.newHashSet(aclInfo.getGroupsWithViewAccessList()));
+    assertEquals(dagAccessControls.getGroupsWithModifyACLs(),
+        Sets.newHashSet(aclInfo.getGroupsWithModifyAccessList()));
+  }
+
   private void verifyPlugins(List<TezNamedEntityDescriptorProto> entities, int 
expectedCount,
                              String baseString, boolean hasPayload) {
     assertEquals(expectedCount, entities.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index fd6d446..481353b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -542,7 +542,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
     }
 
     this.aclManager = new ACLManager(appContext.getAMACLManager(), 
dagUGI.getShortUserName(),
-        this.dagConf);
+        this.jobPlan.getAclInfo());
     // this is only for recovery in case it does not call the init transition
     this.startDAGCpuTime = appContext.getCumulativeCPUTime();
     this.startDAGGCTime = appContext.getCumulativeGCTime();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
 
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
index 91ffe7b..45e24ce 100644
--- 
a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
+++ 
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
@@ -154,7 +154,6 @@ public class ATSHistoryACLPolicyManager implements 
HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -164,18 +163,13 @@ public class ATSHistoryACLPolicyManager implements 
HistoryACLPolicyManager {
       domainId = DOMAIN_ID_PREFIX + applicationId.toString();
       createTimelineDomain(domainId, tezConf, dagAccessControls);
       LOG.info("Created Timeline Domain for History ACLs, domainId=" + 
domainId);
-      return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
domainId);
     }
+    return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, 
domainId);
   }
 
   private Map<String, String> createDAGDomain(Configuration tezConf,
       ApplicationId applicationId, String dagName, DAGAccessControls 
dagAccessControls)
       throws IOException, HistoryACLPolicyException {
-    if (dagAccessControls == null) {
-      // No DAG specific ACLs
-      return null;
-    }
-
     String domainId =
         tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
     if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -193,7 +187,6 @@ public class ATSHistoryACLPolicyManager implements 
HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -201,14 +194,17 @@ public class ATSHistoryACLPolicyManager implements 
HistoryACLPolicyManager {
             + " Domains is disabled");
       }
 
+      // Create a domain only if dagAccessControls has been specified.
+      if (dagAccessControls == null) {
+        return null;
+      }
       domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
       createTimelineDomain(domainId, tezConf, dagAccessControls);
       LOG.info("Created Timeline Domain for DAG-specific History ACLs, 
domainId=" + domainId);
-      return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
     }
+    return 
Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
   }
 
-
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
 
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index 2c976f5..6b3ebd7 100644
--- 
a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ 
b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.ReflectionUtils;
@@ -54,6 +53,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -73,7 +73,6 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 
-import org.mockito.Matchers;
 
 public class TestATSHistoryWithACLs {
 
@@ -307,170 +306,6 @@ public class TestATSHistoryWithACLs {
   }
 
   /**
-   * test failure of domain creation during dag submittion in session mode
-   * only affect logging for that dag not following submitted dag 
-   * @throws Exception
-   */
-  @Test (timeout=50000)
-  public void testMultipleDagSession() throws Exception {
-    TezClient tezSession = null;
-    String viewAcls = "nobody nobody_group";
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = DAG.create("TezSleepProcessor");
-    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-            
SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-            Resource.newInstance(256, 1));
-    dag.addVertex(vertex);
-    DAGAccessControls accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
-    
accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
-    dag.setAccessControls(accessControls);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
-    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        ATSHistoryLoggingService.class.getName());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", 
String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
-
-    tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-    tezSession.start();
-
-    //////submit first dag which fails in dag creation//////
-    ATSHistoryACLPolicyManager myAclPolicyManager = 
ReflectionUtils.createClazzInstance(
-              atsHistoryACLManagerClassName);
-    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
-    doThrow(new IOException("Fail to Put 
Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current 
state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    String dagLogging = 
dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
-    
-    myAclPolicyManager.timelineClient = null;
-    myAclPolicyManager.setConf(tezConf);
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
-
-    //////submit second dag which succeeds in dag creation//////
-    DAG dag2 = DAG.create("TezSleepProcessor2");
-    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-          
SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-       Resource.newInstance(256, 1));
-    dag2.addVertex(vertex);
-    accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
-    
accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
-    dag2.setAccessControls(accessControls);
-    dagClient = tezSession.submitDAG(dag2);
-    dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current 
state: "
-                + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagLogging = 
dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    Assert.assertNull(dagLogging);
-    myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient);
-    tezSession.stop();
-    verify(myAclPolicyManager.timelineClient, times(1)).stop();
-  }
-  
-/**
- * test failure of domain creation during dag submittion in nonsession mode
- * only affect logging for that dag not following submitted dag 
- * @throws Exception
- */
-  @Test (timeout=50000)
-  public void testMultipleDagNonSession() throws Exception {
-    TezClient tezClient = null;
-    String viewAcls = "nobody nobody_group";
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = DAG.create("TezSleepProcessor");
-    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-            
SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-            Resource.newInstance(256, 1));
-    dag.addVertex(vertex);
-    DAGAccessControls accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
-    
accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
-    dag.setAccessControls(accessControls);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
-    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        ATSHistoryLoggingService.class.getName());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", 
String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
-
-    tezClient = TezClient.create("TezSleepProcessor", tezConf, false);
-    tezClient.start();
-
-    //////submit first dag which fails in dag creation//////
-    ATSHistoryACLPolicyManager myAclPolicyManager = 
ReflectionUtils.createClazzInstance(
-              atsHistoryACLManagerClassName);
-    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
-    doThrow(new IOException("Fail to Put 
Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
-    tezClient.setUpHistoryAclManager(myAclPolicyManager);
-
-    DAGClient dagClient = tezClient.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current 
state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    String dagLogging = 
dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
-    
-    myAclPolicyManager.timelineClient = null;
-    myAclPolicyManager.setConf(tezConf);
-    tezClient.setUpHistoryAclManager(myAclPolicyManager);
-
-    //////submit second dag which succeeds in dag creation//////
-    DAG dag2 = DAG.create("TezSleepProcessor2");
-    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-           
SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-        Resource.newInstance(256, 1));
-    dag2.addVertex(vertex);
-    accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
-    
accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
-    dag2.setAccessControls(accessControls);
-    dagClient = tezClient.submitDAG(dag2);
-    dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current 
state: "
-                   + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagLogging = 
dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    Assert.assertNull(dagLogging);
-    myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient);
-    tezClient.stop();
-    verify(myAclPolicyManager.timelineClient, times(1)).stop();
-
-  }
-  /**
    * Test Disable Logging for all dags in a session 
    * due to failure to create domain in session start
    * @throws Exception
@@ -501,19 +336,8 @@ public class TestATSHistoryWithACLs {
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
 
     tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-    ATSHistoryACLPolicyManager myAclPolicyManager = 
ReflectionUtils.createClazzInstance(
-            atsHistoryACLManagerClassName);
-    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
-    doThrow(new IOException("Fail to Put Domain")).
-        
when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
     tezSession.start();
 
-    ///substitute back mocked timelineClient with a normal one
-    myAclPolicyManager.timelineClient = null;
-    myAclPolicyManager.setConf(tezConf);
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
     //////submit first dag //////
     DAGClient dagClient = tezSession.submitDAG(dag);
     DAGStatus dagStatus = dagClient.getDAGStatus(null);
@@ -524,9 +348,7 @@ public class TestATSHistoryWithACLs {
       dagStatus = dagClient.getDAGStatus(null);
     }
     assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    String dagLogging = 
dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
- 
+
     //////submit second dag//////
     DAG dag2 = DAG.create("TezSleepProcessor2");
     vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
@@ -545,10 +367,9 @@ public class TestATSHistoryWithACLs {
       Thread.sleep(500l);
       dagStatus = dagClient.getDAGStatus(null);
     }
-    dagLogging = 
dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
     tezSession.stop();
   }
+
   /**
    * use mini cluster to verify data do not push to ats when the daglogging 
flag
    * in dagsubmittedevent is set off
@@ -559,6 +380,9 @@ public class TestATSHistoryWithACLs {
     ATSHistoryLoggingService historyLoggingService;
     historyLoggingService =
         
ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+    AppContext appContext = mock(AppContext.class);
+    
when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1));
+    historyLoggingService.setAppContext(appContext);
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
     String viewAcls = "nobody nobody_group";
     tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
@@ -568,8 +392,8 @@ public class TestATSHistoryWithACLs {
         .nextInt(100000))));
     remoteFs.mkdirs(remoteStagingDir);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
-    historyLoggingService.serviceInit(tezConf);
-    historyLoggingService.serviceStart();
+    historyLoggingService.init(tezConf);
+    historyLoggingService.start();
     ApplicationId appId = ApplicationId.newInstance(100l, 1);
     TezDAGID tezDAGID = TezDAGID.getInstance(
                         appId, 100);
@@ -601,6 +425,9 @@ public class TestATSHistoryWithACLs {
     ATSHistoryLoggingService historyLoggingService;
     historyLoggingService =
             
ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+    AppContext appContext = mock(AppContext.class);
+    
when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1));
+    historyLoggingService.setAppContext(appContext);
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
     String viewAcls = "nobody nobody_group";
     tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
@@ -610,8 +437,8 @@ public class TestATSHistoryWithACLs {
         .nextInt(100000))));
     remoteFs.mkdirs(remoteStagingDir);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, 
remoteStagingDir.toString());
-    historyLoggingService.serviceInit(tezConf);
-    historyLoggingService.serviceStart();
+    historyLoggingService.init(tezConf);
+    historyLoggingService.start();
     ApplicationId appId = ApplicationId.newInstance(100l, 1);
     TezDAGID tezDAGID = TezDAGID.getInstance(
                         appId, 11);
@@ -636,7 +463,7 @@ public class TestATSHistoryWithACLs {
     assertEquals(entity.getEntityType(), "TEZ_DAG_ID");
     assertEquals(entity.getEvents().get(0).getEventType(), 
HistoryEventType.DAG_SUBMITTED.toString());
   }
-  
+
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
   @Test (timeout=50000)
@@ -676,5 +503,4 @@ public class TestATSHistoryWithACLs {
     }
   }
 
-
 }

Reply via email to