Repository: tez
Updated Branches:
refs/heads/branch-0.8 98e352627 -> 428288fb8
TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. (Harish
Jaiprakash via hitesh)
(cherry picked from commit e610b00d388b61c2cc66a60b78d26e1fb4ce74de)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/428288fb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/428288fb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/428288fb
Branch: refs/heads/branch-0.8
Commit: 428288fb81522e466b4206a7c90e455000874a81
Parents: 98e3526
Author: Hitesh Shah <[email protected]>
Authored: Thu Jul 21 14:35:51 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Thu Jul 21 14:37:09 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../ats/ATSV15HistoryLoggingService.java | 13 +-
.../ats/TestATSV15HistoryLoggingService.java | 286 +++++++++++++++++++
3 files changed, 297 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/428288fb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce6440d..276573f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
TEZ-3348. NullPointerException in Tez MROutput while trying to write using
Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
http://git-wip-us.apache.org/repos/asf/tez/blob/428288fb/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index fc7e97a..dd21d2d 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -54,7 +54,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
private static final Logger LOG =
LoggerFactory.getLogger(ATSV15HistoryLoggingService.class);
- private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+ @VisibleForTesting
+ LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Thread eventHandlingThread;
@@ -81,6 +82,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
"org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
private HistoryACLPolicyManager historyACLPolicyManager;
+ private int numDagsPerGroup;
+
public ATSV15HistoryLoggingService() {
super(ATSV15HistoryLoggingService.class.getName());
}
@@ -151,6 +154,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
historyACLPolicyManager = null;
}
+ numDagsPerGroup =
conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+
TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT);
}
@Override
@@ -290,8 +295,10 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
case VERTEX_GROUP_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_FINISHED:
case DAG_RECOVERED:
- return
TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(),
- event.getDagID().toString());
+ String entityGroupId = numDagsPerGroup > 1
+ ? event.getDagID().getGroupId(numDagsPerGroup)
+ : event.getDagID().toString();
+ return
TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(),
entityGroupId);
case APP_LAUNCHED:
case AM_LAUNCHED:
case AM_STARTED:
http://git-wip-us.apache.org/repos/asf/tez/blob/428288fb/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
new file mode 100644
index 0000000..87a48f6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import
org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Test;
+
+public class TestATSV15HistoryLoggingService {
+ private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ private static ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
+ private static String user = "TEST_USER";
+
+ private InMemoryTimelineClient timelineClient;
+
+ @Test(timeout=2000)
+ public void testDAGGroupingDefault() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(2, timelineClient.entityLog.size());
+
+ List<TimelineEntity> amEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, appId.toString()));
+ assertNotNull(amEvents);
+ assertEquals(1, amEvents.size());
+
+ List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
+ assertNotNull(nonGroupedDagEvents);
+ assertEquals(4, nonGroupedDagEvents.size());
+
+ service.stop();
+ }
+
+ @Test(timeout=2000)
+ public void testDAGGroupingDisabled() throws Exception {
+ ATSV15HistoryLoggingService service = createService(1);
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(2, timelineClient.entityLog.size());
+
+ List<TimelineEntity> amEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, appId.toString()));
+ assertNotNull(amEvents);
+ assertEquals(1, amEvents.size());
+
+ List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
+ assertNotNull(nonGroupedDagEvents);
+ assertEquals(4, nonGroupedDagEvents.size());
+
+ service.stop();
+ }
+
+ @Test(timeout=2000)
+ public void testDAGGroupingGroupingEnabled() throws Exception {
+ int numDagsPerGroup = 100;
+ ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ TezDAGID dagId2 = TezDAGID.getInstance(appId, numDagsPerGroup - 1);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId2, service)) {
+ service.handle(event);
+ }
+
+ TezDAGID dagId3 = TezDAGID.getInstance(appId, numDagsPerGroup);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId3, service)) {
+ service.handle(event);
+ }
+
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(dagId1.getGroupId(numDagsPerGroup),
dagId2.getGroupId(numDagsPerGroup));
+ assertNotEquals(dagId2.getGroupId(numDagsPerGroup),
dagId3.getGroupId(numDagsPerGroup));
+
+ assertEquals(3, timelineClient.entityLog.size());
+
+ List<TimelineEntity> amEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, appId.toString()));
+ assertNotNull(amEvents);
+ assertEquals(3, amEvents.size());
+
+ List<TimelineEntity> nonGroupedDagEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
+ assertNull(nonGroupedDagEvents);
+
+ List<TimelineEntity> groupedDagEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId,
dagId1.getGroupId(numDagsPerGroup)));
+ assertNotNull(groupedDagEvents);
+ assertEquals(8, groupedDagEvents.size());
+
+ nonGroupedDagEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId, dagId3.toString()));
+ assertNull(nonGroupedDagEvents);
+
+ groupedDagEvents = timelineClient.entityLog.get(
+ TimelineEntityGroupId.newInstance(appId,
dagId3.getGroupId(numDagsPerGroup)));
+ assertNotNull(groupedDagEvents);
+ assertEquals(4, groupedDagEvents.size());
+
+ service.stop();
+ }
+
+ private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
+ ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getApplicationID()).thenReturn(appId);
+ when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
+ service.setAppContext(appContext);
+
+ Configuration conf = new Configuration();
+ if (numDagsPerGroup != -1) {
+
conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+ numDagsPerGroup);
+ }
+ service.init(conf);
+
+ // Set timeline service.
+ timelineClient = new InMemoryTimelineClient();
+ timelineClient.init(conf);
+ service.timelineClient = timelineClient;
+
+ service.start();
+ return service;
+ }
+
+ private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+ ATSV15HistoryLoggingService
service) {
+ List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+ long time = System.currentTimeMillis();
+ Configuration conf = new Configuration(service.getConfig());
+ historyEvents.add(new DAGHistoryEvent(null,
+ new AMStartedEvent(attemptId, time, user)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(),
attemptId, null, user,
+ conf, null)));
+ TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new VertexStartedEvent(vertexID, time, time)));
+ TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskStartedEvent(tezTaskID, "test", time, time)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID,
1), "test", time,
+ ContainerId.newContainerId(attemptId, 1),
NodeId.newInstance("localhost", 8765), null,
+ null, null)));
+ return historyEvents;
+ }
+
+ private static class InMemoryTimelineClient extends TimelineClient {
+ Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog = new
HashMap<>();
+
+ protected InMemoryTimelineClient() {
+ super("InMemoryTimelineClient");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ public static final ApplicationId DEFAULT_APP_ID =
ApplicationId.newInstance(0, -1);
+ public static final TimelineEntityGroupId DEFAULT_GROUP_ID =
+ TimelineEntityGroupId.newInstance(DEFAULT_APP_ID, "");
+
+ @Override
+ public synchronized TimelinePutResponse putEntities(TimelineEntity...
entities)
+ throws IOException, YarnException {
+ return putEntities(null, DEFAULT_GROUP_ID, entities);
+ }
+
+ @Override
+ public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException {
+ List<TimelineEntity> groupEntities = entityLog.get(groupId);
+ if (groupEntities == null) {
+ groupEntities = new ArrayList<>();
+ entityLog.put(groupId, groupEntities);
+ }
+ for (TimelineEntity entity : entities) {
+ groupEntities.add(entity);
+ }
+ return null;
+ }
+
+ @Override
+ public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain
domain)
+ throws IOException, YarnException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Token<TimelineDelegationTokenIdentifier> getDelegationToken(String
renewer)
+ throws IOException, YarnException {
+ return null;
+ }
+
+ @Override
+ public long renewDelegationToken(Token<TimelineDelegationTokenIdentifier>
timelineDT)
+ throws IOException, YarnException {
+ return 0;
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<TimelineDelegationTokenIdentifier>
timelineDT)
+ throws IOException, YarnException {
+ }
+ }
+}