Repository: tez
Updated Branches:
  refs/heads/master 9930011b0 -> e610b00d3
TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. (Harish Jaiprakash via hitesh)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e610b00d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e610b00d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e610b00d

Branch: refs/heads/master
Commit: e610b00d388b61c2cc66a60b78d26e1fb4ce74de
Parents: 9930011
Author: Hitesh Shah <[email protected]>
Authored: Thu Jul 21 14:35:51 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Thu Jul 21 14:35:51 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../ats/ATSV15HistoryLoggingService.java        |  13 +-
 .../ats/TestATSV15HistoryLoggingService.java    | 286 +++++++++++++++++++
 3 files changed, 298 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a925104..364ff2c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
   TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
   TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
@@ -86,6 +87,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
   TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
   TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java index fc7e97a..dd21d2d 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java @@ -54,7 +54,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { private static final Logger LOG = LoggerFactory.getLogger(ATSV15HistoryLoggingService.class); - private LinkedBlockingQueue<DAGHistoryEvent> eventQueue = + @VisibleForTesting + LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue<DAGHistoryEvent>(); private Thread eventHandlingThread; @@ -81,6 +82,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager"; private HistoryACLPolicyManager historyACLPolicyManager; + private int numDagsPerGroup; + public ATSV15HistoryLoggingService() { super(ATSV15HistoryLoggingService.class.getName()); } @@ -151,6 +154,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { historyACLPolicyManager = null; } + numDagsPerGroup = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, + TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT); } @Override @@ -290,8 +295,10 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService { case 
VERTEX_GROUP_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_FINISHED: case DAG_RECOVERED: - return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), - event.getDagID().toString()); + String entityGroupId = numDagsPerGroup > 1 + ? event.getDagID().getGroupId(numDagsPerGroup) + : event.getDagID().toString(); + return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), entityGroupId); case APP_LAUNCHED: case AM_LAUNCHED: case AM_STARTED: http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java new file mode 100644 index 0000000..87a48f6 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import 
org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.junit.Test; + +public class TestATSV15HistoryLoggingService { + private static ApplicationId appId = ApplicationId.newInstance(1000l, 1); + private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + private static String user = "TEST_USER"; + + private InMemoryTimelineClient timelineClient; + + @Test(timeout=2000) + public void testDAGGroupingDefault() throws Exception { + ATSV15HistoryLoggingService service = createService(-1); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + + assertEquals(2, timelineClient.entityLog.size()); + + List<TimelineEntity> amEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, appId.toString())); + assertNotNull(amEvents); + assertEquals(1, amEvents.size()); + + List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, dagId1.toString())); + assertNotNull(nonGroupedDagEvents); + assertEquals(4, nonGroupedDagEvents.size()); + + service.stop(); + } + + @Test(timeout=2000) + public void testDAGGroupingDisabled() throws Exception { + ATSV15HistoryLoggingService service = createService(1); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + + assertEquals(2, timelineClient.entityLog.size()); + + List<TimelineEntity> amEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, appId.toString())); + assertNotNull(amEvents); + assertEquals(1, amEvents.size()); + + List<TimelineEntity> nonGroupedDagEvents = 
timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, dagId1.toString())); + assertNotNull(nonGroupedDagEvents); + assertEquals(4, nonGroupedDagEvents.size()); + + service.stop(); + } + + @Test(timeout=2000) + public void testDAGGroupingGroupingEnabled() throws Exception { + int numDagsPerGroup = 100; + ATSV15HistoryLoggingService service = createService(numDagsPerGroup); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 0); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) { + service.handle(event); + } + TezDAGID dagId2 = TezDAGID.getInstance(appId, numDagsPerGroup - 1); + for (DAGHistoryEvent event : makeHistoryEvents(dagId2, service)) { + service.handle(event); + } + + TezDAGID dagId3 = TezDAGID.getInstance(appId, numDagsPerGroup); + for (DAGHistoryEvent event : makeHistoryEvents(dagId3, service)) { + service.handle(event); + } + + while (!service.eventQueue.isEmpty()) { + Thread.sleep(100); + } + + assertEquals(dagId1.getGroupId(numDagsPerGroup), dagId2.getGroupId(numDagsPerGroup)); + assertNotEquals(dagId2.getGroupId(numDagsPerGroup), dagId3.getGroupId(numDagsPerGroup)); + + assertEquals(3, timelineClient.entityLog.size()); + + List<TimelineEntity> amEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, appId.toString())); + assertNotNull(amEvents); + assertEquals(3, amEvents.size()); + + List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, dagId1.toString())); + assertNull(nonGroupedDagEvents); + + List<TimelineEntity> groupedDagEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup))); + assertNotNull(groupedDagEvents); + assertEquals(8, groupedDagEvents.size()); + + nonGroupedDagEvents = timelineClient.entityLog.get( + TimelineEntityGroupId.newInstance(appId, dagId3.toString())); + assertNull(nonGroupedDagEvents); + + groupedDagEvents = timelineClient.entityLog.get( + 
TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup))); + assertNotNull(groupedDagEvents); + assertEquals(4, groupedDagEvents.size()); + + service.stop(); + } + + private ATSV15HistoryLoggingService createService(int numDagsPerGroup) { + ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService(); + AppContext appContext = mock(AppContext.class); + when(appContext.getApplicationID()).thenReturn(appId); + when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {}); + service.setAppContext(appContext); + + Configuration conf = new Configuration(); + if (numDagsPerGroup != -1) { + conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP, + numDagsPerGroup); + } + service.init(conf); + + // Set timeline service. + timelineClient = new InMemoryTimelineClient(); + timelineClient.init(conf); + service.timelineClient = timelineClient; + + service.start(); + return service; + } + + private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, + ATSV15HistoryLoggingService service) { + List<DAGHistoryEvent> historyEvents = new ArrayList<>(); + + long time = System.currentTimeMillis(); + Configuration conf = new Configuration(service.getConfig()); + historyEvents.add(new DAGHistoryEvent(null, + new AMStartedEvent(attemptId, time, user))); + historyEvents.add(new DAGHistoryEvent(dagId, + new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user, + conf, null))); + TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); + historyEvents.add(new DAGHistoryEvent(dagId, + new VertexStartedEvent(vertexID, time, time))); + TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskStartedEvent(tezTaskID, "test", time, time))); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time, + ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 
8765), null, + null, null))); + return historyEvents; + } + + private static class InMemoryTimelineClient extends TimelineClient { + Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog = new HashMap<>(); + + protected InMemoryTimelineClient() { + super("InMemoryTimelineClient"); + } + + @Override + public void flush() throws IOException { + } + + public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, -1); + public static final TimelineEntityGroupId DEFAULT_GROUP_ID = + TimelineEntityGroupId.newInstance(DEFAULT_APP_ID, ""); + + @Override + public synchronized TimelinePutResponse putEntities(TimelineEntity... entities) + throws IOException, YarnException { + return putEntities(null, DEFAULT_GROUP_ID, entities); + } + + @Override + public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, + TimelineEntityGroupId groupId, + TimelineEntity... entities) throws IOException, YarnException { + List<TimelineEntity> groupEntities = entityLog.get(groupId); + if (groupEntities == null) { + groupEntities = new ArrayList<>(); + entityLog.put(groupId, groupEntities); + } + for (TimelineEntity entity : entities) { + groupEntities.add(entity); + } + return null; + } + + @Override + public void putDomain(TimelineDomain domain) throws IOException, YarnException { + throw new UnsupportedOperationException(); + } + + @Override + public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain) + throws IOException, YarnException { + throw new UnsupportedOperationException(); + } + + @Override + public Token<TimelineDelegationTokenIdentifier> getDelegationToken(String renewer) + throws IOException, YarnException { + return null; + } + + @Override + public long renewDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT) + throws IOException, YarnException { + return 0; + } + + @Override + public void cancelDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT) + throws IOException, YarnException { + 
} + } +}
