http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java deleted file mode 100644 index 6d713c5..0000000 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ /dev/null @@ -1,850 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.ats; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.tez.common.ATSConstants; -import org.apache.tez.common.VersionInfo; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; -import org.apache.tez.dag.app.dag.DAGState; -import org.apache.tez.dag.app.dag.VertexState; -import org.apache.tez.dag.app.dag.impl.VertexStats; -import org.apache.tez.dag.history.HistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.history.events.AMLaunchedEvent; -import org.apache.tez.dag.history.events.AMStartedEvent; -import org.apache.tez.dag.history.events.AppLaunchedEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; -import org.apache.tez.dag.history.events.ContainerStoppedEvent; -import org.apache.tez.dag.history.events.DAGCommitStartedEvent; -import org.apache.tez.dag.history.events.DAGFinishedEvent; -import org.apache.tez.dag.history.events.DAGInitializedEvent; -import org.apache.tez.dag.history.events.DAGStartedEvent; -import org.apache.tez.dag.history.events.DAGSubmittedEvent; -import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; -import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; -import org.apache.tez.dag.history.events.TaskFinishedEvent; -import org.apache.tez.dag.history.events.TaskStartedEvent; -import org.apache.tez.dag.history.events.VertexCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; -import org.apache.tez.dag.history.events.VertexFinishedEvent; -import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; -import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; -import org.apache.tez.dag.history.events.VertexStartedEvent; -import org.apache.tez.dag.history.logging.EntityTypes; -import org.apache.tez.dag.history.utils.DAGUtils; -import org.apache.tez.dag.records.TaskAttemptTerminationCause; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.codehaus.jettison.json.JSONException; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TestHistoryEventTimelineConversion { - - private ApplicationAttemptId applicationAttemptId; - private ApplicationId applicationId; - private String user = "user"; - private Random random = new Random(); - private TezDAGID tezDAGID; - private TezVertexID tezVertexID; - private TezTaskID tezTaskID; - private TezTaskAttemptID tezTaskAttemptID; - private DAGPlan dagPlan; - private ContainerId containerId; - private NodeId nodeId; - - @Before - public void setup() { - applicationId = ApplicationId.newInstance(9999l, 1); - applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); - tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt()); - tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt()); - tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt()); - tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt()); - dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); - containerId = ContainerId.newInstance(applicationAttemptId, 111); - nodeId = NodeId.newInstance("node", 13435); - } - - @Test(timeout = 5000) - public void testHandlerExists() throws JSONException { - for (HistoryEventType eventType : HistoryEventType.values()) { - HistoryEvent event = null; - switch (eventType) { - case APP_LAUNCHED: - event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(), - user, new Configuration(false), null); - break; - case AM_LAUNCHED: - event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(), - user); - break; - case AM_STARTED: - event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user); - break; - case DAG_SUBMITTED: - event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user, null); - break; - case DAG_INITIALIZED: - event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); - break; - case DAG_STARTED: - event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName()); - break; - case DAG_FINISHED: - event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR, - null, null, user, dagPlan.getName(), null); - break; - case VERTEX_INITIALIZED: - event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), - random.nextInt(), "proc", null); - break; - case VERTEX_STARTED: - event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); - break; - case VERTEX_PARALLELISM_UPDATED: - event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1); - break; - case VERTEX_FINISHED: - event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), - random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, - null, null, null, null); - break; - case TASK_STARTED: - event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt()); - break; - case TASK_FINISHED: - event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(), - tezTaskAttemptID, TaskState.FAILED, null, null); - break; - case TASK_ATTEMPT_STARTED: - event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress"); - break; - case TASK_ATTEMPT_FINISHED: - event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); - break; - case CONTAINER_LAUNCHED: - event = new ContainerLaunchedEvent(containerId, random.nextInt(), - applicationAttemptId); - break; - case CONTAINER_STOPPED: - event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId); - break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - event = new VertexRecoverableEventsGeneratedEvent(); - break; - case DAG_COMMIT_STARTED: - event = new DAGCommitStartedEvent(); - break; - case VERTEX_COMMIT_STARTED: - event = new VertexCommitStartedEvent(); - break; - case VERTEX_GROUP_COMMIT_STARTED: - event = new VertexGroupCommitStartedEvent(); - break; - case VERTEX_GROUP_COMMIT_FINISHED: - event = new VertexGroupCommitFinishedEvent(); - break; - default: - Assert.fail("Unhandled event type " + eventType); - } - if (event == null || !event.isHistoryEvent()) { - continue; - } - HistoryEventTimelineConversion.convertToTimelineEntity(event); - } - } - - static class MockVersionInfo extends VersionInfo { - - MockVersionInfo() { - super("component", "1.1.0", "rev1", "20120101", "git.apache.org"); - } - - } - - @Test(timeout = 5000) - public void testConvertAppLaunchedEvent() { - long launchTime = random.nextLong(); - long submitTime = random.nextLong(); - Configuration conf = new Configuration(false); - conf.set("foo", "bar"); - conf.set("applicationId", "1234"); - - MockVersionInfo mockVersionInfo = new MockVersionInfo(); - AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime, - submitTime, user, conf, mockVersionInfo); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - - Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType()); - Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(2, timelineEntity.getRelatedEntities().size()); - Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user)); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains( - applicationId.toString())); - - Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - - Assert.assertEquals(2, timelineEntity.getOtherInfo().size()); - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG)); - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION)); - - Map<String, String> config = - (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG); - Assert.assertEquals(conf.get("foo"), config.get("foo")); - Assert.assertEquals(conf.get("applicationId"), config.get("applicationId")); - - Map<String, String> versionInfo = - (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION); - Assert.assertEquals(mockVersionInfo.getVersion(), - versionInfo.get(ATSConstants.VERSION)); - Assert.assertEquals(mockVersionInfo.getRevision(), - versionInfo.get(ATSConstants.REVISION)); - Assert.assertEquals(mockVersionInfo.getBuildTime(), - versionInfo.get(ATSConstants.BUILD_TIME)); - - } - - @Test(timeout = 5000) - public void testConvertAMLaunchedEvent() { - long launchTime = random.nextLong(); - long submitTime = random.nextLong(); - AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - - Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId()); - Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType()); - - final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities(); - Assert.assertEquals(3, relatedEntities.size()); - Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ATTEMPT_ID) - .contains(applicationAttemptId.toString())); - Assert.assertTrue(relatedEntities.get(ATSConstants.USER).contains(user)); - - final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); - Assert.assertEquals(2, primaryFilters.size()); - Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user)); - Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - - Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.AM_LAUNCHED.name(), evt.getEventType()); - Assert.assertEquals(launchTime, evt.getTimestamp()); - - final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(1, otherInfo.size()); - Assert.assertEquals(submitTime, otherInfo.get(ATSConstants.APP_SUBMIT_TIME)); - } - - @Test(timeout = 5000) - public void testConvertContainerLaunchedEvent() { - long launchTime = random.nextLong(); - ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime, - applicationAttemptId); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - - Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(2, timelineEntity.getRelatedEntities().size()); - Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains( - containerId.toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains( - "tez_" + applicationAttemptId.toString())); - - Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationAttemptId.getApplicationId().toString())); - - Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(), - timelineEntity.getEvents().get(0).getEventType()); - Assert.assertEquals(launchTime, - timelineEntity.getEvents().get(0).getTimestamp()); - } - - @Test(timeout = 5000) - public void testConvertContainerStoppedEvent() { - long stopTime = random.nextLong(); - int exitStatus = random.nextInt(); - ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus, - applicationAttemptId); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - - Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); - Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); - - final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities(); - Assert.assertEquals(1, relatedEntities.size()); - Assert.assertTrue(relatedEntities.get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()) - .contains("tez_" + applicationAttemptId.toString())); - - final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); - Assert.assertEquals(2, primaryFilters.size()); - Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue(primaryFilters.get(ATSConstants.EXIT_STATUS).contains(exitStatus)); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - final TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.CONTAINER_STOPPED.name(), evt.getEventType()); - Assert.assertEquals(stopTime, evt.getTimestamp()); - - final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(2, otherInfo.size()); - Assert.assertEquals(exitStatus, otherInfo.get(ATSConstants.EXIT_STATUS)); - Assert.assertEquals(stopTime, otherInfo.get(ATSConstants.FINISH_TIME)); - } - - @Test(timeout = 5000) - public void testConvertDAGStartedEvent() { - long startTime = random.nextLong(); - String dagName = "testDagName"; - DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - - Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); - Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.DAG_STARTED.name(), evt.getEventType()); - Assert.assertEquals(startTime, evt.getTimestamp()); - - final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); - Assert.assertEquals(3, primaryFilters.size()); - Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user)); - Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue(primaryFilters.get(ATSConstants.DAG_NAME).contains(dagName)); - - final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(2, otherInfo.size()); - Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME)); - Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS)); - } - @Test(timeout = 5000) - public void testConvertDAGSubmittedEvent() { - long submitTime = random.nextLong(); - - DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan, - applicationAttemptId, null, user, null); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(5, timelineEntity.getRelatedEntities().size()); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name()).contains( - "tez_" + applicationId.toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains( - "tez_" + applicationAttemptId.toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ATTEMPT_ID).contains( - applicationAttemptId.toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains( - applicationAttemptId.getApplicationId().toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user)); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType()); - Assert.assertEquals(submitTime, timelineEvent.getTimestamp()); - - Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); - - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains( - dagPlan.getName())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationAttemptId.getApplicationId().toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); - Assert.assertEquals(applicationId.toString(), - timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); - - } - - @Test(timeout = 5000) - public void testConvertTaskAttemptFinishedEvent(){ - String vertexName = "testVertex"; - long startTime = random.nextLong(); - long finishTime = startTime + 1234; - TaskAttemptState state = TaskAttemptState - .values()[random.nextInt(TaskAttemptState.values().length)]; - TaskAttemptTerminationCause error = TaskAttemptTerminationCause - .values()[random.nextInt(TaskAttemptTerminationCause.values().length)]; - String diagnostics = "random diagnostics message"; - TezCounters counters = new TezCounters(); - - TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, error, diagnostics, counters); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); - Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); - - final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); - Assert.assertEquals(5, primaryFilters.size()); - Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) - .contains(tezDAGID.toString())); - Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name()) - .contains(tezVertexID.toString())); - Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_TASK_ID.name()) - .contains(tezTaskID.toString())); - Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.toString())); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_FINISHED.name(), evt.getEventType()); - Assert.assertEquals(finishTime, evt.getTimestamp()); - - final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(6, otherInfo.size()); - Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); - Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); - Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); - Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); - Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); - Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); - } - - @Test(timeout = 5000) - public void testConvertDAGInitializedEvent() { - long initTime = random.nextLong(); - - Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>(); - nameIdMap.put("foo", tezVertexID); - - DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName", - nameIdMap); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType()); - Assert.assertEquals(initTime, timelineEvent.getTimestamp()); - - Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationId.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName")); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey( - ATSConstants.VERTEX_NAME_ID_MAPPING)); - Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get( - ATSConstants.VERTEX_NAME_ID_MAPPING); - Assert.assertEquals(1, vIdMap.size()); - Assert.assertNotNull(vIdMap.containsKey("foo")); - Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo")); - - } - - @Test(timeout = 5000) - public void testConvertDAGFinishedEvent() { - long finishTime = random.nextLong(); - long startTime = random.nextLong(); - Map<String,Integer> taskStats = new HashMap<String, Integer>(); - taskStats.put("FOO", 100); - taskStats.put("BAR", 200); - - DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR, - "diagnostics", null, user, dagPlan.getName(), taskStats); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType()); - Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); - - Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationId.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains( - DAGState.ERROR.name())); - - Assert.assertEquals(startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); - Assert.assertEquals(finishTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); - Assert.assertEquals(finishTime - startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS)); - Assert.assertEquals(DAGState.ERROR.name(), - timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); - Assert.assertEquals("diagnostics", - timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); - - Assert.assertEquals(100, - ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); - Assert.assertEquals(200, - ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); - } - - @Test(timeout = 5000) - public void testConvertVertexInitializedEvent() { - long initRequestedTime = random.nextLong(); - long initedTime = random.nextLong(); - int numTasks = random.nextInt(); - VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime, - initedTime, numTasks, "proc", null); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains( - tezDAGID.toString())); - - Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationId.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( - tezDAGID.toString())); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType()); - Assert.assertEquals(initedTime, timelineEvent.getTimestamp()); - - Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME)); - Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME)); - - Assert.assertEquals(initedTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); - Assert.assertEquals(initRequestedTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue()); - Assert.assertEquals(initedTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); - Assert.assertEquals(numTasks, - ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue()); - } - - @Test(timeout = 5000) - public void testConvertVertexFinishedEvent() { - long initRequestedTime = random.nextLong(); - long initedTime = random.nextLong(); - long startRequestedTime = random.nextLong(); - long startTime = random.nextLong(); - long finishTime = random.nextLong(); - Map<String,Integer> taskStats = new HashMap<String, Integer>(); - taskStats.put("FOO", 100); - taskStats.put("BAR", 200); - VertexStats vertexStats = new VertexStats(); - - VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime, - initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR, - "diagnostics", null, vertexStats, taskStats); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); - - Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( - tezDAGID.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains( - VertexState.ERROR.name())); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType()); - Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); - - Assert.assertEquals(finishTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); - Assert.assertEquals(finishTime - startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); - Assert.assertEquals(VertexState.ERROR.name(), - timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); - Assert.assertEquals("diagnostics", - timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); - - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS)); - - Assert.assertEquals(100, - ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); - Assert.assertEquals(200, - ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); - } - - @Test(timeout = 5000) - public void testConvertTaskStartedEvent() { - long scheduleTime = random.nextLong(); - long startTime = random.nextLong(); - TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( - tezVertexID.toString())); - - Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationId.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( - tezDAGID.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( - tezVertexID.toString())); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType()); - Assert.assertEquals(startTime, timelineEvent.getTimestamp()); - - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME)); - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); - - Assert.assertEquals(scheduleTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue()); - Assert.assertEquals(startTime, - ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); - Assert.assertTrue(TaskState.SCHEDULED.name() - .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); - } - - @Test(timeout = 5000) - public void testConvertTaskAttemptStartedEvent() { - long startTime = random.nextLong(); - TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", - startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); - Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); - - Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue()); - - Assert.assertEquals(3, timelineEntity.getRelatedEntities().size()); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(ATSConstants.NODE_ID).contains(nodeId.toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains( - containerId.toString())); - Assert.assertTrue( - timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains( - tezTaskID.toString())); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType()); - Assert.assertEquals(startTime, timelineEvent.getTimestamp()); - - Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( - applicationId.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( - tezDAGID.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( - tezVertexID.toString())); - Assert.assertTrue( - timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains( - tezTaskID.toString())); - - Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); - Assert.assertEquals("inProgressURL", - timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL)); - Assert.assertEquals("logsURL", - timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL)); - Assert.assertEquals(nodeId.toString(), - timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID)); - Assert.assertEquals(containerId.toString(), - timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID)); - Assert.assertEquals("nodeHttpAddress", - timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS)); - Assert.assertTrue(TaskAttemptState.RUNNING.name() - .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); - } - - @Test(timeout = 5000) - public void testConvertTaskFinishedEvent() { - String vertexName = "testVertexName"; - long startTime = random.nextLong(); - long finishTime = random.nextLong(); - TaskState state = TaskState.values()[random.nextInt(TaskState.values().length)]; - String diagnostics = "diagnostics message"; - TezCounters counters = new TezCounters(); - - TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime, - tezTaskAttemptID, state, diagnostics, counters); - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - - Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); - Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); - - final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); - Assert.assertEquals(4, primaryFilters.size()); - Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) - .contains(tezDAGID.toString())); - Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name()) - .contains(tezVertexID.toString())); - Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.name())); - - Assert.assertEquals(1, timelineEntity.getEvents().size()); - TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.TASK_FINISHED.name(), evt.getEventType()); - Assert.assertEquals(finishTime, evt.getTimestamp()); - - final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(6, otherInfo.size()); - Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); - Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); - Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); - Assert.assertEquals(tezTaskAttemptID.toString(), - otherInfo.get(ATSConstants.SUCCESSFUL_ATTEMPT_ID)); - Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); - Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); - } - - @Test(timeout = 5000) - public void testConvertVertexParallelismUpdatedEvent() { - TezVertexID vId = tezVertexID; - Map<String, EdgeManagerPluginDescriptor> edgeMgrs = - new HashMap<String, EdgeManagerPluginDescriptor>(); - edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text")); - VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, - edgeMgrs, null, 10); - - TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); - Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType()); - Assert.assertEquals(vId.toString(), timelineEntity.getEntityId()); - Assert.assertEquals(1, timelineEntity.getEvents().size()); - - final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); - Assert.assertEquals(2, primaryFilters.size()); - Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) - .contains(applicationId.toString())); - Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) - .contains(tezDAGID.toString())); - - TimelineEvent evt = timelineEntity.getEvents().get(0); - Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType()); - Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS)); - Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS)); - Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS)); - - Map<String, Object> updatedEdgeMgrs = (Map<String, Object>) - evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS); - Assert.assertEquals(1, updatedEdgeMgrs.size()); - Assert.assertTrue(updatedEdgeMgrs.containsKey("a")); - Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a"); - - Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY)); - - Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)); - - } - - -}
http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java deleted file mode 100644 index d48948b..0000000 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java +++ /dev/null @@ -1,253 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.tests; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.ShuffleHandler; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.util.JarFinder; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.DAGAppMaster; -import org.apache.tez.mapreduce.hadoop.MRConfig; -import org.apache.tez.mapreduce.hadoop.MRJobConfig; - -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; - -/** - * Configures and starts the Tez-specific components in the YARN cluster. - * - * When using this mini cluster, the user is expected to - */ -public class MiniTezClusterWithTimeline extends MiniYARNCluster { - - public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class); - - private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class); - - private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml"; - - private Path confFilePath; - - public MiniTezClusterWithTimeline(String testName) { - this(testName, 1); - } - - public MiniTezClusterWithTimeline(String testName, int noOfNMs) { - super(testName, noOfNMs, 4, 4); - } - - public MiniTezClusterWithTimeline(String testName, int noOfNMs, - int numLocalDirs, int numLogDirs) { - super(testName, noOfNMs, numLocalDirs, numLogDirs); - } - - public MiniTezClusterWithTimeline(String testName, int noOfNMs, - int numLocalDirs, int numLogDirs, boolean enableAHS) { - super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME); - // Use libs from cluster since no build is available - conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true); - // blacklisting disabled to prevent scheduling issues - conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); - if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { - conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), - "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath()); - } - - if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) { - // nothing defined. set quick delete value - conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l); - } - - File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR); - - if (!appJarLocalFile.exists()) { - String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR - + " not found. Exiting."; - LOG.info(message); - throw new TezUncheckedException(message); - } else { - LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath()); - } - - FileSystem fs = FileSystem.get(conf); - Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir")); - Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar"); - // Copy AppJar and make it public. - Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR); - fs.copyFromLocalFile(appMasterJar, appRemoteJar); - fs.setPermission(appRemoteJar, new FsPermission("777")); - - conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString()); - LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS)); - - // VMEM monitoring disabled, PMEM monitoring enabled. - conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); - conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); - - conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); - - try { - Path stagingPath = FileContext.getFileContext(conf).makeQualified( - new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); - /* - * Re-configure the staging path on Windows if the file system is localFs. - * We need to use a absolute path that contains the drive letter. The unit - * test could run on a different drive than the AM. We can run into the - * issue that job files are localized to the drive where the test runs on, - * while the AM starts on a different drive and fails to find the job - * metafiles. Using absolute path can avoid this ambiguity. - */ - if (Path.WINDOWS) { - if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) { - conf.set(MRJobConfig.MR_AM_STAGING_DIR, - new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR)) - .getAbsolutePath()); - } - } - FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf); - if (fc.util().exists(stagingPath)) { - LOG.info(stagingPath + " exists! deleting..."); - fc.delete(stagingPath, true); - } - LOG.info("mkdir: " + stagingPath); - fc.mkdir(stagingPath, null, true); - - //mkdir done directory as well - String doneDir = - JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); - Path doneDirPath = fc.makeQualified(new Path(doneDir)); - fc.mkdir(doneDirPath, null, true); - } catch (IOException e) { - throw new TezUncheckedException("Could not create staging directory. ", e); - } - conf.set(MRConfig.MASTER_ADDRESS, "test"); - - //configure the shuffle service in NM - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, - new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, - ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, - Service.class); - - // Non-standard shuffle port - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); - - conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, - DefaultContainerExecutor.class, ContainerExecutor.class); - - // TestMRJobs is for testing non-uberized operation only; see TestUberAM - // for corresponding uberized tests. - conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - super.serviceInit(conf); - } - - @Override - public void serviceStart() throws Exception { - LOG.info("Starting MiniTezClusterWithTimeline"); - super.serviceStart(); - File workDir = super.getTestWorkDir(); - Configuration conf = super.getConfig(); - - confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG); - File confFile = new File(confFilePath.toString()); - try { - confFile.createNewFile(); - conf.writeXml(new FileOutputStream(confFile)); - confFile.deleteOnExit(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - throw new RuntimeException(e); - } - confFilePath = new Path(confFile.getAbsolutePath()); - conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, - workDir.getAbsolutePath(), System.getProperty("java.class.path")); - LOG.info("Setting yarn-site.xml via YARN-APP-CP at: " - + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)); - } - - @Override - protected void serviceStop() throws Exception { - waitForAppsToFinish(); - super.serviceStop(); - } - - private void waitForAppsToFinish() { - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(getConfig()); - yarnClient.start(); - try { - while(true) { - List<ApplicationReport> appReports = yarnClient.getApplications(); - Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){ - @Override - public boolean apply(ApplicationReport appReport) { - return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, - YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) - .contains(appReport.getYarnApplicationState()); - } - }); - if (unCompletedApps.size()==0){ - break; - } - LOG.info("wait for applications to finish in MiniTezClusterWithTimeline"); - Thread.sleep(1000); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - yarnClient.stop(); - } - } - - public Path getConfigFilePath() { - return confFilePath; - } - -}
