Repository: tez Updated Branches: refs/heads/master c07ec7b6f -> a23de4982
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java index f0ec1eb..d28ffe0 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java @@ -31,7 +31,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; @@ -159,7 +158,6 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager { if (domainId != null) { // do nothing LOG.info("Using specified domainId with Timeline, domainId=" + domainId); - return null; } else { if (!autoCreateDomain) { // Error - Cannot fallback to default as that leaves ACLs open @@ -169,18 +167,13 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager { domainId = DOMAIN_ID_PREFIX + applicationId.toString(); createTimelineDomain(applicationId, domainId, tezConf, dagAccessControls); LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId); - return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId); } + return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId); } private Map<String, String> createDAGDomain(Configuration tezConf, ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException { - if (dagAccessControls == null) { - // No DAG specific ACLs - return null; - } - String domainId = tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED, @@ -198,7 +191,6 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager { if (domainId != null) { // do nothing LOG.info("Using specified domainId with Timeline, domainId=" + domainId); - return null; } else { if (!autoCreateDomain) { // Error - Cannot fallback to default as that leaves ACLs open @@ -206,14 +198,17 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager { + " Domains is disabled"); } + // Create a domain only if dagAccessControls has been specified. + if (dagAccessControls == null) { + return null; + } domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName; createTimelineDomain(applicationId, domainId, tezConf, dagAccessControls); LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId); - return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId); } + return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId); } - @Override public void setConf(Configuration conf) { this.conf = conf; http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java index dd21d2d..a095cbc 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history.logging.ats; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -36,11 +37,16 @@ import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.security.DAGAccessControls; +import org.apache.tez.common.security.HistoryACLPolicyException; import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezReflectionException; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.logging.HistoryLoggingService; @@ -80,7 +86,9 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { ATSV15HistoryLoggingService.class.getName(); private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager"; - private HistoryACLPolicyManager historyACLPolicyManager; + + @VisibleForTesting + HistoryACLPolicyManager historyACLPolicyManager; private int numDagsPerGroup; @@ -133,7 +141,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { if (maxTimeToWaitOnShutdown < 0) { waitForeverOnShutdown = true; } - sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); LOG.info("Initializing " + ATSV15HistoryLoggingService.class.getSimpleName() + " with " + ", maxPollingTime(ms)=" + maxPollingTimeMillis @@ -165,6 +172,15 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { } timelineClient.start(); + // create a session domain id, if it fails then disable history logging. + try { + sessionDomainId = createSessionDomain(); + } catch (HistoryACLPolicyException | IOException e) { + LOG.warn("Could not setup history acls, disabling history logging.", e); + historyLoggingEnabled = false; + return; + } + eventHandlingThread = new Thread(new Runnable() { @Override public void run() { @@ -216,9 +232,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { @Override public void serviceStop() { - if (!historyLoggingEnabled || timelineClient == null) { - return; - } LOG.info("Stopping ATSService" + ", eventQueueBacklog=" + eventQueue.size()); stopped.set(true); @@ -265,11 +278,12 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { LOG.warn("Did not finish flushing eventQueue before stopping ATSService" + ", eventQueueBacklog=" + eventQueue.size()); } - timelineClient.stop(); + if (timelineClient != null) { + timelineClient.stop(); + } if (historyACLPolicyManager != null) { historyACLPolicyManager.close(); } - } @VisibleForTesting @@ -331,13 +345,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { skippedDAGs.add(dagId); return false; } - if (historyACLPolicyManager != null) { - String dagDomainId = dagSubmittedEvent.getConf().get( - TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); - if (dagDomainId != null) { - dagDomainIdMap.put(dagId, dagDomainId); - } - } } if (eventType.equals(HistoryEventType.DAG_RECOVERED)) { DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent(); @@ -363,28 +370,17 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { } private void handleEvents(DAGHistoryEvent event) { - String domainId = sessionDomainId; - TezDAGID dagId = event.getDagID(); - - if (historyACLPolicyManager != null && dagId != null) { - if (dagDomainIdMap.containsKey(dagId)) { - domainId = dagDomainIdMap.get(dagId); - } + String domainId = getDomainForEvent(event); + // skippedDags is updated in the above call so check again. + if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) { + return; } - TimelineEntity entity = - HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); + TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity( + event.getHistoryEvent()); - if (historyACLPolicyManager != null) { - if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); - } - } else { - if (sessionDomainId != null && !sessionDomainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entity, sessionDomainId); - } - } + if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); } try { @@ -408,7 +404,99 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { } catch (Exception e) { LOG.warn("Could not handle history events", e); } + } + private String getDomainForEvent(DAGHistoryEvent event) { + String domainId = sessionDomainId; + if (historyACLPolicyManager == null) { + return domainId; + } + + TezDAGID dagId = event.getDagID(); + HistoryEvent historyEvent = event.getHistoryEvent(); + if (dagId == null || !HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) { + return domainId; + } + + if (dagDomainIdMap.containsKey(dagId)) { + // If we already have the domain for the dag id return it + domainId = dagDomainIdMap.get(dagId); + // Cleanup if this is the last event. + if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) { + dagDomainIdMap.remove(dagId); + } + } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType() + || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) { + // In case this is the first event for the dag, create and populate dag domain. + Configuration conf; + DAGPlan dagPlan; + if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) { + conf = ((DAGSubmittedEvent)historyEvent).getConf(); + dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan(); + } else { + conf = appContext.getCurrentDAG().getConf(); + dagPlan = appContext.getCurrentDAG().getJobPlan(); + } + domainId = createDagDomain(conf, dagPlan, dagId); + + // createDagDomain updates skippedDAGs so another check here. + if (skippedDAGs.contains(dagId)) { + return null; + } + + dagDomainIdMap.put(dagId, domainId); + } + return domainId; + } + + /** + * Creates a domain for the session. + * @return domainId to be used. null if acls are disabled. + * @throws HistoryACLPolicyException, IOException Forward if historyACLPolicyManger exception. + */ + private String createSessionDomain() throws IOException, HistoryACLPolicyException { + if (historyACLPolicyManager == null) { + return null; + } + Map<String, String> domainInfo = historyACLPolicyManager.setupSessionACLs(getConfig(), + appContext.getApplicationID()); + if (domainInfo != null) { + return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); + } + return null; } + /** + * When running in session mode, create a domain for the dag and return it. + * @param dagConf The configuration the dag for which domain has to be created. + * @param dagPlan The dag plan which contains the ACLs. + * @param dagId The dagId for which domain has to be created. + * @return The created domain id on success. + * sessionDomainId: If there is a failure also disable history logging for this dag. + * sessionDomainId: If historyACLPolicyManager returns null. + */ + private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, TezDAGID dagId) { + // In non session mode dag domain is same as session domain id. + if (!appContext.isSession()) { + return sessionDomainId; + } + DAGAccessControls dagAccessControls = dagPlan.hasAclInfo() + ? DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo()) + : null; + try { + Map<String, String> domainInfo = historyACLPolicyManager.setupSessionDAGACLs( + dagConf, appContext.getApplicationID(), Integer.toString(dagId.getId()), + dagAccessControls); + if (domainInfo != null) { + return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); + } + // Fallback to session domain, if domainInfo was null + return sessionDomainId; + } catch (IOException | HistoryACLPolicyException e) { + LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", e); + skippedDAGs.add(dagId); + // Return value is not used, check for skippedDAG is important. + return null; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java index 6f653ad..a690a19 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java @@ -50,6 +50,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.runtime.library.processor.SleepProcessor; @@ -300,6 +301,7 @@ public class TestATSHistoryV15 { ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService(); AppContext appContext = mock(AppContext.class); when(appContext.getApplicationID()).thenReturn(appId); + when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {}); service.setAppContext(appContext); TimelineEntityGroupId grpId = service.getGroupId(event); http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java index 1869b56..9111195 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java @@ -22,11 +22,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,6 +51,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.tez.common.security.DAGAccessControls; +import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; @@ -66,10 +75,14 @@ public class TestATSV15HistoryLoggingService { private static String user = "TEST_USER"; private InMemoryTimelineClient timelineClient; + private AppContext appContext; @Test(timeout=2000) public void testDAGGroupingDefault() throws Exception { ATSV15HistoryLoggingService service = createService(-1); + + service.start(); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { service.handle(event); @@ -96,6 +109,8 @@ public class TestATSV15HistoryLoggingService { @Test(timeout=2000) public void testDAGGroupingDisabled() throws Exception { ATSV15HistoryLoggingService service = createService(1); + service.start(); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { service.handle(event); @@ -123,6 +138,8 @@ public class TestATSV15HistoryLoggingService { public void testDAGGroupingGroupingEnabled() throws Exception { int numDagsPerGroup = 100; ATSV15HistoryLoggingService service = createService(numDagsPerGroup); + service.start(); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 1); for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { service.handle(event); @@ -172,14 +189,243 @@ public class TestATSV15HistoryLoggingService { service.stop(); } + @Test + public void testNonSessionDomains() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + + HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class); + service.historyACLPolicyManager = historyACLPolicyManager; + + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId))) + .thenReturn(Collections.singletonMap( + TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id")); + + service.start(); + + verify(historyACLPolicyManager, times(1)) + .setupSessionACLs((Configuration)any(), eq(appId)); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id")); + assertTrue(timelineClient.entityLog.size() > 0); + + service.stop(); + } + + @Test + public void testNonSessionDomainsFailed() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + + HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class); + service.historyACLPolicyManager = historyACLPolicyManager; + + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId))) + .thenThrow(new IOException()); + + service.start(); + + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId)); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // History logging is disabled. + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); + assertEquals(0, timelineClient.entityLog.size()); + + service.stop(); + } + + @Test + public void testNonSessionDomainsAclNull() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + + HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class); + service.historyACLPolicyManager = historyACLPolicyManager; + + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId))) + .thenReturn(null); + + service.start(); + + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId)); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // No domain updates but history logging happened. + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); + assertTrue(timelineClient.entityLog.size() > 0); + + service.stop(); + } + + @Test + public void testSessionDomains() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + + when(appContext.isSession()).thenReturn(true); + + HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class); + service.historyACLPolicyManager = historyACLPolicyManager; + + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId))) + .thenReturn(Collections.singletonMap( + TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id")); + + service.start(); + + // Verify that the session domain was created. + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId)); + + // Mock dag domain creation. + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any())) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-id")); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // Verify dag domain was created. + verify(historyACLPolicyManager, times(1)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // calls were made with correct domain ids. + verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id")); + verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-id")); + + service.stop(); + } + + @Test + public void testSessionDomainsFailed() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + + when(appContext.isSession()).thenReturn(true); + + HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class); + service.historyACLPolicyManager = historyACLPolicyManager; + + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId))) + .thenThrow(new IOException()); + + service.start(); + + // Verify that the session domain was created. + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId)); + + // Mock dag domain creation. + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any())) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-id")); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag creation was done. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // No history logging calls were done + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); + assertEquals(0, timelineClient.entityLog.size()); + + service.stop(); + } + + @Test + public void testSessionDomainsDagFailed() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + + when(appContext.isSession()).thenReturn(true); + + HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class); + service.historyACLPolicyManager = historyACLPolicyManager; + + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId))) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id")); + + service.start(); + + // Verify that the session domain creation was called. + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId)); + + // Mock dag domain creation. + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any())) + .thenThrow(new IOException()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // Verify dag domain creation was called. + verify(historyACLPolicyManager, times(1)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // AM events sent, dag events are not sent. + verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id")); + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("dag-id")); + assertEquals(1, timelineClient.entityLog.size()); + + service.stop(); + } + private ATSV15HistoryLoggingService createService(int numDagsPerGroup) { ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService(); - AppContext appContext = mock(AppContext.class); + appContext = mock(AppContext.class); when(appContext.getApplicationID()).thenReturn(appId); when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {}); service.setAppContext(appContext); - Configuration conf = new Configuration(); + Configuration conf = new Configuration(false); if (numDagsPerGroup != -1) { conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, numDagsPerGroup); @@ -191,7 +437,6 @@ public class TestATSV15HistoryLoggingService { timelineClient.init(conf); service.timelineClient = timelineClient; - service.start(); return service; } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index 6b8d6e5..dc215fd 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.history.logging.ats; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -36,13 +38,17 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.security.DAGAccessControls; +import org.apache.tez.common.security.HistoryACLPolicyException; import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezReflectionException; -import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.logging.HistoryLoggingService; @@ -54,8 +60,8 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { private static final Logger LOG = LoggerFactory.getLogger(ATSHistoryLoggingService.class); - private LinkedBlockingQueue<DAGHistoryEvent> eventQueue = - new LinkedBlockingQueue<DAGHistoryEvent>(); + @VisibleForTesting + LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue<DAGHistoryEvent>(); private Thread eventHandlingThread; private AtomicBoolean stopped = new AtomicBoolean(false); @@ -80,7 +86,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"; private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; - private HistoryACLPolicyManager historyACLPolicyManager; + + @VisibleForTesting + HistoryACLPolicyManager historyACLPolicyManager; public ATSHistoryLoggingService() { super(ATSHistoryLoggingService.class.getName()); @@ -121,7 +129,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { if (maxTimeToWaitOnShutdown < 0) { waitForeverOnShutdown = true; } - sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with " + "maxEventsPerBatch=" + maxEventsPerBatch @@ -150,8 +157,17 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { if (!historyLoggingEnabled || timelineClient == null) { return; } + timelineClient.start(); + try { + sessionDomainId = createSessionDomain(); + } catch (HistoryACLPolicyException | IOException e) { + LOG.warn("Could not setup history acls, disabling history logging.", e); + historyLoggingEnabled = false; + return; + } + eventHandlingThread = new Thread(new Runnable() { @Override public void run() { @@ -200,9 +216,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceStop() { - if (!historyLoggingEnabled || timelineClient == null) { - return; - } LOG.info("Stopping ATSService" + ", eventQueueBacklog=" + eventQueue.size()); stopped.set(true); @@ -242,7 +255,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { LOG.warn("Did not finish flushing eventQueue before stopping ATSService" + ", eventQueueBacklog=" + eventQueue.size()); } - timelineClient.stop(); + if (timelineClient != null) { + timelineClient.stop(); + } if (historyACLPolicyManager != null) { historyACLPolicyManager.close(); } @@ -289,13 +304,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { skippedDAGs.add(dagId); return false; } - if (historyACLPolicyManager != null) { - String dagDomainId = dagSubmittedEvent.getConf().get( - TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); - if (dagDomainId != null) { - dagDomainIdMap.put(dagId, dagDomainId); - } - } } if (eventType.equals(HistoryEventType.DAG_RECOVERED)) { DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent(); @@ -321,33 +329,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } private void handleEvents(List<DAGHistoryEvent> events) { - TimelineEntity[] entities = new TimelineEntity[events.size()]; - for (int i = 0; i < events.size(); ++i) { - DAGHistoryEvent event = events.get(i); - String domainId = sessionDomainId; - - TezDAGID dagId = event.getDagID(); - - if (historyACLPolicyManager != null && dagId != null) { - if (dagDomainIdMap.containsKey(dagId)) { - domainId = dagDomainIdMap.get(dagId); - } + List<TimelineEntity> entities = new ArrayList<>(events.size()); + for (DAGHistoryEvent event : events) { + String domainId = getDomainForEvent(event); + // skippedDags is updated in the above call so check again. + if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) { + continue; } - - entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()); - - if (historyACLPolicyManager != null) { - if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) { - if (domainId != null && !domainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId); - } - } else { - if (sessionDomainId != null && !sessionDomainId.isEmpty()) { - historyACLPolicyManager.updateTimelineEntityDomain(entities[i], sessionDomainId); - } - } + TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity( + event.getHistoryEvent()); + entities.add(entity); + if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) { + historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); } - } if (LOG.isDebugEnabled()) { @@ -355,7 +349,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } try { TimelinePutResponse response = - timelineClient.putEntities(entities); + timelineClient.putEntities(entities.toArray(new TimelineEntity[entities.size()])); if (response != null && !response.getErrors().isEmpty()) { int count = response.getErrors().size(); @@ -375,4 +369,97 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } } + private String getDomainForEvent(DAGHistoryEvent event) { + String domainId = sessionDomainId; + if (historyACLPolicyManager == null) { + return domainId; + } + + TezDAGID dagId = event.getDagID(); + HistoryEvent historyEvent = event.getHistoryEvent(); + if (dagId == null || !HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) { + return domainId; + } + + if (dagDomainIdMap.containsKey(dagId)) { + // If we already have the domain for the dag id return it + domainId = dagDomainIdMap.get(dagId); + // Cleanup if this is the last event. + if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) { + dagDomainIdMap.remove(dagId); + } + } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType() + || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) { + // In case this is the first event for the dag, create and populate dag domain. + Configuration conf; + DAGPlan dagPlan; + if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) { + conf = ((DAGSubmittedEvent)historyEvent).getConf(); + dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan(); + } else { + conf = appContext.getCurrentDAG().getConf(); + dagPlan = appContext.getCurrentDAG().getJobPlan(); + } + domainId = createDagDomain(conf, dagPlan, dagId); + + // createDagDomain updates skippedDAGs so another check here. + if (skippedDAGs.contains(dagId)) { + return null; + } + + dagDomainIdMap.put(dagId, domainId); + } + return domainId; + } + + /** + * Creates a domain for the session. + * @return domainId to be used. null if acls are disabled. + * @throws HistoryACLPolicyException, IOException Forward if historyACLPolicyManger exception. + */ + private String createSessionDomain() throws HistoryACLPolicyException, IOException { + if (historyACLPolicyManager == null) { + return null; + } + Map<String, String> domainInfo = historyACLPolicyManager.setupSessionACLs(getConfig(), + appContext.getApplicationID()); + if (domainInfo != null) { + return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); + } + return null; + } + + /** + * When running in session mode, create a domain for the dag and return it. + * @param dagConf The configuration the dag for which domain has to be created. + * @param dagPlan The dag plan which contains the ACLs. + * @param dagId The dagId for which domain has to be created. + * @return The created domain id on success. + * sessionDomainId: If there is a failure also disable history logging for this dag. + * sessionDomainId: If historyACLPolicyManager returns null. + */ + private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, TezDAGID dagId) { + // In non session mode dag domain is same as session domain id. + if (!appContext.isSession()) { + return sessionDomainId; + } + DAGAccessControls dagAccessControls = dagPlan.hasAclInfo() + ? DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo()) + : null; + try { + Map<String, String> domainInfo = historyACLPolicyManager.setupSessionDAGACLs( + dagConf, appContext.getApplicationID(), Integer.toString(dagId.getId()), + dagAccessControls); + if (domainInfo != null) { + return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID); + } + // Fallback to session domain, if domainInfo was null + return sessionDomainId; + } catch (IOException | HistoryACLPolicyException e) { + LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", e); + skippedDAGs.add(dagId); + // Return value is not used, check for skippedDAG is important. + return null; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java index 464864e..da57eb2 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -21,16 +21,30 @@ package org.apache.tez.dag.history.logging.ats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.common.security.DAGAccessControls; +import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -39,9 +53,19 @@ import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + public class TestATSHistoryLoggingService { private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryLoggingService.class); @@ -51,11 +75,15 @@ public class TestATSHistoryLoggingService { private Configuration conf; private int atsInvokeCounter; private int atsEntitiesCounter; + private HistoryACLPolicyManager historyACLPolicyManager; private SystemClock clock = new SystemClock(); + private static ApplicationId appId = ApplicationId.newInstance(1000l, 1); + private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); @Before public void setup() throws Exception { appContext = mock(AppContext.class); + historyACLPolicyManager = mock(HistoryACLPolicyManager.class); atsHistoryLoggingService = new ATSHistoryLoggingService(); atsHistoryLoggingService.setAppContext(appContext); conf = new Configuration(false); @@ -63,13 +91,16 @@ public class TestATSHistoryLoggingService { 1000l); conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2); conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + conf.set(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain"); atsInvokeCounter = 0; atsEntitiesCounter = 0; atsHistoryLoggingService.init(conf); + atsHistoryLoggingService.historyACLPolicyManager = historyACLPolicyManager; atsHistoryLoggingService.timelineClient = mock(TimelineClient.class); - atsHistoryLoggingService.start(); + when(appContext.getClock()).thenReturn(clock); when(appContext.getCurrentDAGID()).thenReturn(null); + when(appContext.getApplicationID()).thenReturn(appId); when(atsHistoryLoggingService.timelineClient.putEntities( Matchers.<TimelineEntity[]>anyVararg())).thenAnswer( new Answer<Object>() { @@ -96,6 +127,7 @@ public class TestATSHistoryLoggingService { @Test(timeout=20000) public void testATSHistoryLoggingServiceShutdown() { + atsHistoryLoggingService.start(); TezDAGID tezDAGID = TezDAGID.getInstance( ApplicationId.newInstance(100l, 1), 1); DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, @@ -122,6 +154,7 @@ public class TestATSHistoryLoggingService { @Test(timeout=20000) public void testATSEventBatching() { + atsHistoryLoggingService.start(); TezDAGID tezDAGID = TezDAGID.getInstance( ApplicationId.newInstance(100l, 1), 1); DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, @@ -145,9 +178,8 @@ public class TestATSHistoryLoggingService { @Test(timeout=20000) public void testTimelineServiceDisable() throws Exception { + atsHistoryLoggingService.start(); ATSHistoryLoggingService atsHistoryLoggingService1; - AppContext appContext1; - appContext1 = mock(AppContext.class); atsHistoryLoggingService1 = new ATSHistoryLoggingService(); atsHistoryLoggingService1.setAppContext(appContext); @@ -160,7 +192,7 @@ public class TestATSHistoryLoggingService { ++atsInvokeCounter; atsEntitiesCounter += invocation.getArguments().length; try { - Thread.sleep(500l); + Thread.sleep(10l); } catch (InterruptedException e) { // do nothing } @@ -181,7 +213,7 @@ public class TestATSHistoryLoggingService { } try { - Thread.sleep(1000l); + Thread.sleep(20l); } catch (InterruptedException e) { // Do nothing } @@ -190,6 +222,238 @@ public class TestATSHistoryLoggingService { Assert.assertEquals(atsInvokeCounter, 0); Assert.assertEquals(atsEntitiesCounter, 0); Assert.assertNull(atsHistoryLoggingService1.timelineClient); - + atsHistoryLoggingService1.close(); + } + + @Test(timeout=10000) + public void testNonSessionDomains() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id")); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs( + (Configuration)any(), (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + Thread.sleep(2500); + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id")); + } + + @Test(timeout=10000) + public void testNonSessionDomainsFailed() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenThrow(new IOException()); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs( + (Configuration)any(), (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(1000); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id")); + Assert.assertEquals(0, atsEntitiesCounter); + } + + @Test(timeout=10000) + public void testNonSessionDomainsAclNull() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenReturn(null); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs( + (Configuration)any(), (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + Thread.sleep(2500); + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id")); + Assert.assertEquals(5, atsEntitiesCounter); + } + + @Test(timeout=10000) + public void testSessionDomains() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain")); + + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any())) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-domain")); + + when(appContext.isSession()).thenReturn(true); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), + (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + Thread.sleep(2500); + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(1)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("test-domain")); + verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-domain")); + } + + @Test(timeout=10000) + public void testSessionDomainsFailed() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenThrow(new IOException()); + + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any())) + .thenReturn( + Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-domain")); + + when(appContext.isSession()).thenReturn(true); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), + (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(1000); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(0)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // No calls were made for domains. + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); + Assert.assertEquals(0, atsEntitiesCounter); + } + + @Test(timeout=10000) + public void testSessionDomainsDagFailed() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenReturn(Collections.singletonMap( + TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-domain")); + + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any())) + .thenThrow(new IOException()); + + when(appContext.isSession()).thenReturn(true); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), + (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + Thread.sleep(2500); + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // DAG domain was called once. + verify(historyACLPolicyManager, times(1)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(1)) + .updateTimelineEntityDomain(any(), eq("session-domain")); + verify(historyACLPolicyManager, times(1)) + .updateTimelineEntityDomain(any(), (String)any()); + Assert.assertEquals(1, atsEntitiesCounter); + } + + @Test(timeout=10000) + public void testSessionDomainsAclNull() throws Exception { + when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any())) + .thenReturn(null); + + when(historyACLPolicyManager.setupSessionDAGACLs( + (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any())) + .thenReturn(null); + + when(appContext.isSession()).thenReturn(true); + atsHistoryLoggingService.start(); + verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), + (ApplicationId)any()); + + // Send the event and wait for completion. + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) { + atsHistoryLoggingService.handle(event); + } + Thread.sleep(2500); + while (!atsHistoryLoggingService.eventQueue.isEmpty()) { + Thread.sleep(100); + } + // No dag domain were created. + verify(historyACLPolicyManager, times(1)) + .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()); + + // All calls made with session domain id. + verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); + Assert.assertEquals(5, atsEntitiesCounter); + } + + private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, + ATSHistoryLoggingService service) { + List<DAGHistoryEvent> historyEvents = new ArrayList<>(); + + long time = System.currentTimeMillis(); + Configuration conf = new Configuration(service.getConfig()); + historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user"))); + historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, + DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null))); + TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); + historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); + TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); + historyEvents + .add(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, "test", time, time))); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time, + ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null, + null, null))); + return historyEvents; } } http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index b0bacf4..3be7131 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -663,7 +663,8 @@ public class MRRSleepJob extends Configured implements Tool { " [-irt intermediateReduceSleepTime]" + " [-recordt recordSleepTime (msec)]" + " [-generateSplitsInAM (false)/true]" + - " [-writeSplitsToDfs (false)/true]"); + " [-writeSplitsToDfs (false)/true]" + + " [-numDags numDagsToSubmit"); ToolRunner.printGenericCommandUsage(System.err); return 2; } @@ -676,6 +677,8 @@ public class MRRSleepJob extends Configured implements Tool { boolean writeSplitsToDfs = false; boolean generateSplitsInAM = false; boolean splitsOptionFound = false; + boolean isSession = false; + int numDags = 1; for(int i=0; i < args.length; i++ ) { if(args[i].equals("-m")) { @@ -716,6 +719,12 @@ public class MRRSleepJob extends Configured implements Tool { } splitsOptionFound = true; writeSplitsToDfs = Boolean.parseBoolean(args[++i]); + } else if (args[i].equals("-numDags")) { + numDags = Integer.parseInt(args[++i]); + if (numDags < 1) { + throw new RuntimeException("numDags should be positive"); + } + isSession = numDags > 1; } } @@ -747,13 +756,20 @@ public class MRRSleepJob extends Configured implements Tool { mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount, iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM); - TezClient tezSession = TezClient.create("MRRSleep", conf, false, null, credentials); + TezClient tezSession = TezClient.create("MRRSleep", conf, isSession, null, credentials); tezSession.start(); - DAGClient dagClient = tezSession.submitDAG(dag); - dagClient.waitForCompletion(); - tezSession.stop(); - - return dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED) ? 0 : 1; + try { + for (; numDags > 0; --numDags) { + DAGClient dagClient = tezSession.submitDAG(dag); + dagClient.waitForCompletion(); + if (!dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED)) { + return 1; + } + } + } finally { + tezSession.stop(); + } + return 0; } }
