Repository: tez Updated Branches: refs/heads/TEZ-2003 e328dc4e4 -> 9bde98d7c (forced update)
http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java new file mode 100644 index 0000000..6d713c5 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -0,0 +1,850 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.logging.ats; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.common.VersionInfo; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.VertexStats; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGCommitStartedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; 
+import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.codehaus.jettison.json.JSONException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestHistoryEventTimelineConversion { + + private ApplicationAttemptId applicationAttemptId; + private ApplicationId applicationId; + private String user = "user"; + private Random random = new Random(); + private TezDAGID tezDAGID; + private TezVertexID tezVertexID; + private TezTaskID tezTaskID; + private TezTaskAttemptID tezTaskAttemptID; + private DAGPlan dagPlan; + private ContainerId containerId; + private NodeId nodeId; + + @Before + public void setup() { + applicationId = ApplicationId.newInstance(9999l, 1); + applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); + tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt()); + tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt()); + tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt()); + tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 
random.nextInt()); + dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); + containerId = ContainerId.newInstance(applicationAttemptId, 111); + nodeId = NodeId.newInstance("node", 13435); + } + + @Test(timeout = 5000) + public void testHandlerExists() throws JSONException { + for (HistoryEventType eventType : HistoryEventType.values()) { + HistoryEvent event = null; + switch (eventType) { + case APP_LAUNCHED: + event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(), + user, new Configuration(false), null); + break; + case AM_LAUNCHED: + event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(), + user); + break; + case AM_STARTED: + event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user); + break; + case DAG_SUBMITTED: + event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, + null, user, null); + break; + case DAG_INITIALIZED: + event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); + break; + case DAG_STARTED: + event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName()); + break; + case DAG_FINISHED: + event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR, + null, null, user, dagPlan.getName(), null); + break; + case VERTEX_INITIALIZED: + event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), + random.nextInt(), "proc", null); + break; + case VERTEX_STARTED: + event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); + break; + case VERTEX_PARALLELISM_UPDATED: + event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1); + break; + case VERTEX_FINISHED: + event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), + random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, + null, null, null, null); + break; + case TASK_STARTED: + event = 
new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt()); + break; + case TASK_FINISHED: + event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(), + tezTaskAttemptID, TaskState.FAILED, null, null); + break; + case TASK_ATTEMPT_STARTED: + event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, + nodeId, null, null, "nodeHttpAddress"); + break; + case TASK_ATTEMPT_FINISHED: + event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); + break; + case CONTAINER_LAUNCHED: + event = new ContainerLaunchedEvent(containerId, random.nextInt(), + applicationAttemptId); + break; + case CONTAINER_STOPPED: + event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId); + break; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + event = new VertexRecoverableEventsGeneratedEvent(); + break; + case DAG_COMMIT_STARTED: + event = new DAGCommitStartedEvent(); + break; + case VERTEX_COMMIT_STARTED: + event = new VertexCommitStartedEvent(); + break; + case VERTEX_GROUP_COMMIT_STARTED: + event = new VertexGroupCommitStartedEvent(); + break; + case VERTEX_GROUP_COMMIT_FINISHED: + event = new VertexGroupCommitFinishedEvent(); + break; + default: + Assert.fail("Unhandled event type " + eventType); + } + if (event == null || !event.isHistoryEvent()) { + continue; + } + HistoryEventTimelineConversion.convertToTimelineEntity(event); + } + } + + static class MockVersionInfo extends VersionInfo { + + MockVersionInfo() { + super("component", "1.1.0", "rev1", "20120101", "git.apache.org"); + } + + } + + @Test(timeout = 5000) + public void testConvertAppLaunchedEvent() { + long launchTime = random.nextLong(); + long submitTime = random.nextLong(); + Configuration conf = new Configuration(false); + conf.set("foo", "bar"); + conf.set("applicationId", "1234"); + + MockVersionInfo 
mockVersionInfo = new MockVersionInfo(); + AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime, + submitTime, user, conf, mockVersionInfo); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType()); + Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(2, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user)); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + + Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + + Assert.assertEquals(2, timelineEntity.getOtherInfo().size()); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG)); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION)); + + Map<String, String> config = + (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG); + Assert.assertEquals(conf.get("foo"), config.get("foo")); + Assert.assertEquals(conf.get("applicationId"), config.get("applicationId")); + + Map<String, String> versionInfo = + (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION); + Assert.assertEquals(mockVersionInfo.getVersion(), + versionInfo.get(ATSConstants.VERSION)); + Assert.assertEquals(mockVersionInfo.getRevision(), + versionInfo.get(ATSConstants.REVISION)); + Assert.assertEquals(mockVersionInfo.getBuildTime(), + versionInfo.get(ATSConstants.BUILD_TIME)); + + } + + @Test(timeout = 5000) + public void testConvertAMLaunchedEvent() { + long launchTime = 
random.nextLong(); + long submitTime = random.nextLong(); + AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType()); + + final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities(); + Assert.assertEquals(3, relatedEntities.size()); + Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ATTEMPT_ID) + .contains(applicationAttemptId.toString())); + Assert.assertTrue(relatedEntities.get(ATSConstants.USER).contains(user)); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(2, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user)); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + + Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.AM_LAUNCHED.name(), evt.getEventType()); + Assert.assertEquals(launchTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(1, otherInfo.size()); + Assert.assertEquals(submitTime, otherInfo.get(ATSConstants.APP_SUBMIT_TIME)); + } + + @Test(timeout = 5000) + public void testConvertContainerLaunchedEvent() { + long launchTime = random.nextLong(); + ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime, + applicationAttemptId); + 
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(2, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains( + containerId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains( + "tez_" + applicationAttemptId.toString())); + + Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationAttemptId.getApplicationId().toString())); + + Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(), + timelineEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(launchTime, + timelineEntity.getEvents().get(0).getTimestamp()); + } + + @Test(timeout = 5000) + public void testConvertContainerStoppedEvent() { + long stopTime = random.nextLong(); + int exitStatus = random.nextInt(); + ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus, + applicationAttemptId); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); + + final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities(); + Assert.assertEquals(1, relatedEntities.size()); + Assert.assertTrue(relatedEntities.get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()) + 
.contains("tez_" + applicationAttemptId.toString())); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(2, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.EXIT_STATUS).contains(exitStatus)); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + final TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.CONTAINER_STOPPED.name(), evt.getEventType()); + Assert.assertEquals(stopTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(2, otherInfo.size()); + Assert.assertEquals(exitStatus, otherInfo.get(ATSConstants.EXIT_STATUS)); + Assert.assertEquals(stopTime, otherInfo.get(ATSConstants.FINISH_TIME)); + } + + @Test(timeout = 5000) + public void testConvertDAGStartedEvent() { + long startTime = random.nextLong(); + String dagName = "testDagName"; + DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_STARTED.name(), evt.getEventType()); + Assert.assertEquals(startTime, evt.getTimestamp()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(3, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user)); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + 
Assert.assertTrue(primaryFilters.get(ATSConstants.DAG_NAME).contains(dagName)); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(2, otherInfo.size()); + Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME)); + Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS)); + } + @Test(timeout = 5000) + public void testConvertDAGSubmittedEvent() { + long submitTime = random.nextLong(); + + DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan, + applicationAttemptId, null, user, null); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(5, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name()).contains( + "tez_" + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains( + "tez_" + applicationAttemptId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ATTEMPT_ID).contains( + applicationAttemptId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains( + applicationAttemptId.getApplicationId().toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user)); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(submitTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(submitTime, 
timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains( + dagPlan.getName())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationAttemptId.getApplicationId().toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); + Assert.assertEquals(applicationId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); + + } + + @Test(timeout = 5000) + public void testConvertTaskAttemptFinishedEvent(){ + String vertexName = "testVertex"; + long startTime = random.nextLong(); + long finishTime = startTime + 1234; + TaskAttemptState state = TaskAttemptState + .values()[random.nextInt(TaskAttemptState.values().length)]; + TaskAttemptTerminationCause error = TaskAttemptTerminationCause + .values()[random.nextInt(TaskAttemptTerminationCause.values().length)]; + String diagnostics = "random diagnostics message"; + TezCounters counters = new TezCounters(); + + TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, + startTime, finishTime, state, error, diagnostics, counters); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(5, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) + 
.contains(tezDAGID.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name()) + .contains(tezVertexID.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_TASK_ID.name()) + .contains(tezTaskID.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_FINISHED.name(), evt.getEventType()); + Assert.assertEquals(finishTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(6, otherInfo.size()); + Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); + Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); + Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); + Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); + Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); + Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); + } + + @Test(timeout = 5000) + public void testConvertDAGInitializedEvent() { + long initTime = random.nextLong(); + + Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>(); + nameIdMap.put("foo", tezVertexID); + + DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName", + nameIdMap); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + 
Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType()); + Assert.assertEquals(initTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName")); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey( + ATSConstants.VERTEX_NAME_ID_MAPPING)); + Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get( + ATSConstants.VERTEX_NAME_ID_MAPPING); + Assert.assertEquals(1, vIdMap.size()); + Assert.assertNotNull(vIdMap.containsKey("foo")); + Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo")); + + } + + @Test(timeout = 5000) + public void testConvertDAGFinishedEvent() { + long finishTime = random.nextLong(); + long startTime = random.nextLong(); + Map<String,Integer> taskStats = new HashMap<String, Integer>(); + taskStats.put("FOO", 100); + taskStats.put("BAR", 200); + + DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR, + "diagnostics", null, user, dagPlan.getName(), taskStats); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType()); + Assert.assertEquals(finishTime, 
timelineEvent.getTimestamp()); + + Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains( + DAGState.ERROR.name())); + + Assert.assertEquals(startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); + Assert.assertEquals(finishTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); + Assert.assertEquals(finishTime - startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS)); + Assert.assertEquals(DAGState.ERROR.name(), + timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); + Assert.assertEquals("diagnostics", + timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); + + Assert.assertEquals(100, + ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); + Assert.assertEquals(200, + ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); + } + + @Test(timeout = 5000) + public void testConvertVertexInitializedEvent() { + long initRequestedTime = random.nextLong(); + long initedTime = random.nextLong(); + int numTasks = random.nextInt(); + VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime, + initedTime, numTasks, "proc", null); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezVertexID.toString(), 
timelineEntity.getEntityId()); + + Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + + Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType()); + Assert.assertEquals(initedTime, timelineEvent.getTimestamp()); + + Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME)); + Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME)); + + // Each constructor argument (init time, init-requested time, task count) is checked once. + Assert.assertEquals(initedTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); + Assert.assertEquals(initRequestedTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue()); + Assert.assertEquals(numTasks, + ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue()); + } + + @Test(timeout = 5000) + public void testConvertVertexFinishedEvent() { + long initRequestedTime = random.nextLong(); + long initedTime = random.nextLong(); + long startRequestedTime = random.nextLong(); + long startTime = random.nextLong(); + long finishTime = random.nextLong(); + Map<String,Integer> taskStats = new HashMap<String, Integer>(); + taskStats.put("FOO", 100); + 
taskStats.put("BAR", 200); + VertexStats vertexStats = new VertexStats(); + + VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime, + initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR, + "diagnostics", null, vertexStats, taskStats); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains( + VertexState.ERROR.name())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType()); + Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(finishTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); + Assert.assertEquals(finishTime - startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); + Assert.assertEquals(VertexState.ERROR.name(), + timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); + Assert.assertEquals("diagnostics", + timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS)); + + Assert.assertEquals(100, + 
((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); + Assert.assertEquals(200, + ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); + } + + @Test(timeout = 5000) + public void testConvertTaskStartedEvent() { + long scheduleTime = random.nextLong(); + long startTime = random.nextLong(); + TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( + tezVertexID.toString())); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( + tezVertexID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(startTime, timelineEvent.getTimestamp()); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME)); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); + + Assert.assertEquals(scheduleTime, + 
((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue()); + Assert.assertEquals(startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); + Assert.assertTrue(TaskState.SCHEDULED.name() + .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); + } + + @Test(timeout = 5000) + public void testConvertTaskAttemptStartedEvent() { + long startTime = random.nextLong(); + TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", + startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(3, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.NODE_ID).contains(nodeId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains( + containerId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains( + tezTaskID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(startTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + 
timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( + tezVertexID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains( + tezTaskID.toString())); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); + Assert.assertEquals("inProgressURL", + timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL)); + Assert.assertEquals("logsURL", + timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL)); + Assert.assertEquals(nodeId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID)); + Assert.assertEquals(containerId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID)); + Assert.assertEquals("nodeHttpAddress", + timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS)); + Assert.assertTrue(TaskAttemptState.RUNNING.name() + .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); + } + + @Test(timeout = 5000) + public void testConvertTaskFinishedEvent() { + String vertexName = "testVertexName"; + long startTime = random.nextLong(); + long finishTime = random.nextLong(); + TaskState state = TaskState.values()[random.nextInt(TaskState.values().length)]; + String diagnostics = "diagnostics message"; + TezCounters counters = new TezCounters(); + + TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime, + tezTaskAttemptID, state, diagnostics, counters); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + 
Assert.assertEquals(4, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) + .contains(tezDAGID.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name()) + .contains(tezVertexID.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.name())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_FINISHED.name(), evt.getEventType()); + Assert.assertEquals(finishTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(6, otherInfo.size()); + Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); + Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); + Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); + Assert.assertEquals(tezTaskAttemptID.toString(), + otherInfo.get(ATSConstants.SUCCESSFUL_ATTEMPT_ID)); + Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); + Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); + } + + @Test(timeout = 5000) + public void testConvertVertexParallelismUpdatedEvent() { + TezVertexID vId = tezVertexID; + Map<String, EdgeManagerPluginDescriptor> edgeMgrs = + new HashMap<String, EdgeManagerPluginDescriptor>(); + edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text")); + VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, + edgeMgrs, null, 10); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType()); + Assert.assertEquals(vId.toString(), timelineEntity.getEntityId()); + 
Assert.assertEquals(1, timelineEntity.getEvents().size()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(2, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) + .contains(tezDAGID.toString())); + + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType()); + Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS)); + Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS)); + Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS)); + + Map<String, Object> updatedEdgeMgrs = (Map<String, Object>) + evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS); + Assert.assertEquals(1, updatedEdgeMgrs.size()); + Assert.assertTrue(updatedEdgeMgrs.containsKey("a")); + Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a"); + + Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY)); + + Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)); + + } + + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests deleted file mode 120000 index 784b54a..0000000 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests +++ /dev/null @@ -1 +0,0 @@ -../../../../../../../tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests \ No newline at end of file diff --git 
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java new file mode 100644 index 0000000..d48948b --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java @@ -0,0 +1,253 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.tests; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * Configures and starts the Tez-specific components in the YARN cluster. 
+ * + * When using this mini cluster, the user is expected to + */ +public class MiniTezClusterWithTimeline extends MiniYARNCluster { + + public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class); + + private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class); + + private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml"; + + private Path confFilePath; + + public MiniTezClusterWithTimeline(String testName) { + this(testName, 1); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs) { + super(testName, noOfNMs, 4, 4); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs, + int numLocalDirs, int numLogDirs) { + super(testName, noOfNMs, numLocalDirs, numLogDirs); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs, + int numLocalDirs, int numLogDirs, boolean enableAHS) { + super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME); + // Use libs from cluster since no build is available + conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true); + // blacklisting disabled to prevent scheduling issues + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), + "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath()); + } + + if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) { + // nothing defined. set quick delete value + conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); + } + + File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR); + + if (!appJarLocalFile.exists()) { + String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR + + " not found. 
Exiting."; + LOG.info(message); + throw new TezUncheckedException(message); + } else { + LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath()); + } + + FileSystem fs = FileSystem.get(conf); + Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir")); + Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar"); + // Copy AppJar and make it public. + Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR); + fs.copyFromLocalFile(appMasterJar, appRemoteJar); + fs.setPermission(appRemoteJar, new FsPermission("777")); + + conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString()); + LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS)); + + // VMEM monitoring disabled, PMEM monitoring enabled. + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + + try { + Path stagingPath = FileContext.getFileContext(conf).makeQualified( + new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); + /* + * Re-configure the staging path on Windows if the file system is localFs. + * We need to use a absolute path that contains the drive letter. The unit + * test could run on a different drive than the AM. We can run into the + * issue that job files are localized to the drive where the test runs on, + * while the AM starts on a different drive and fails to find the job + * metafiles. Using absolute path can avoid this ambiguity. + */ + if (Path.WINDOWS) { + if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, + new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR)) + .getAbsolutePath()); + } + } + FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf); + if (fc.util().exists(stagingPath)) { + LOG.info(stagingPath + " exists! 
deleting..."); + fc.delete(stagingPath, true); + } + LOG.info("mkdir: " + stagingPath); + fc.mkdir(stagingPath, null, true); + + //mkdir done directory as well + String doneDir = + JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + Path doneDirPath = fc.makeQualified(new Path(doneDir)); + fc.mkdir(doneDirPath, null, true); + } catch (IOException e) { + throw new TezUncheckedException("Could not create staging directory. ", e); + } + conf.set(MRConfig.MASTER_ADDRESS, "test"); + + //configure the shuffle service in NM + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, + Service.class); + + // Non-standard shuffle port + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + + conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, + DefaultContainerExecutor.class, ContainerExecutor.class); + + // TestMRJobs is for testing non-uberized operation only; see TestUberAM + // for corresponding uberized tests. 
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting MiniTezClusterWithTimeline"); + super.serviceStart(); + File workDir = super.getTestWorkDir(); + Configuration conf = super.getConfig(); + + confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG); + File confFile = new File(confFilePath.toString()); + try { + confFile.createNewFile(); + conf.writeXml(new FileOutputStream(confFile)); + confFile.deleteOnExit(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + throw new RuntimeException(e); + } + confFilePath = new Path(confFile.getAbsolutePath()); + conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + workDir.getAbsolutePath(), System.getProperty("java.class.path")); + LOG.info("Setting yarn-site.xml via YARN-APP-CP at: " + + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)); + } + + @Override + protected void serviceStop() throws Exception { + waitForAppsToFinish(); + super.serviceStop(); + } + + private void waitForAppsToFinish() { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(getConfig()); + yarnClient.start(); + try { + while(true) { + List<ApplicationReport> appReports = yarnClient.getApplications(); + Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){ + @Override + public boolean apply(ApplicationReport appReport) { + return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + .contains(appReport.getYarnApplicationState()); + } + }); + if (unCompletedApps.size()==0){ + break; + } + LOG.info("wait for applications to finish in MiniTezClusterWithTimeline"); + Thread.sleep(1000); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + yarnClient.stop(); 
+ } + } + + public Path getConfigFilePath() { + return confFilePath; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java new file mode 100644 index 0000000..d48948b --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java @@ -0,0 +1,253 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.tests; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.JarFinder; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * Configures and starts the Tez-specific components in the YARN cluster. 
+ * + * When using this mini cluster, the user is expected to + */ +public class MiniTezClusterWithTimeline extends MiniYARNCluster { + + public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class); + + private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class); + + private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml"; + + private Path confFilePath; + + public MiniTezClusterWithTimeline(String testName) { + this(testName, 1); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs) { + super(testName, noOfNMs, 4, 4); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs, + int numLocalDirs, int numLogDirs) { + super(testName, noOfNMs, numLocalDirs, numLogDirs); + } + + public MiniTezClusterWithTimeline(String testName, int noOfNMs, + int numLocalDirs, int numLogDirs, boolean enableAHS) { + super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME); + // Use libs from cluster since no build is available + conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true); + // blacklisting disabled to prevent scheduling issues + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), + "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath()); + } + + if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) { + // nothing defined. set quick delete value + conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); + } + + File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR); + + if (!appJarLocalFile.exists()) { + String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR + + " not found. 
Exiting."; + LOG.info(message); + throw new TezUncheckedException(message); + } else { + LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath()); + } + + FileSystem fs = FileSystem.get(conf); + Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir")); + Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar"); + // Copy AppJar and make it public. + Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR); + fs.copyFromLocalFile(appMasterJar, appRemoteJar); + fs.setPermission(appRemoteJar, new FsPermission("777")); + + conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString()); + LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS)); + + // VMEM monitoring disabled, PMEM monitoring enabled. + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + + try { + Path stagingPath = FileContext.getFileContext(conf).makeQualified( + new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); + /* + * Re-configure the staging path on Windows if the file system is localFs. + * We need to use a absolute path that contains the drive letter. The unit + * test could run on a different drive than the AM. We can run into the + * issue that job files are localized to the drive where the test runs on, + * while the AM starts on a different drive and fails to find the job + * metafiles. Using absolute path can avoid this ambiguity. + */ + if (Path.WINDOWS) { + if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, + new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR)) + .getAbsolutePath()); + } + } + FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf); + if (fc.util().exists(stagingPath)) { + LOG.info(stagingPath + " exists! 
deleting..."); + fc.delete(stagingPath, true); + } + LOG.info("mkdir: " + stagingPath); + fc.mkdir(stagingPath, null, true); + + //mkdir done directory as well + String doneDir = + JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + Path doneDirPath = fc.makeQualified(new Path(doneDir)); + fc.mkdir(doneDirPath, null, true); + } catch (IOException e) { + throw new TezUncheckedException("Could not create staging directory. ", e); + } + conf.set(MRConfig.MASTER_ADDRESS, "test"); + + //configure the shuffle service in NM + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, + ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, + Service.class); + + // Non-standard shuffle port + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + + conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, + DefaultContainerExecutor.class, ContainerExecutor.class); + + // TestMRJobs is for testing non-uberized operation only; see TestUberAM + // for corresponding uberized tests. 
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting MiniTezClusterWithTimeline"); + super.serviceStart(); + File workDir = super.getTestWorkDir(); + Configuration conf = super.getConfig(); + + confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG); + File confFile = new File(confFilePath.toString()); + try { + confFile.createNewFile(); + conf.writeXml(new FileOutputStream(confFile)); + confFile.deleteOnExit(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + throw new RuntimeException(e); + } + confFilePath = new Path(confFile.getAbsolutePath()); + conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + workDir.getAbsolutePath(), System.getProperty("java.class.path")); + LOG.info("Setting yarn-site.xml via YARN-APP-CP at: " + + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)); + } + + @Override + protected void serviceStop() throws Exception { + waitForAppsToFinish(); + super.serviceStop(); + } + + private void waitForAppsToFinish() { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(getConfig()); + yarnClient.start(); + try { + while(true) { + List<ApplicationReport> appReports = yarnClient.getApplications(); + Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){ + @Override + public boolean apply(ApplicationReport appReport) { + return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + .contains(appReport.getYarnApplicationState()); + } + }); + if (unCompletedApps.size()==0){ + break; + } + LOG.info("wait for applications to finish in MiniTezClusterWithTimeline"); + Thread.sleep(1000); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + yarnClient.stop(); 
+ } + } + + public Path getConfigFilePath() { + return confFilePath; + } + +}
