TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. (Harish Jaiprakash via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a23de498 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a23de498 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a23de498 Branch: refs/heads/master Commit: a23de4982e4ed0d55fb711745e7670b3be4b266e Parents: c07ec7b Author: Hitesh Shah <[email protected]> Authored: Mon Sep 12 13:29:22 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Sep 12 13:29:22 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 89 +----- .../org/apache/tez/client/TezClientUtils.java | 69 +---- .../apache/tez/common/security/ACLManager.java | 27 +- .../tez/common/security/DAGAccessControls.java | 43 ++- .../security/HistoryACLPolicyManager.java | 30 +- .../main/java/org/apache/tez/dag/api/DAG.java | 32 +-- .../apache/tez/dag/api/DagTypeConverters.java | 25 ++ tez-api/src/main/proto/DAGApiRecords.proto | 8 + .../org/apache/tez/client/TestTezClient.java | 6 +- .../apache/tez/client/TestTezClientUtils.java | 18 +- .../tez/common/security/TestACLManager.java | 24 +- .../common/security/TestDAGAccessControls.java | 167 ++++++----- .../org/apache/tez/dag/api/TestDAGPlan.java | 25 +- .../org/apache/tez/dag/api/TestDAGVerify.java | 24 +- .../tez/dag/api/TestDagTypeConverters.java | 38 +++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +- .../ats/acls/ATSHistoryACLPolicyManager.java | 16 +- .../ats/acls/TestATSHistoryWithACLs.java | 202 +------------- .../ats/acls/ATSV15HistoryACLPolicyManager.java | 17 +- .../ats/ATSV15HistoryLoggingService.java | 154 ++++++++--- .../dag/history/ats/acls/TestATSHistoryV15.java | 2 + .../ats/TestATSV15HistoryLoggingService.java | 251 ++++++++++++++++- .../logging/ats/ATSHistoryLoggingService.java | 171 +++++++++--- .../ats/TestATSHistoryLoggingService.java | 276 ++++++++++++++++++- .../tez/mapreduce/examples/MRRSleepJob.java | 30 +- 26 files changed, 1127 insertions(+), 620 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fd6ab68..99bae83 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list. TEZ-3284. Synchronization for every write in UnorderdKVWriter TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 71ba6b2..780fcb7 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.util.Time; import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DAGSubmissionTimedOut; @@ -69,7 +68,6 @@ import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto; @@ -79,7 +77,6 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto; import org.apache.tez.dag.api.client.DAGClientImpl; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; -import org.apache.tez.common.security.HistoryACLPolicyException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -136,11 +133,9 @@ public class TezClient { final TezApiVersionInfo apiVersionInfo; @VisibleForTesting final ServicePluginsDescriptor servicePluginsDescriptor; - private HistoryACLPolicyManager historyACLPolicyManager; private JavaOptsChecker javaOptsChecker = null; private int preWarmDAGCounter = 0; - private int dagCounter = 0; /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */ private int maxSubmitDAGRequestSizeThroughIPC; @@ -148,15 +143,6 @@ public class TezClient { private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0); private FileSystem stagingFs = null; - private static final String atsHistoryLoggingServiceClassName = - "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"; - private static final String atsHistoryACLManagerClassName = - "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; - private static final String atsv15HistoryLoggingServiceClassName = - "org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService"; - private static final String atsV15HistoryACLManagerClassName = - "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager"; - private TezClient(String name, TezConfiguration tezConf) { this(name, tezConf, tezConf.getBoolean( TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT)); @@ -375,12 +361,6 @@ public class TezClient { historyLogLevel); } - @Private - @VisibleForTesting - public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager) { - historyACLPolicyManager = myAclPolicyManager; - } - /** * Start the client. This establishes a connection to the YARN cluster. * In session mode, this start the App Master thats runs all the DAGs in the @@ -395,40 +375,6 @@ public class TezClient { frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration()); frameworkClient.start(); - ///need additional check for historyACLPolicyManager because tests could stub historyACLPolicyManager - ///before tezclient start. If there is already a stubbed historyACLPolicyManager, we don't overwrite it - if (historyACLPolicyManager == null) { - //TODO: FIXME: The ACL manager should be retrieved either from the - //logging service directly or via a pluggable factory that can - //instantiate ACL managers and logging services - String logSvcClassName = amConfig.getTezConfiguration().get( - TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ""); - String aclMgrClassName = null; - if (logSvcClassName.equals(atsHistoryLoggingServiceClassName)) { - aclMgrClassName = atsHistoryACLManagerClassName; - } else if (logSvcClassName.equals( - atsv15HistoryLoggingServiceClassName)) { - aclMgrClassName = atsV15HistoryACLManagerClassName; - } - if (aclMgrClassName != null) { - LOG.info("Using " + aclMgrClassName + " to manage Timeline ACLs"); - try { - historyACLPolicyManager = ReflectionUtils.createClazzInstance( - aclMgrClassName); - historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration()); - } catch (TezReflectionException e) { - if (!amConfig.getTezConfiguration().getBoolean( - TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, - TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { - LOG.warn("Could not instantiate object for " + aclMgrClassName - + ". ACLs cannot be enforced correctly for history data in Timeline", e); - throw e; - } - historyACLPolicyManager = null; - } - } - } - if (this.amConfig.getTezConfiguration().getBoolean( TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED, TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) { @@ -476,7 +422,7 @@ public class TezClient { sessionAppId, null, clientName, amConfig, tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo, - historyACLPolicyManager, servicePluginsDescriptor, javaOptsChecker); + servicePluginsDescriptor, javaOptsChecker); // Set Tez Sessions to not retry on AM crashes if recovery is disabled if (!amConfig.getTezConfiguration().getBoolean( @@ -511,7 +457,6 @@ public class TezClient { * if submission timed out */ public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException { - ++dagCounter; if (isSession) { return submitDAGSession(dag); } else { @@ -545,32 +490,9 @@ public class TezClient { } TezConfiguration dagClientConf = new TezConfiguration(amConfig.getTezConfiguration()); - Map<String, String> aclConfigs = null; - // TEZ_AM_HISTORY_LOGGING_ENABLED is a config setting enable/disable logging of all - // dags within a session - boolean sessionHistoryLoggingEnabled = amConfig.getTezConfiguration().getBoolean( - TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, - TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); - if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) { - try { - aclConfigs = historyACLPolicyManager.setupSessionDAGACLs( - amConfig.getTezConfiguration(), sessionAppId, - Integer.toString(dagCounter), dag.getDagAccessControls()); - } catch (HistoryACLPolicyException e) { - LOG.warn("Disabling history logging for dag " + - dag.getName() + " due to error in setting up history acls " + e); - dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false"); - dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, false); - } - } else if (!sessionHistoryLoggingEnabled) { - dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false"); - dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, false); - } - Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials); DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, - usingTezArchiveDeploy, sessionCredentials, aclConfigs, servicePluginsDescriptor, - javaOptsChecker); + usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker); SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder(); requestBuilder.setDAGPlan(dagPlan); @@ -644,10 +566,6 @@ public class TezClient { */ public synchronized void stop() throws TezException, IOException { try { - if (historyACLPolicyManager != null) { - historyACLPolicyManager.close(); - } - if (sessionStarted) { LOG.info("Shutting down Tez Session" + ", sessionName=" + clientName @@ -1032,8 +950,7 @@ public class TezClient { ApplicationSubmissionContext appContext = TezClientUtils .createApplicationSubmissionContext( appId, dag, dag.getName(), amConfig, tezJarResources, credentials, - usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager, - servicePluginsDescriptor, javaOptsChecker); + usingTezArchiveDeploy, apiVersionInfo, servicePluginsDescriptor, javaOptsChecker); String callerContextStr = ""; if (dag.getCallerContext() != null) { callerContextStr = ", callerContext=" + dag.getCallerContext().contextAsSimpleString(); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index eb1a95e..f440b6f 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -86,8 +86,6 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezYARNUtils; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.security.ACLManager; -import org.apache.tez.common.security.HistoryACLPolicyManager; -import org.apache.tez.common.security.HistoryACLPolicyException; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; @@ -445,7 +443,6 @@ public class TezClientUtils { * @param amConfig AM Configuration * @param tezJarResources Resources to be used by the AM * @param sessionCreds the credential object which will be populated with session specific - * @param historyACLPolicyManager * @param servicePluginsDescriptor descriptor for services which may be running in the AM * @return an ApplicationSubmissionContext to launch a Tez AM * @throws IOException @@ -457,7 +454,7 @@ public class TezClientUtils { ApplicationId appId, DAG dag, String amName, AMConfiguration amConfig, Map<String, LocalResource> tezJarResources, Credentials sessionCreds, boolean tezLrsAsArchive, - TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager, + TezApiVersionInfo apiVersionInfo, ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) throws IOException, YarnException { @@ -565,41 +562,19 @@ public class TezClientUtils { } amLocalResources.putAll(tezJarResources); - // Setup Session ACLs and update conf as needed - Map<String, String> aclConfigs = null; - if (historyACLPolicyManager != null) { - if (dag == null) { - try{ - aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(), - appId); - } catch (HistoryACLPolicyException e) { - LOG.warn("Disabling history logging for session " + strAppId + - " due to error in setting up history acls " + e); - amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, - false); - } - } else { - try{ - // Non-session mode - // As only a single DAG is support, we should combine AM and DAG ACLs under the same - // acl management layer - aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(), - appId, dag.getDagAccessControls()); - } catch (HistoryACLPolicyException e) { - LOG.warn("Disabling history logging for dag " + - dag.getName() + " due to error in setting up history acls " + e); - dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false"); - // This is non-session mode so disable logging for whole AM - amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, - false); - } - } + TezConfiguration tezConf = amConfig.getTezConfiguration(); + // Merge the dag access controls into tez am config. + if (dag != null && dag.getDagAccessControls() != null) { + // Merge updates the conf object passed. In non session mode, same client object can be used + // to submit multiple dags, copying this prevents ACL of one DAG from being used in another. + tezConf = new TezConfiguration(amConfig.getTezConfiguration()); + dag.getDagAccessControls().mergeIntoAmAcls(tezConf); } // emit conf as PB file - ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(), - aclConfigs, servicePluginsDescriptor); - + ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf, + servicePluginsDescriptor); + FSDataOutputStream amConfPBOutBinaryStream = null; try { amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath); @@ -737,20 +712,10 @@ public class TezClientUtils { Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive, Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) throws IOException { - return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials, - null, servicePluginsDescriptor, javaOptsChecker); - } - - static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig, - Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive, - Credentials credentials, Map<String, String> additionalDAGConfigs, - ServicePluginsDescriptor servicePluginsDescriptor, - JavaOptsChecker javaOptsChecker) throws IOException { Credentials dagCredentials = setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration()); return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources, - amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, servicePluginsDescriptor, - javaOptsChecker); + amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker); } static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) { @@ -829,7 +794,7 @@ public class TezClientUtils { } static ConfigurationProto createFinalConfProtoForApp(Configuration amConf, - Map<String, String> additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor) { + ServicePluginsDescriptor servicePluginsDescriptor) { assert amConf != null; ConfigurationProto.Builder builder = ConfigurationProto.newBuilder(); for (Entry<String, String> entry : amConf) { @@ -838,14 +803,6 @@ public class TezClientUtils { kvp.setValue(amConf.get(entry.getKey())); builder.addConfKeyValues(kvp); } - if (additionalConfigs != null && !additionalConfigs.isEmpty()) { - for (Entry<String, String> entry : additionalConfigs.entrySet()) { - PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); - kvp.setKey(entry.getKey()); - kvp.setValue(entry.getValue()); - builder.addConfKeyValues(kvp); - } - } AMPluginDescriptorProto pluginDescriptorProto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java index e1c7314..08680a5 100644 --- a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java +++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java @@ -26,15 +26,15 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.ACLInfo; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; /** * Class to manage ACLs for the Tez AM and DAGs and provides functionality to check whether @@ -42,8 +42,6 @@ import com.google.common.annotations.VisibleForTesting; */ @Private public class ACLManager { - - private static final Logger LOG = LoggerFactory.getLogger(ACLManager.class); public static final String WILDCARD_ACL_VALUE = "*"; private final String dagUser; @@ -75,7 +73,7 @@ public class ACLManager { } } - public ACLManager(ACLManager amACLManager, String dagUser, Configuration dagConf) { + public ACLManager(ACLManager amACLManager, String dagUser, ACLInfo aclInfo) { this.amUser = amACLManager.amUser; this.dagUser = dagUser; this.users = amACLManager.users; @@ -84,12 +82,21 @@ public class ACLManager { if (!aclsEnabled) { return; } - ACLConfigurationParser parser = new ACLConfigurationParser(dagConf, true); - if (parser.getAllowedUsers() != null) { - this.users.putAll(parser.getAllowedUsers()); + if (aclInfo.getUsersWithViewAccessCount() > 0) { + this.users.put(ACLType.DAG_VIEW_ACL, + Sets.newLinkedHashSet(aclInfo.getUsersWithViewAccessList())); } - if (parser.getAllowedGroups() != null) { - this.groups.putAll(parser.getAllowedGroups()); + if (aclInfo.getUsersWithModifyAccessCount() > 0) { + this.users.put(ACLType.DAG_MODIFY_ACL, + Sets.newLinkedHashSet(aclInfo.getUsersWithModifyAccessList())); + } + if (aclInfo.getGroupsWithViewAccessCount() > 0) { + this.groups.put(ACLType.DAG_VIEW_ACL, + Sets.newLinkedHashSet(aclInfo.getGroupsWithViewAccessList())); + } + if (aclInfo.getGroupsWithModifyAccessCount() > 0) { + this.groups.put(ACLType.DAG_MODIFY_ACL, + Sets.newLinkedHashSet(aclInfo.getGroupsWithModifyAccessList())); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java index 5fe352a..520416d 100644 --- a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java +++ b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java @@ -27,8 +27,11 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import com.google.common.collect.ImmutableMap; + /** * Access controls for the DAG */ @@ -150,23 +153,37 @@ public class DAGAccessControls { return Collections.unmodifiableSet(groupsWithModifyACLs); } + /** + * Merge the dag acls with the AM acls in the configuration object. The config object will contain + * the updated acls. + * @param conf The AM config. + */ @Private - public synchronized void serializeToConfiguration(Configuration conf) { - if (usersWithViewACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) { - conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE); + public synchronized void mergeIntoAmAcls(Configuration conf) { + ACLConfigurationParser parser = new ACLConfigurationParser(conf, false); + parser.addAllowedGroups(ImmutableMap.of( + ACLType.AM_VIEW_ACL, groupsWithViewACLs, ACLType.AM_MODIFY_ACL, groupsWithModifyACLs)); + parser.addAllowedUsers(ImmutableMap.of( + ACLType.AM_VIEW_ACL, usersWithViewACLs, ACLType.AM_MODIFY_ACL, usersWithModifyACLs)); + + Set<String> viewUsers = parser.getAllowedUsers().get(ACLType.AM_VIEW_ACL); + Set<String> viewGroups = parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL); + if (viewUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) { + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE); } else { - String userList = ACLManager.toCommaSeparatedString(usersWithViewACLs); - String groupList = ACLManager.toCommaSeparatedString(groupsWithViewACLs); - conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, - userList + " " + groupList); + String userList = ACLManager.toCommaSeparatedString(viewUsers); + String groupList = ACLManager.toCommaSeparatedString(viewGroups); + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, userList + " " + groupList); } - if (usersWithModifyACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) { - conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, ACLManager.WILDCARD_ACL_VALUE); + + Set<String> modifyUsers = parser.getAllowedUsers().get(ACLType.AM_MODIFY_ACL); + Set<String> modifyGroups = parser.getAllowedGroups().get(ACLType.AM_MODIFY_ACL); + if (modifyUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) { + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, ACLManager.WILDCARD_ACL_VALUE); } else { - String userList = ACLManager.toCommaSeparatedString(usersWithModifyACLs); - String groupList = ACLManager.toCommaSeparatedString(groupsWithModifyACLs); - conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, userList + " " + groupList); + String userList = ACLManager.toCommaSeparatedString(modifyUsers); + String groupList = ACLManager.toCommaSeparatedString(modifyGroups); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, userList + " " + groupList); } } - } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java index fc0f57c..92eea67 100644 --- a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java +++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java @@ -30,15 +30,21 @@ import org.apache.tez.common.security.HistoryACLPolicyException; /** * ACL Policy Manager - * An instance of this implements any ACL related activity when starting a session or - * submitting a DAG + * An instance of this implements any ACL related activity when starting a session or submitting a + * DAG. It is used in the HistoryLoggingService to create domain ids and populate entities with + * domain id. */ @Unstable @Private public interface HistoryACLPolicyManager extends Configurable { /** - * Take any necessary steps for setting up Session ACLs + * Take any necessary steps for setting up both Session ACLs and non session acls. This is called + * with the am configuration which contains the ACL information to be used to create a domain. + * If the method returns a value, then its assumed to be a valid domain and used as domainId. + * If the method returns null, acls are disabled at session level, i.e use default acls at session + * level. + * If the method throws an Exception, history logging is disabled for the entire session. * @param conf Configuration * @param applicationId Application ID for the session * @throws Exception @@ -47,7 +53,7 @@ public interface HistoryACLPolicyManager extends Configurable { throws IOException, HistoryACLPolicyException; /** - * Take any necessary steps for setting up ACLs for an AM which is running in non-session mode + * Not used currently. * @param conf Configuration * @param applicationId Application ID for the AM * @param dagAccessControls ACLs defined for the DAG being submitted @@ -57,16 +63,26 @@ public interface HistoryACLPolicyManager extends Configurable { DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException; /** - * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session + * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session. This is + * called with dag configuration. + * If the method returns a value, then it is assumed to be valid domain and is used as a domainId + * for all of the dag events. + * If the method returns null, it falls back to session level acls. + * If the method throws Exception: it disables history logging for the dag events. * @param conf Configuration * @param applicationId Application ID for the AM * @param dagAccessControls ACLs defined for the DAG being submitted * @throws Exception */ public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId, - String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException; - + String dagName, DAGAccessControls dagAccessControls) + throws IOException, HistoryACLPolicyException; + /** + * Called with a timeline entity which has to be updated with a domain id. + * @param timelineEntity The timeline entity which will be published. + * @param domainId The domainId returned by one of the setup*ACL calls. + */ public void updateTimelineEntityDomain(Object timelineEntity, String domainId); /** http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 65321a8..f15c1fb 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -770,15 +769,15 @@ public class DAG { Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, boolean tezLrsAsArchive) { return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive, - null, null, null); + null, null); } // create protobuf message describing DAG @Private public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials, Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, - boolean tezLrsAsArchive, Map<String, String> additionalConfigs, - ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) { + boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor, + JavaOptsChecker javaOptsChecker) { Deque<String> topologicalVertexStack = verify(true); DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); @@ -1017,30 +1016,11 @@ public class DAG { dagBuilder.addEdge(edgeBuilder); } - ConfigurationProto.Builder confProtoBuilder = - ConfigurationProto.newBuilder(); if (dagAccessControls != null) { - Configuration aclConf = new Configuration(false); - dagAccessControls.serializeToConfiguration(aclConf); - Iterator<Entry<String, String>> aclConfIter = aclConf.iterator(); - while (aclConfIter.hasNext()) { - Entry<String, String> entry = aclConfIter.next(); - PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); - kvp.setKey(entry.getKey()); - kvp.setValue(entry.getValue()); - TezConfiguration.validateProperty(entry.getKey(), Scope.DAG); - confProtoBuilder.addConfKeyValues(kvp); - } - } - if (additionalConfigs != null && !additionalConfigs.isEmpty()) { - for (Entry<String, String> entry : additionalConfigs.entrySet()) { - PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); - kvp.setKey(entry.getKey()); - kvp.setValue(entry.getValue()); - TezConfiguration.validateProperty(entry.getKey(), Scope.DAG); - confProtoBuilder.addConfKeyValues(kvp); - } + dagBuilder.setAclInfo(DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls)); } + + ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); if (!this.dagConf.isEmpty()) { for (Entry<String, String> entry : this.dagConf.entrySet()) { PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 5733da8..cefe026 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -50,6 +50,7 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; @@ -57,6 +58,7 @@ import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.ACLInfo; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; @@ -863,4 +865,27 @@ public class DagTypeConverters { return callerContext; } + public static ACLInfo convertDAGAccessControlsToProto(DAGAccessControls dagAccessControls) { + if (dagAccessControls == null) { + return null; + } + ACLInfo.Builder builder = ACLInfo.newBuilder(); + builder.addAllUsersWithViewAccess(dagAccessControls.getUsersWithViewACLs()); + builder.addAllUsersWithModifyAccess(dagAccessControls.getUsersWithModifyACLs()); + builder.addAllGroupsWithViewAccess(dagAccessControls.getGroupsWithViewACLs()); + builder.addAllGroupsWithModifyAccess(dagAccessControls.getGroupsWithModifyACLs()); + return builder.build(); + } + + public static DAGAccessControls convertDAGAccessControlsFromProto(ACLInfo aclInfo) { + if (aclInfo == null) { + return null; + } + DAGAccessControls dagAccessControls = new DAGAccessControls(); + dagAccessControls.setUsersWithViewACLs(aclInfo.getUsersWithViewAccessList()); + dagAccessControls.setUsersWithModifyACLs(aclInfo.getUsersWithModifyAccessList()); + dagAccessControls.setGroupsWithViewACLs(aclInfo.getGroupsWithViewAccessList()); + dagAccessControls.setGroupsWithModifyACLs(aclInfo.getGroupsWithModifyAccessList()); + return dagAccessControls; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index d016d60..c84094b 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -197,6 +197,13 @@ message CallerContextProto { optional string blob = 4; } +message ACLInfo { + repeated string usersWithViewAccess = 1; + repeated string usersWithModifyAccess = 2; + repeated string groupsWithViewAccess = 3; + repeated string groupsWithModifyAccess = 4; +} + message DAGPlan { required string name = 1; repeated VertexPlan vertex = 2; @@ -208,6 +215,7 @@ message DAGPlan { optional string dag_info = 8; optional VertexExecutionContextProto default_execution_context = 9; optional CallerContextProto caller_context = 10; + optional ACLInfo aclInfo = 11; } // DAG monitoring messages http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index d49ba48..51f36a3 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -241,12 +240,10 @@ public class TestTezClient { LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null); - HistoryACLPolicyManager mockAcl = mock(HistoryACLPolicyManager.class); ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class); when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) .thenReturn(YarnApplicationState.RUNNING); - client.setUpHistoryAclManager(mockAcl); client.start(); verify(client.mockYarnClient, times(1)).init((Configuration)any()); verify(client.mockYarnClient, times(1)).start(); @@ -344,9 +341,8 @@ public class TestTezClient { (ShutdownSessionRequestProto) any()); } verify(client.mockYarnClient, times(1)).stop(); - verify(mockAcl, times(1)).close(); } - + @Test (timeout=5000) public void testPreWarm() throws Exception { TezClientForTest client = configureAndCreateTezClient(); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java index 2c69d77..49aae20 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import java.io.File; import java.io.FileNotFoundException; @@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; -import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.common.security.TokenCache; @@ -133,7 +131,7 @@ public class TestTezClientUtils { /** * */ - @Test (timeout=5000) + @Test (timeout=10000) public void validateSetTezJarLocalResourcesDefinedExistingDirectory() throws Exception { URL[] cp = ((URLClassLoader)ClassLoader.getSystemClassLoader()).getURLs(); StringBuffer buffer = new StringBuffer(); @@ -314,7 +312,7 @@ public class TestTezClientUtils { appId, null, "dagname", amConf, m, credentials, false, - new TezApiVersionInfo(), null, null, null); + new TezApiVersionInfo(), null, null); assertEquals(testpriority, appcontext.getPriority().getPriority()); } @@ -366,7 +364,7 @@ public class TestTezClientUtils { ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(), - mock(HistoryACLPolicyManager.class), null, null); + null, null); ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec(); Map<String, ByteBuffer> amServiceData = amClc.getServiceData(); @@ -399,7 +397,7 @@ public class TestTezClientUtils { ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(), - mock(HistoryACLPolicyManager.class), null, null); + null, null); List<String> expectedCommands = new LinkedList<String>(); expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"); @@ -439,7 +437,7 @@ public class TestTezClientUtils { ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(), - mock(HistoryACLPolicyManager.class), null, null); + null, null); List<String> expectedCommands = new LinkedList<String>(); expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"); @@ -626,7 +624,7 @@ public class TestTezClientUtils { expected.put("property1", val1); expected.put("property2", expVal2); - ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, null); + ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null); for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) { String v = expected.remove(kvPair.getKey()); @@ -730,7 +728,7 @@ public class TestTezClientUtils { srcConf.set(entry.getKey(), entry.getValue()); } - ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null, null); + ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null); for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) { String val = confMap.remove(kvPair.getKey()); @@ -792,7 +790,7 @@ public class TestTezClientUtils { Configuration conf = new Configuration(false); ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true); - ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, + ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, servicePluginsDescriptor); assertTrue(confProto.hasAmPluginDescriptor()); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java index a88e801..fc9c24e 100644 --- a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java +++ b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java @@ -20,18 +20,16 @@ package org.apache.tez.common.security; import java.io.IOException; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.records.DAGProtos.ACLInfo; import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.Sets; public class TestACLManager { @@ -64,7 +62,8 @@ public class TestACLManager { Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_VIEW_ACL)); Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_MODIFY_ACL)); - ACLManager dagAclManager = new ACLManager(aclManager, dagUser.getShortUserName(), new Configuration(false)); + ACLManager dagAclManager = new ACLManager(aclManager, dagUser.getShortUserName(), + ACLInfo.getDefaultInstance()); user = dagUser; Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_VIEW_ACL)); Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_MODIFY_ACL)); @@ -256,17 +255,20 @@ public class TestACLManager { conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyACLs); conf.set(YarnConfiguration.YARN_ADMIN_ACL, yarnAdminACLs); - // DAG View ACLs: user1, user4, grp3, grp4. - String dagViewACLs = "user6, grp5 "; - // DAG Modify ACLs: user3, grp6, grp7 - String dagModifyACLs = "user6,user5 "; - conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, dagViewACLs); - conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, dagModifyACLs); + ACLInfo.Builder builder = ACLInfo.newBuilder(); + // DAG View ACLs: user6, grp5 + builder.addUsersWithViewAccess("user6"); + builder.addGroupsWithViewAccess("grp5"); + + // DAG Modify ACLs: user6,user5 + builder.addUsersWithModifyAccess("user6"); + builder.addUsersWithModifyAccess("user7"); UserGroupInformation dagUser = UserGroupInformation.createUserForTesting("dagUser", noGroups); ACLManager amAclManager = new ACLManager(currentUser.getShortUserName(), conf); - ACLManager aclManager = new ACLManager(amAclManager, dagUser.getShortUserName(), conf); + ACLManager aclManager = new ACLManager(amAclManager, dagUser.getShortUserName(), + builder.build()); Assert.assertTrue(aclManager.checkAMViewAccess(currentUser)); Assert.assertFalse(aclManager.checkAMViewAccess(dagUser)); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java index 6afc83b..4335a20 100644 --- a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java +++ b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java @@ -18,76 +18,14 @@ package org.apache.tez.common.security; -import java.util.Arrays; - import org.apache.hadoop.conf.Configuration; -import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezConfiguration; import org.junit.Assert; import org.junit.Test; -public class TestDAGAccessControls { - - @Test(timeout = 5000) - public void testBasicSerializeToConf() { - DAGAccessControls dagAccessControls = new DAGAccessControls(); - dagAccessControls.setUsersWithViewACLs(Arrays.asList("u1")) - .setUsersWithModifyACLs(Arrays.asList("u2")) - .setGroupsWithViewACLs(Arrays.asList("g1")) - .setGroupsWithModifyACLs(Arrays.asList("g2")); - - Configuration conf = new Configuration(false); - dagAccessControls.serializeToConfiguration(conf); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - - Assert.assertEquals("u1 g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertEquals("u2 g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - } - - @Test(timeout = 5000) - public void testWildCardSerializeToConf() { - DAGAccessControls dagAccessControls = new DAGAccessControls(); - dagAccessControls.setUsersWithViewACLs(Arrays.asList("*")) - .setUsersWithModifyACLs(Arrays.asList("*")) - .setGroupsWithViewACLs(Arrays.asList("g1")) - .setGroupsWithModifyACLs(Arrays.asList("g2")); - - Configuration conf = new Configuration(false); - dagAccessControls.serializeToConfiguration(conf); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - - Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - } - - @Test(timeout = 5000) - public void testGroupsOnlySerializeToConf() { - DAGAccessControls dagAccessControls = new DAGAccessControls(); - dagAccessControls.setGroupsWithViewACLs(Arrays.asList("g1")) - .setGroupsWithModifyACLs(Arrays.asList("g2")); - - Configuration conf = new Configuration(false); - dagAccessControls.serializeToConfiguration(conf); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); +import com.google.common.collect.Sets; - Assert.assertEquals(" g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertEquals(" g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - } - - @Test(timeout = 5000) - public void testEmptySerializeToConf() { - DAGAccessControls dagAccessControls = new DAGAccessControls(); - - Configuration conf = new Configuration(false); - dagAccessControls.serializeToConfiguration(conf); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - - Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); - Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - } +public class TestDAGAccessControls { @Test(timeout = 5000) public void testStringBasedConstructor() { @@ -102,8 +40,107 @@ public class TestDAGAccessControls { Assert.assertTrue(dagAccessControls.getUsersWithModifyACLs().contains("u2")); Assert.assertTrue(dagAccessControls.getGroupsWithViewACLs().contains("g1")); Assert.assertTrue(dagAccessControls.getGroupsWithModifyACLs().contains("g2")); + } + + @Test(timeout=5000) + public void testMergeIntoAmAcls() { + DAGAccessControls dagAccessControls = new DAGAccessControls("u1 g1", "u2 g2"); + Configuration conf = new Configuration(false); + + // default conf should have ACLs copied over. + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + // both have unique users merged should have all + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u1 g1"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u2 g2"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + // both have unique users merged should have all + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("u3,u1 g3,g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("u4,u2 g4,g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + // one of the user is *, merged is always * + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + // only * in the config, merged is * + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + // DAG access with *, all operation yeild * + dagAccessControls = new DAGAccessControls("*", "*"); + + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + // DAG access is empty, conf should be same. + dagAccessControls = new DAGAccessControls("", ""); + + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("u3 g3", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("u4 g4", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); + + conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*"); + conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*"); + dagAccessControls.mergeIntoAmAcls(conf); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS)); + assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS)); } + public void assertACLS(String expected, String obtained) { + if (expected.equals(obtained)) { + return; + } + String [] parts1 = expected.split(" "); + String [] parts2 = obtained.split(" "); + + Assert.assertEquals(parts1.length, parts2.length); + + Assert.assertEquals( + Sets.newHashSet(parts1[0].split(",")), Sets.newHashSet(parts2[0].split(","))); + + if (parts1.length < 2) { + return; + } + Assert.assertEquals( + Sets.newHashSet(parts1[1].split(",")), Sets.newHashSet(parts2[1].split(","))); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index 005c027..8e1011f 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -370,14 +370,14 @@ public class TestDAGPlan { dag.addVertex(v1); // Should succeed. Default context is containers. - dag.createDag(new TezConfiguration(false), null, null, null, true, null, + dag.createDag(new TezConfiguration(false), null, null, null, true, servicePluginsDescriptor, null); // Set execute in AM should fail v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true)); try { - dag.createDag(new TezConfiguration(false), null, null, null, true, null, + dag.createDag(new TezConfiguration(false), null, null, null, true, servicePluginsDescriptor, null); fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); } catch (IllegalStateException e) { @@ -386,13 +386,13 @@ public class TestDAGPlan { // Valid context v1.setExecutionContext(validExecContext); - dag.createDag(new TezConfiguration(false), null, null, null, true, null, + dag.createDag(new TezConfiguration(false), null, null, null, true, servicePluginsDescriptor, null); // Invalid task scheduler v1.setExecutionContext(invalidExecContext1); try { - dag.createDag(new TezConfiguration(false), null, null, null, true, null, + dag.createDag(new TezConfiguration(false), null, null, null, true, servicePluginsDescriptor, null); fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); } catch (IllegalStateException e) { @@ -404,7 +404,7 @@ public class TestDAGPlan { // Invalid ContainerLauncher v1.setExecutionContext(invalidExecContext2); try { - dag.createDag(new TezConfiguration(false), null, null, null, true, null, + dag.createDag(new TezConfiguration(false), null, null, null, true, servicePluginsDescriptor, null); fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); } catch (IllegalStateException e) { @@ -416,7 +416,7 @@ public class TestDAGPlan { // Invalid task comm v1.setExecutionContext(invalidExecContext3); try { - dag.createDag(new TezConfiguration(false), null, null, null, true, null, + dag.createDag(new TezConfiguration(false), null, null, null, true, servicePluginsDescriptor, null); fail("Expecting dag create to fail due to invalid ServicePluginDescriptor"); } catch (IllegalStateException e) { @@ -444,7 +444,8 @@ public class TestDAGPlan { new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)}, new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)}); - Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context); + Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)) + .setExecutionContext(v1Context); Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) .addTaskLocalFiles(new HashMap<String, LocalResource>()); @@ -462,7 +463,7 @@ public class TestDAGPlan { dag.addVertex(v1).addVertex(v2).addEdge(edge); dag.setExecutionContext(defaultExecutionContext); - DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, null, + DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, servicePluginsDescriptor, null); assertEquals(2, dagProto.getVertexCount()); @@ -502,8 +503,7 @@ public class TestDAGPlan { TezConfiguration conf = new TezConfiguration(false); conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -XX:+UseParallelGC "); try { - DAGPlan dagProto = dag.createDag(conf, null, null, null, true, null, null, - new JavaOptsChecker()); + dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker()); fail("Expected dag creation to fail for invalid java opts"); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains("Invalid/conflicting GC options")); @@ -511,12 +511,11 @@ public class TestDAGPlan { // Should not fail as java opts valid conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -XX:-UseParallelGC "); - DAGPlan dagProto1 = dag.createDag(conf, null, null, null, true, null, null, - new JavaOptsChecker()); + dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker()); // Should not fail as no checker enabled conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -XX:+UseParallelGC "); - DAGPlan dagProto2 = dag.createDag(conf, null, null, null, true, null, null, null); + dag.createDag(conf, null, null, null, true, null, null); } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index c566b1a..794a597 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.api; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,9 +35,8 @@ import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; -import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.ACLInfo; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; -import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.junit.Assert; @@ -1044,21 +1044,11 @@ public class TestDAGVerify { Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS)); Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS)); - ConfigurationProto confProto = dagPlan.getDagConf(); - boolean foundViewAcls = false; - boolean foundModifyAcls = false; - - for (PlanKeyValuePair pair : confProto.getConfKeyValuesList()) { - if (pair.getKey().equals(TezConstants.TEZ_DAG_VIEW_ACLS)) { - foundViewAcls = true; - Assert.assertEquals("u1 g1", pair.getValue()); - } else if (pair.getKey().equals(TezConstants.TEZ_DAG_MODIFY_ACLS)) { - foundModifyAcls = true; - Assert.assertEquals("*", pair.getValue()); - } - } - Assert.assertTrue(foundViewAcls); - Assert.assertTrue(foundModifyAcls); + ACLInfo aclInfo = dagPlan.getAclInfo(); + Assert.assertEquals(Collections.singletonList("u1"), aclInfo.getUsersWithViewAccessList()); + Assert.assertEquals(Collections.singletonList("g1"), aclInfo.getGroupsWithViewAccessList()); + Assert.assertEquals(Collections.singletonList("*"), aclInfo.getUsersWithModifyAccessList()); + Assert.assertEquals(Collections.singletonList("g2"), aclInfo.getGroupsWithModifyAccessList()); } // v1 has input initializer http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java index 6f795fc..dc04f2d 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java @@ -32,7 +32,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; +import org.apache.tez.dag.api.records.DAGProtos.ACLInfo; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; @@ -44,6 +46,8 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Sets; + public class TestDagTypeConverters { @Test(timeout = 5000) @@ -208,6 +212,40 @@ public class TestDagTypeConverters { verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true); } + @Test + public void testAclConversions() { + DAGAccessControls dagAccessControls = new DAGAccessControls("u1,u2 g1,g2", "u3,u4 g3,g4"); + ACLInfo aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls); + assertSame(dagAccessControls, aclInfo); + assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo); + + dagAccessControls = new DAGAccessControls("u1 ", "u2 "); + aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls); + assertSame(dagAccessControls, aclInfo); + assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo); + + dagAccessControls = new DAGAccessControls(" g1", " g3,g4"); + aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls); + assertSame(dagAccessControls, aclInfo); + assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo); + + dagAccessControls = new DAGAccessControls("*", "*"); + aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls); + assertSame(dagAccessControls, aclInfo); + assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo); + } + + private void assertSame(DAGAccessControls dagAccessControls, ACLInfo aclInfo) { + assertEquals(dagAccessControls.getUsersWithViewACLs(), + Sets.newHashSet(aclInfo.getUsersWithViewAccessList())); + assertEquals(dagAccessControls.getUsersWithModifyACLs(), + Sets.newHashSet(aclInfo.getUsersWithModifyAccessList())); + assertEquals(dagAccessControls.getGroupsWithViewACLs(), + Sets.newHashSet(aclInfo.getGroupsWithViewAccessList())); + assertEquals(dagAccessControls.getGroupsWithModifyACLs(), + Sets.newHashSet(aclInfo.getGroupsWithModifyAccessList())); + } + private void verifyPlugins(List<TezNamedEntityDescriptorProto> entities, int expectedCount, String baseString, boolean hasPayload) { assertEquals(expectedCount, entities.size()); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index fd6d446..481353b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -542,7 +542,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } this.aclManager = new ACLManager(appContext.getAMACLManager(), dagUGI.getShortUserName(), - this.dagConf); + this.jobPlan.getAclInfo()); // this is only for recovery in case it does not call the init transition this.startDAGCpuTime = appContext.getCumulativeCPUTime(); this.startDAGGCTime = appContext.getCumulativeGCTime(); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java index 91ffe7b..45e24ce 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java @@ -154,7 +154,6 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { if (domainId != null) { // do nothing LOG.info("Using specified domainId with Timeline, domainId=" + domainId); - return null; } else { if (!autoCreateDomain) { // Error - Cannot fallback to default as that leaves ACLs open @@ -164,18 +163,13 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { domainId = DOMAIN_ID_PREFIX + applicationId.toString(); createTimelineDomain(domainId, tezConf, dagAccessControls); LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId); - return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId); } + return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId); } private Map<String, String> createDAGDomain(Configuration tezConf, ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException { - if (dagAccessControls == null) { - // No DAG specific ACLs - return null; - } - String domainId = tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED, @@ -193,7 +187,6 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { if (domainId != null) { // do nothing LOG.info("Using specified domainId with Timeline, domainId=" + domainId); - return null; } else { if (!autoCreateDomain) { // Error - Cannot fallback to default as that leaves ACLs open @@ -201,14 +194,17 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { + " Domains is disabled"); } + // Create a domain only if dagAccessControls has been specified. + if (dagAccessControls == null) { + return null; + } domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName; createTimelineDomain(domainId, tezConf, dagAccessControls); LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId); - return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId); } + return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId); } - @Override public void setConf(Configuration conf) { this.conf = conf; http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index 2c976f5..6b3ebd7 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; import org.apache.tez.common.ReflectionUtils; @@ -54,6 +53,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.AppContext; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.history.events.DAGSubmittedEvent; @@ -73,7 +73,6 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; -import org.mockito.Matchers; public class TestATSHistoryWithACLs { @@ -307,170 +306,6 @@ public class TestATSHistoryWithACLs { } /** - * test failure of domain creation during dag submittion in session mode - * only affect logging for that dag not following submitted dag - * @throws Exception - */ - @Test (timeout=50000) - public void testMultipleDagSession() throws Exception { - TezClient tezSession = null; - String viewAcls = "nobody nobody_group"; - SleepProcessorConfig spConf = new SleepProcessorConfig(1); - - DAG dag = DAG.create("TezSleepProcessor"); - Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( - SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, - Resource.newInstance(256, 1)); - dag.addVertex(vertex); - DAGAccessControls accessControls = new DAGAccessControls(); - accessControls.setUsersWithViewACLs(Collections.singleton("nobody2")); - accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2")); - dag.setAccessControls(accessControls); - - TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); - tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); - tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, - ATSHistoryLoggingService.class.getName()); - Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random - .nextInt(100000)))); - remoteFs.mkdirs(remoteStagingDir); - tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); - - tezSession = TezClient.create("TezSleepProcessor", tezConf, true); - tezSession.start(); - - //////submit first dag which fails in dag creation////// - ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance( - atsHistoryACLManagerClassName); - myAclPolicyManager.timelineClient = mock(TimelineClient.class); - - doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg()); - tezSession.setUpHistoryAclManager(myAclPolicyManager); - - DAGClient dagClient = tezSession.submitDAG(dag); - DAGStatus dagStatus = dagClient.getDAGStatus(null); - while (!dagStatus.isCompleted()) { - LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " - + dagStatus.getState()); - Thread.sleep(500l); - dagStatus = dagClient.getDAGStatus(null); - } - assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); - String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); - assertEquals(dagLogging, "false"); - - myAclPolicyManager.timelineClient = null; - myAclPolicyManager.setConf(tezConf); - tezSession.setUpHistoryAclManager(myAclPolicyManager); - - //////submit second dag which succeeds in dag creation////// - DAG dag2 = DAG.create("TezSleepProcessor2"); - vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( - SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, - Resource.newInstance(256, 1)); - dag2.addVertex(vertex); - accessControls = new DAGAccessControls(); - accessControls.setUsersWithViewACLs(Collections.singleton("nobody3")); - accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3")); - dag2.setAccessControls(accessControls); - dagClient = tezSession.submitDAG(dag2); - dagStatus = dagClient.getDAGStatus(null); - while (!dagStatus.isCompleted()) { - LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " - + dagStatus.getState()); - Thread.sleep(500l); - dagStatus = dagClient.getDAGStatus(null); - } - dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); - Assert.assertNull(dagLogging); - myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient); - tezSession.stop(); - verify(myAclPolicyManager.timelineClient, times(1)).stop(); - } - -/** - * test failure of domain creation during dag submittion in nonsession mode - * only affect logging for that dag not following submitted dag - * @throws Exception - */ - @Test (timeout=50000) - public void testMultipleDagNonSession() throws Exception { - TezClient tezClient = null; - String viewAcls = "nobody nobody_group"; - SleepProcessorConfig spConf = new SleepProcessorConfig(1); - - DAG dag = DAG.create("TezSleepProcessor"); - Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( - SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, - Resource.newInstance(256, 1)); - dag.addVertex(vertex); - DAGAccessControls accessControls = new DAGAccessControls(); - accessControls.setUsersWithViewACLs(Collections.singleton("nobody2")); - accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2")); - dag.setAccessControls(accessControls); - - TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); - tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); - tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, - ATSHistoryLoggingService.class.getName()); - Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random - .nextInt(100000)))); - remoteFs.mkdirs(remoteStagingDir); - tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); - - tezClient = TezClient.create("TezSleepProcessor", tezConf, false); - tezClient.start(); - - //////submit first dag which fails in dag creation////// - ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance( - atsHistoryACLManagerClassName); - myAclPolicyManager.timelineClient = mock(TimelineClient.class); - - doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg()); - tezClient.setUpHistoryAclManager(myAclPolicyManager); - - DAGClient dagClient = tezClient.submitDAG(dag); - DAGStatus dagStatus = dagClient.getDAGStatus(null); - while (!dagStatus.isCompleted()) { - LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " - + dagStatus.getState()); - Thread.sleep(500l); - dagStatus = dagClient.getDAGStatus(null); - } - assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); - String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); - assertEquals(dagLogging, "false"); - - myAclPolicyManager.timelineClient = null; - myAclPolicyManager.setConf(tezConf); - tezClient.setUpHistoryAclManager(myAclPolicyManager); - - //////submit second dag which succeeds in dag creation////// - DAG dag2 = DAG.create("TezSleepProcessor2"); - vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( - SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, - Resource.newInstance(256, 1)); - dag2.addVertex(vertex); - accessControls = new DAGAccessControls(); - accessControls.setUsersWithViewACLs(Collections.singleton("nobody3")); - accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3")); - dag2.setAccessControls(accessControls); - dagClient = tezClient.submitDAG(dag2); - dagStatus = dagClient.getDAGStatus(null); - while (!dagStatus.isCompleted()) { - LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " - + dagStatus.getState()); - Thread.sleep(500l); - dagStatus = dagClient.getDAGStatus(null); - } - dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); - Assert.assertNull(dagLogging); - myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient); - tezClient.stop(); - verify(myAclPolicyManager.timelineClient, times(1)).stop(); - - } - /** * Test Disable Logging for all dags in a session * due to failure to create domain in session start * @throws Exception @@ -501,19 +336,8 @@ public class TestATSHistoryWithACLs { tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); tezSession = TezClient.create("TezSleepProcessor", tezConf, true); - ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance( - atsHistoryACLManagerClassName); - myAclPolicyManager.timelineClient = mock(TimelineClient.class); - - doThrow(new IOException("Fail to Put Domain")). - when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg()); - tezSession.setUpHistoryAclManager(myAclPolicyManager); tezSession.start(); - ///substitute back mocked timelineClient with a normal one - myAclPolicyManager.timelineClient = null; - myAclPolicyManager.setConf(tezConf); - tezSession.setUpHistoryAclManager(myAclPolicyManager); //////submit first dag ////// DAGClient dagClient = tezSession.submitDAG(dag); DAGStatus dagStatus = dagClient.getDAGStatus(null); @@ -524,9 +348,7 @@ public class TestATSHistoryWithACLs { dagStatus = dagClient.getDAGStatus(null); } assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); - String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); - assertEquals(dagLogging, "false"); - + //////submit second dag////// DAG dag2 = DAG.create("TezSleepProcessor2"); vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( @@ -545,10 +367,9 @@ public class TestATSHistoryWithACLs { Thread.sleep(500l); dagStatus = dagClient.getDAGStatus(null); } - dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED); - assertEquals(dagLogging, "false"); tezSession.stop(); } + /** * use mini cluster to verify data do not push to ats when the daglogging flag * in dagsubmittedevent is set off @@ -559,6 +380,9 @@ public class TestATSHistoryWithACLs { ATSHistoryLoggingService historyLoggingService; historyLoggingService = ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName()); + AppContext appContext = mock(AppContext.class); + when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1)); + historyLoggingService.setAppContext(appContext); TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); String viewAcls = "nobody nobody_group"; tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); @@ -568,8 +392,8 @@ public class TestATSHistoryWithACLs { .nextInt(100000)))); remoteFs.mkdirs(remoteStagingDir); tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); - historyLoggingService.serviceInit(tezConf); - historyLoggingService.serviceStart(); + historyLoggingService.init(tezConf); + historyLoggingService.start(); ApplicationId appId = ApplicationId.newInstance(100l, 1); TezDAGID tezDAGID = TezDAGID.getInstance( appId, 100); @@ -601,6 +425,9 @@ public class TestATSHistoryWithACLs { ATSHistoryLoggingService historyLoggingService; historyLoggingService = ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName()); + AppContext appContext = mock(AppContext.class); + when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1)); + historyLoggingService.setAppContext(appContext); TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); String viewAcls = "nobody nobody_group"; tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); @@ -610,8 +437,8 @@ public class TestATSHistoryWithACLs { .nextInt(100000)))); remoteFs.mkdirs(remoteStagingDir); tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); - historyLoggingService.serviceInit(tezConf); - historyLoggingService.serviceStart(); + historyLoggingService.init(tezConf); + historyLoggingService.start(); ApplicationId appId = ApplicationId.newInstance(100l, 1); TezDAGID tezDAGID = TezDAGID.getInstance( appId, 11); @@ -636,7 +463,7 @@ public class TestATSHistoryWithACLs { assertEquals(entity.getEntityType(), "TEZ_DAG_ID"); assertEquals(entity.getEvents().get(0).getEventType(), HistoryEventType.DAG_SUBMITTED.toString()); } - + private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; @Test (timeout=50000) @@ -676,5 +503,4 @@ public class TestATSHistoryWithACLs { } } - }
