Repository: tez Updated Branches: refs/heads/master 293c5933f -> cbcb3a786
TEZ-3665. TestATSV15HistoryLoggingService should use mocked TimelineClient (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbcb3a78 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbcb3a78 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbcb3a78 Branch: refs/heads/master Commit: cbcb3a786e1af6a9ee61648ec9614686123182c3 Parents: 293c593 Author: Zhiyuan Yang <[email protected]> Authored: Wed Mar 22 21:14:51 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Wed Mar 22 21:14:51 2017 -0700 ---------------------------------------------------------------------- .../ats/TestATSV15HistoryLoggingService.java | 144 ++++++++----------- 1 file changed, 58 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cbcb3a78/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java index cbded35..ef5da81 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java @@ -24,6 +24,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.anyVararg; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -68,13 +71,20 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.HadoopShim; import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestATSV15HistoryLoggingService { private static ApplicationId appId = ApplicationId.newInstance(1000l, 1); private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); private static String user = "TEST_USER"; - private InMemoryTimelineClient timelineClient; + private TimelineClient timelineClient; + Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog; + final TimelineEntityGroupId DEFAULT_GROUP_ID = + TimelineEntityGroupId.newInstance(ApplicationId.newInstance(0, -1), ""); + private AppContext appContext; @Test(timeout=2000) @@ -91,14 +101,14 @@ public class TestATSV15HistoryLoggingService { Thread.sleep(100); } - assertEquals(2, timelineClient.entityLog.size()); + assertEquals(2, entityLog.size()); - List<TimelineEntity> amEvents = timelineClient.entityLog.get( + List<TimelineEntity> amEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, appId.toString())); assertNotNull(amEvents); assertEquals(1, amEvents.size()); - List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get( + List<TimelineEntity> nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.toString())); assertNotNull(nonGroupedDagEvents); assertEquals(4, nonGroupedDagEvents.size()); @@ -119,14 +129,14 @@ public class TestATSV15HistoryLoggingService { Thread.sleep(100); } - assertEquals(2, timelineClient.entityLog.size()); + assertEquals(2, entityLog.size()); - List<TimelineEntity> amEvents = timelineClient.entityLog.get( + List<TimelineEntity> amEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, appId.toString())); assertNotNull(amEvents); assertEquals(1, amEvents.size()); - List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get( + List<TimelineEntity> nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.toString())); assertNotNull(nonGroupedDagEvents); assertEquals(4, nonGroupedDagEvents.size()); @@ -161,27 +171,27 @@ public class TestATSV15HistoryLoggingService { assertEquals(dagId1.getGroupId(numDagsPerGroup), dagId2.getGroupId(numDagsPerGroup)); assertNotEquals(dagId2.getGroupId(numDagsPerGroup), dagId3.getGroupId(numDagsPerGroup)); - assertEquals(3, timelineClient.entityLog.size()); + assertEquals(3, entityLog.size()); - List<TimelineEntity> amEvents = timelineClient.entityLog.get( + List<TimelineEntity> amEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, appId.toString())); assertNotNull(amEvents); assertEquals(3, amEvents.size()); - List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get( + List<TimelineEntity> nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.toString())); assertNull(nonGroupedDagEvents); - List<TimelineEntity> groupedDagEvents = timelineClient.entityLog.get( + List<TimelineEntity> groupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup))); assertNotNull(groupedDagEvents); assertEquals(8, groupedDagEvents.size()); - nonGroupedDagEvents = timelineClient.entityLog.get( + nonGroupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId3.toString())); assertNull(nonGroupedDagEvents); - groupedDagEvents = timelineClient.entityLog.get( + groupedDagEvents = entityLog.get( TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup))); assertNotNull(groupedDagEvents); assertEquals(4, groupedDagEvents.size()); @@ -219,7 +229,7 @@ public class TestATSV15HistoryLoggingService { // All calls made with session domain id. verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id")); - assertTrue(timelineClient.entityLog.size() > 0); + assertTrue(entityLog.size() > 0); service.stop(); } @@ -252,7 +262,7 @@ public class TestATSV15HistoryLoggingService { // History logging is disabled. verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); - assertEquals(0, timelineClient.entityLog.size()); + assertEquals(0, entityLog.size()); service.stop(); } @@ -285,7 +295,7 @@ public class TestATSV15HistoryLoggingService { // No domain updates but history logging happened. verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); - assertTrue(timelineClient.entityLog.size() > 0); + assertTrue(entityLog.size() > 0); service.stop(); } @@ -370,7 +380,7 @@ public class TestATSV15HistoryLoggingService { // No history logging calls were done verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any()); - assertEquals(0, timelineClient.entityLog.size()); + assertEquals(0, entityLog.size()); service.stop(); } @@ -413,12 +423,12 @@ public class TestATSV15HistoryLoggingService { // AM events sent, dag events are not sent. verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id")); verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("dag-id")); - assertEquals(1, timelineClient.entityLog.size()); + assertEquals(1, entityLog.size()); service.stop(); } - private ATSV15HistoryLoggingService createService(int numDagsPerGroup) { + private ATSV15HistoryLoggingService createService(int numDagsPerGroup) throws IOException, YarnException { ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService(); appContext = mock(AppContext.class); when(appContext.getApplicationID()).thenReturn(appId); @@ -433,13 +443,40 @@ public class TestATSV15HistoryLoggingService { service.init(conf); // Set timeline service. - timelineClient = new InMemoryTimelineClient(); - timelineClient.init(conf); + timelineClient = mock(TimelineClient.class); + entityLog = new HashMap<>(); + //timelineClient.init(conf); + when(timelineClient.getDelegationToken(anyString())).thenReturn(null); + when(timelineClient.renewDelegationToken(Matchers.<Token<TimelineDelegationTokenIdentifier>>any())).thenReturn(0L); + when(timelineClient.putEntities(Matchers.<TimelineEntity>anyVararg())).thenAnswer(new Answer() { + @Override + public TimelinePutResponse answer(InvocationOnMock invocation) throws Throwable { + return putEntityHelper(DEFAULT_GROUP_ID, invocation.getArguments(), 0); + } + }); + when(timelineClient.putEntities(any(ApplicationAttemptId.class), any(TimelineEntityGroupId.class), Matchers.<TimelineEntity>anyVararg())).thenAnswer(new Answer() { + @Override + public TimelinePutResponse answer(InvocationOnMock invocation) throws Throwable { + return putEntityHelper(invocation.getArgumentAt(1, TimelineEntityGroupId.class), invocation.getArguments(), 2); + } + }); service.timelineClient = timelineClient; return service; } + private TimelinePutResponse putEntityHelper(TimelineEntityGroupId groupId, Object[] args, int firstEntityIdx) { + List<TimelineEntity> groupEntities = entityLog.get(groupId); + if (groupEntities == null) { + groupEntities = new ArrayList<>(); + entityLog.put(groupId, groupEntities); + } + for (int i = firstEntityIdx; i < args.length; i++) { + groupEntities.add((TimelineEntity) args[i]); + } + return null; + } + private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, ATSV15HistoryLoggingService service) { List<DAGHistoryEvent> historyEvents = new ArrayList<>(); @@ -463,69 +500,4 @@ public class TestATSV15HistoryLoggingService { null, null))); return historyEvents; } - - private static class InMemoryTimelineClient extends TimelineClient { - Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog = new HashMap<>(); - - protected InMemoryTimelineClient() { - super("InMemoryTimelineClient"); - } - - @Override - public void flush() throws IOException { - } - - public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, -1); - public static final TimelineEntityGroupId DEFAULT_GROUP_ID = - TimelineEntityGroupId.newInstance(DEFAULT_APP_ID, ""); - - @Override - public synchronized TimelinePutResponse putEntities(TimelineEntity... entities) - throws IOException, YarnException { - return putEntities(null, DEFAULT_GROUP_ID, entities); - } - - @Override - public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, - TimelineEntityGroupId groupId, - TimelineEntity... entities) throws IOException, YarnException { - List<TimelineEntity> groupEntities = entityLog.get(groupId); - if (groupEntities == null) { - groupEntities = new ArrayList<>(); - entityLog.put(groupId, groupEntities); - } - for (TimelineEntity entity : entities) { - groupEntities.add(entity); - } - return null; - } - - @Override - public void putDomain(TimelineDomain domain) throws IOException, YarnException { - throw new UnsupportedOperationException(); - } - - @Override - public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain) - throws IOException, YarnException { - throw new UnsupportedOperationException(); - } - - @Override - public Token<TimelineDelegationTokenIdentifier> getDelegationToken(String renewer) - throws IOException, YarnException { - return null; - } - - @Override - public long renewDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT) - throws IOException, YarnException { - return 0; - } - - @Override - public void cancelDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT) - throws IOException, YarnException { - } - } }
