http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats deleted file mode 120000 index 362c202..0000000 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats +++ /dev/null @@ -1 +0,0 @@ -../../../../../../../../../../tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats \ No newline at end of file diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java new file mode 100644 index 0000000..18ec43e --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.records.TezDAGID; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestATSHistoryLoggingService { + + private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class); + + private ATSHistoryLoggingService atsHistoryLoggingService; + private AppContext appContext; + private Configuration conf; + private int atsInvokeCounter; + private int atsEntitiesCounter; + private SystemClock clock = new SystemClock(); + + @Before + public void setup() throws Exception { + appContext = mock(AppContext.class); + atsHistoryLoggingService = new ATSHistoryLoggingService(); + atsHistoryLoggingService.setAppContext(appContext); + conf = new Configuration(false); + conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, + 1000l); + conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2); + conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + atsInvokeCounter = 0; + atsEntitiesCounter = 0; + atsHistoryLoggingService.init(conf); + atsHistoryLoggingService.timelineClient = mock(TimelineClient.class); + atsHistoryLoggingService.start(); + when(appContext.getClock()).thenReturn(clock); + when(appContext.getCurrentDAGID()).thenReturn(null); + when(atsHistoryLoggingService.timelineClient.putEntities( + Matchers.<TimelineEntity[]>anyVararg())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ++atsInvokeCounter; + atsEntitiesCounter += invocation.getArguments().length; + try { + Thread.sleep(500l); + } catch (InterruptedException e) { + // do nothing + } + return null; + } + } + ); + } + + @After + public void teardown() { + atsHistoryLoggingService.stop(); + atsHistoryLoggingService = null; + } + + @Test(timeout=20000) + public void testATSHistoryLoggingServiceShutdown() { + TezDAGID tezDAGID = TezDAGID.getInstance( + ApplicationId.newInstance(100l, 1), 1); + DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, + new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); + + for (int i = 0; i < 100; ++i) { + atsHistoryLoggingService.handle(historyEvent); + } + + try { + Thread.sleep(2500l); + } catch (InterruptedException e) { + // Do nothing + } + atsHistoryLoggingService.stop(); + + LOG.info("ATS entitiesSent=" + atsEntitiesCounter + + ", timelineInvocations=" + atsInvokeCounter); + + Assert.assertTrue(atsEntitiesCounter >= 4); + Assert.assertTrue(atsEntitiesCounter < 20); + + } + + @Test(timeout=20000) + public void testATSEventBatching() { + TezDAGID tezDAGID = TezDAGID.getInstance( + ApplicationId.newInstance(100l, 1), 1); + DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, + new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); + + for (int i = 0; i < 100; ++i) { + atsHistoryLoggingService.handle(historyEvent); + } + + try { + Thread.sleep(1000l); + } catch (InterruptedException e) { + // Do nothing + } + LOG.info("ATS entitiesSent=" + atsEntitiesCounter + + ", timelineInvocations=" + atsInvokeCounter); + + Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter); + Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter); + } + +} diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java new file mode 100644 index 0000000..9c4f721 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import java.io.IOException; +import java.util.Random; + +import javax.ws.rs.core.MediaType; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.runtime.library.processor.SleepProcessor; +import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; +import org.apache.tez.tests.MiniTezClusterWithTimeline; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +public class TestATSHistoryWithMiniCluster { + + private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class); + + protected static MiniTezClusterWithTimeline mrrTezCluster = null; + protected static MiniDFSCluster dfsCluster = null; + private static String timelineAddress; + private Random random = new Random(); + + private static Configuration conf = new Configuration(); + private static FileSystem remoteFs; + + private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir"; + + @BeforeClass + public static void setup() throws IOException { + try { + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) + .build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + + if (mrrTezCluster == null) { + try { + mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(), + 1, 1, 1, true); + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000); + mrrTezCluster.init(conf); + mrrTezCluster.start(); + } catch (Throwable e) { + LOG.info("Failed to start Mini Tez Cluster", e); + } + } + timelineAddress = mrrTezCluster.getConfig().get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS); + if (timelineAddress != null) { + // Hack to handle bug in MiniYARNCluster handling of webapp address + timelineAddress = timelineAddress.replace("0.0.0.0", "localhost"); + } + } + + @AfterClass + public static void tearDown() throws InterruptedException { + LOG.info("Shutdown invoked"); + Thread.sleep(10000); + if (mrrTezCluster != null) { + mrrTezCluster.stop(); + mrrTezCluster = null; + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + // To be replaced after Timeline has java APIs for domains + private <K> K getTimelineData(String url, Class<K> clazz) { + Client client = new Client(); + WebResource resource = client.resource(url); + + ClientResponse response = resource.accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + + K entity = response.getEntity(clazz); + Assert.assertNotNull(entity); + return entity; + } + + @Test (timeout=50000) + public void testSimpleAMACls() throws Exception { + TezClient tezSession = null; + ApplicationId applicationId; + try { + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + tezSession.start(); + + applicationId = tezSession.getAppMasterApplicationId(); + + DAGClient dagClient = tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + } finally { + if (tezSession != null) { + tezSession.stop(); + } + } + +// verifyEntityExistence(applicationId); + } + + @Test (timeout=50000) + public void testDAGACls() throws Exception { + TezClient tezSession = null; + ApplicationId applicationId; + try { + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + tezSession.start(); + + applicationId = tezSession.getAppMasterApplicationId(); + + DAGClient dagClient = tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + } finally { + if (tezSession != null) { + tezSession.stop(); + } + } +// verifyEntityExistence(applicationId); + } + + private void verifyEntityExistence(ApplicationId applicationId) { + Assert.assertNotNull(timelineAddress); + + String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/" + + "tez_" + applicationId.toString() + "?fields=otherinfo"; + LOG.info("Getting timeline entity for tez application: " + appUrl); + TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class); + Assert.assertNotNull(appEntity); + + TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1); + String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/" + + tezDAGID.toString() + "?fields=otherinfo"; + LOG.info("Getting timeline entity for tez dag: " + dagUrl); + TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class); + Assert.assertNotNull(dagEntity); + } + + +} diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java new file mode 100644 index 0000000..6d713c5 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -0,0 +1,850 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.common.VersionInfo; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.VertexStats; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGCommitStartedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; +import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.codehaus.jettison.json.JSONException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestHistoryEventTimelineConversion { + + private ApplicationAttemptId applicationAttemptId; + private ApplicationId applicationId; + private String user = "user"; + private Random random = new Random(); + private TezDAGID tezDAGID; + private TezVertexID tezVertexID; + private TezTaskID tezTaskID; + private TezTaskAttemptID tezTaskAttemptID; + private DAGPlan dagPlan; + private ContainerId containerId; + private NodeId nodeId; + + @Before + public void setup() { + applicationId = ApplicationId.newInstance(9999l, 1); + applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1); + tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt()); + tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt()); + tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt()); + tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt()); + dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); + containerId = ContainerId.newInstance(applicationAttemptId, 111); + nodeId = NodeId.newInstance("node", 13435); + } + + @Test(timeout = 5000) + public void testHandlerExists() throws JSONException { + for (HistoryEventType eventType : HistoryEventType.values()) { + HistoryEvent event = null; + switch (eventType) { + case APP_LAUNCHED: + event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(), + user, new Configuration(false), null); + break; + case AM_LAUNCHED: + event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(), + user); + break; + case AM_STARTED: + event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user); + break; + case DAG_SUBMITTED: + event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, + null, user, null); + break; + case DAG_INITIALIZED: + event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); + break; + case DAG_STARTED: + event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName()); + break; + case DAG_FINISHED: + event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR, + null, null, user, dagPlan.getName(), null); + break; + case VERTEX_INITIALIZED: + event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), + random.nextInt(), "proc", null); + break; + case VERTEX_STARTED: + event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); + break; + case VERTEX_PARALLELISM_UPDATED: + event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1); + break; + case VERTEX_FINISHED: + event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), + random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, + null, null, null, null); + break; + case TASK_STARTED: + event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt()); + break; + case TASK_FINISHED: + event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(), + tezTaskAttemptID, TaskState.FAILED, null, null); + break; + case TASK_ATTEMPT_STARTED: + event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, + nodeId, null, null, "nodeHttpAddress"); + break; + case TASK_ATTEMPT_FINISHED: + event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); + break; + case CONTAINER_LAUNCHED: + event = new ContainerLaunchedEvent(containerId, random.nextInt(), + applicationAttemptId); + break; + case CONTAINER_STOPPED: + event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId); + break; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + event = new VertexRecoverableEventsGeneratedEvent(); + break; + case DAG_COMMIT_STARTED: + event = new DAGCommitStartedEvent(); + break; + case VERTEX_COMMIT_STARTED: + event = new VertexCommitStartedEvent(); + break; + case VERTEX_GROUP_COMMIT_STARTED: + event = new VertexGroupCommitStartedEvent(); + break; + case VERTEX_GROUP_COMMIT_FINISHED: + event = new VertexGroupCommitFinishedEvent(); + break; + default: + Assert.fail("Unhandled event type " + eventType); + } + if (event == null || !event.isHistoryEvent()) { + continue; + } + HistoryEventTimelineConversion.convertToTimelineEntity(event); + } + } + + static class MockVersionInfo extends VersionInfo { + + MockVersionInfo() { + super("component", "1.1.0", "rev1", "20120101", "git.apache.org"); + } + + } + + @Test(timeout = 5000) + public void testConvertAppLaunchedEvent() { + long launchTime = random.nextLong(); + long submitTime = random.nextLong(); + Configuration conf = new Configuration(false); + conf.set("foo", "bar"); + conf.set("applicationId", "1234"); + + MockVersionInfo mockVersionInfo = new MockVersionInfo(); + AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime, + submitTime, user, conf, mockVersionInfo); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType()); + Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(2, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user)); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + + Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + + Assert.assertEquals(2, timelineEntity.getOtherInfo().size()); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG)); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION)); + + Map<String, String> config = + (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG); + Assert.assertEquals(conf.get("foo"), config.get("foo")); + Assert.assertEquals(conf.get("applicationId"), config.get("applicationId")); + + Map<String, String> versionInfo = + (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.TEZ_VERSION); + Assert.assertEquals(mockVersionInfo.getVersion(), + versionInfo.get(ATSConstants.VERSION)); + Assert.assertEquals(mockVersionInfo.getRevision(), + versionInfo.get(ATSConstants.REVISION)); + Assert.assertEquals(mockVersionInfo.getBuildTime(), + versionInfo.get(ATSConstants.BUILD_TIME)); + + } + + @Test(timeout = 5000) + public void testConvertAMLaunchedEvent() { + long launchTime = random.nextLong(); + long submitTime = random.nextLong(); + AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType()); + + final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities(); + Assert.assertEquals(3, relatedEntities.size()); + Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(relatedEntities.get(ATSConstants.APPLICATION_ATTEMPT_ID) + .contains(applicationAttemptId.toString())); + Assert.assertTrue(relatedEntities.get(ATSConstants.USER).contains(user)); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(2, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user)); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + + Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.AM_LAUNCHED.name(), evt.getEventType()); + Assert.assertEquals(launchTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(1, otherInfo.size()); + Assert.assertEquals(submitTime, otherInfo.get(ATSConstants.APP_SUBMIT_TIME)); + } + + @Test(timeout = 5000) + public void testConvertContainerLaunchedEvent() { + long launchTime = random.nextLong(); + ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime, + applicationAttemptId); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(2, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains( + containerId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains( + "tez_" + applicationAttemptId.toString())); + + Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationAttemptId.getApplicationId().toString())); + + Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(), + timelineEntity.getEvents().get(0).getEventType()); + Assert.assertEquals(launchTime, + timelineEntity.getEvents().get(0).getTimestamp()); + } + + @Test(timeout = 5000) + public void testConvertContainerStoppedEvent() { + long stopTime = random.nextLong(); + int exitStatus = random.nextInt(); + ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus, + applicationAttemptId); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType()); + + final Map<String, Set<String>> relatedEntities = timelineEntity.getRelatedEntities(); + Assert.assertEquals(1, relatedEntities.size()); + Assert.assertTrue(relatedEntities.get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()) + .contains("tez_" + applicationAttemptId.toString())); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(2, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.EXIT_STATUS).contains(exitStatus)); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + final TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.CONTAINER_STOPPED.name(), evt.getEventType()); + Assert.assertEquals(stopTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(2, otherInfo.size()); + Assert.assertEquals(exitStatus, otherInfo.get(ATSConstants.EXIT_STATUS)); + Assert.assertEquals(stopTime, otherInfo.get(ATSConstants.FINISH_TIME)); + } + + @Test(timeout = 5000) + public void testConvertDAGStartedEvent() { + long startTime = random.nextLong(); + String dagName = "testDagName"; + DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_STARTED.name(), evt.getEventType()); + Assert.assertEquals(startTime, evt.getTimestamp()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(3, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.USER).contains(user)); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.DAG_NAME).contains(dagName)); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(2, otherInfo.size()); + Assert.assertEquals(startTime, otherInfo.get(ATSConstants.START_TIME)); + Assert.assertEquals(DAGState.RUNNING.name(), otherInfo.get(ATSConstants.STATUS)); + } + @Test(timeout = 5000) + public void testConvertDAGSubmittedEvent() { + long submitTime = random.nextLong(); + + DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan, + applicationAttemptId, null, user, null); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(5, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name()).contains( + "tez_" + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains( + "tez_" + applicationAttemptId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ATTEMPT_ID).contains( + applicationAttemptId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains( + applicationAttemptId.getApplicationId().toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user)); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(submitTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains( + dagPlan.getName())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationAttemptId.getApplicationId().toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); + Assert.assertEquals(applicationId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); + + } + + @Test(timeout = 5000) + public void testConvertTaskAttemptFinishedEvent(){ + String vertexName = "testVertex"; + long startTime = random.nextLong(); + long finishTime = startTime + 1234; + TaskAttemptState state = TaskAttemptState + .values()[random.nextInt(TaskAttemptState.values().length)]; + TaskAttemptTerminationCause error = TaskAttemptTerminationCause + .values()[random.nextInt(TaskAttemptTerminationCause.values().length)]; + String diagnostics = "random diagnostics message"; + TezCounters counters = new TezCounters(); + + TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, + startTime, finishTime, state, error, diagnostics, counters); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(5, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) + .contains(tezDAGID.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name()) + .contains(tezVertexID.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_TASK_ID.name()) + .contains(tezTaskID.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_FINISHED.name(), evt.getEventType()); + Assert.assertEquals(finishTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(6, otherInfo.size()); + Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); + Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); + Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); + Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); + Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); + Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); + } + + @Test(timeout = 5000) + public void testConvertDAGInitializedEvent() { + long initTime = random.nextLong(); + + Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>(); + nameIdMap.put("foo", tezVertexID); + + DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName", + nameIdMap); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType()); + Assert.assertEquals(initTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName")); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey( + ATSConstants.VERTEX_NAME_ID_MAPPING)); + Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get( + ATSConstants.VERTEX_NAME_ID_MAPPING); + Assert.assertEquals(1, vIdMap.size()); + Assert.assertNotNull(vIdMap.containsKey("foo")); + Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo")); + + } + + @Test(timeout = 5000) + public void testConvertDAGFinishedEvent() { + long finishTime = random.nextLong(); + long startTime = random.nextLong(); + Map<String,Integer> taskStats = new HashMap<String, Integer>(); + taskStats.put("FOO", 100); + taskStats.put("BAR", 200); + + DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR, + "diagnostics", null, user, dagPlan.getName(), taskStats); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType()); + Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains( + DAGState.ERROR.name())); + + Assert.assertEquals(startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); + Assert.assertEquals(finishTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); + Assert.assertEquals(finishTime - startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS)); + Assert.assertEquals(DAGState.ERROR.name(), + timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); + Assert.assertEquals("diagnostics", + timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); + + Assert.assertEquals(100, + ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); + Assert.assertEquals(200, + ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); + } + + @Test(timeout = 5000) + public void testConvertVertexInitializedEvent() { + long initRequestedTime = random.nextLong(); + long initedTime = random.nextLong(); + int numTasks = random.nextInt(); + VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime, + initedTime, numTasks, "proc", null); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + + Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType()); + Assert.assertEquals(initedTime, timelineEvent.getTimestamp()); + + Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME)); + Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME)); + + Assert.assertEquals(initedTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); + Assert.assertEquals(initRequestedTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue()); + Assert.assertEquals(initedTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); + Assert.assertEquals(numTasks, + ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue()); + } + + @Test(timeout = 5000) + public void testConvertVertexFinishedEvent() { + long initRequestedTime = random.nextLong(); + long initedTime = random.nextLong(); + long startRequestedTime = random.nextLong(); + long startTime = random.nextLong(); + long finishTime = random.nextLong(); + Map<String,Integer> taskStats = new HashMap<String, Integer>(); + taskStats.put("FOO", 100); + taskStats.put("BAR", 200); + VertexStats vertexStats = new VertexStats(); + + VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime, + initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR, + "diagnostics", null, vertexStats, taskStats); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(0, timelineEntity.getRelatedEntities().size()); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains( + VertexState.ERROR.name())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType()); + Assert.assertEquals(finishTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(finishTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue()); + Assert.assertEquals(finishTime - startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue()); + Assert.assertEquals(VertexState.ERROR.name(), + timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); + Assert.assertEquals("diagnostics", + timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS)); + + Assert.assertEquals(100, + ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue()); + Assert.assertEquals(200, + ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue()); + } + + @Test(timeout = 5000) + public void testConvertTaskStartedEvent() { + long scheduleTime = random.nextLong(); + long startTime = random.nextLong(); + TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(1, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( + tezVertexID.toString())); + + Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( + tezVertexID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(startTime, timelineEvent.getTimestamp()); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME)); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); + + Assert.assertEquals(scheduleTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue()); + Assert.assertEquals(startTime, + ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue()); + Assert.assertTrue(TaskState.SCHEDULED.name() + .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); + } + + @Test(timeout = 5000) + public void testConvertTaskAttemptStartedEvent() { + long startTime = random.nextLong(); + TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", + startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); + Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); + + Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue()); + + Assert.assertEquals(3, timelineEntity.getRelatedEntities().size()); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.NODE_ID).contains(nodeId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains( + containerId.toString())); + Assert.assertTrue( + timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains( + tezTaskID.toString())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent timelineEvent = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType()); + Assert.assertEquals(startTime, timelineEvent.getTimestamp()); + + Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size()); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( + applicationId.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains( + tezDAGID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains( + tezVertexID.toString())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains( + tezTaskID.toString())); + + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME)); + Assert.assertEquals("inProgressURL", + timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL)); + Assert.assertEquals("logsURL", + timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL)); + Assert.assertEquals(nodeId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID)); + Assert.assertEquals(containerId.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID)); + Assert.assertEquals("nodeHttpAddress", + timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS)); + Assert.assertTrue(TaskAttemptState.RUNNING.name() + .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); + } + + @Test(timeout = 5000) + public void testConvertTaskFinishedEvent() { + String vertexName = "testVertexName"; + long startTime = random.nextLong(); + long finishTime = random.nextLong(); + TaskState state = TaskState.values()[random.nextInt(TaskState.values().length)]; + String diagnostics = "diagnostics message"; + TezCounters counters = new TezCounters(); + + TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime, + tezTaskAttemptID, state, diagnostics, counters); + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + + Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(4, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) + .contains(tezDAGID.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_VERTEX_ID.name()) + .contains(tezVertexID.toString())); + Assert.assertTrue(primaryFilters.get(ATSConstants.STATUS).contains(state.name())); + + Assert.assertEquals(1, timelineEntity.getEvents().size()); + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.TASK_FINISHED.name(), evt.getEventType()); + Assert.assertEquals(finishTime, evt.getTimestamp()); + + final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); + Assert.assertEquals(6, otherInfo.size()); + Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); + Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); + Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); + Assert.assertEquals(tezTaskAttemptID.toString(), + otherInfo.get(ATSConstants.SUCCESSFUL_ATTEMPT_ID)); + Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); + Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); + } + + @Test(timeout = 5000) + public void testConvertVertexParallelismUpdatedEvent() { + TezVertexID vId = tezVertexID; + Map<String, EdgeManagerPluginDescriptor> edgeMgrs = + new HashMap<String, EdgeManagerPluginDescriptor>(); + edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text")); + VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, + edgeMgrs, null, 10); + + TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); + Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType()); + Assert.assertEquals(vId.toString(), timelineEntity.getEntityId()); + Assert.assertEquals(1, timelineEntity.getEvents().size()); + + final Map<String, Set<Object>> primaryFilters = timelineEntity.getPrimaryFilters(); + Assert.assertEquals(2, primaryFilters.size()); + Assert.assertTrue(primaryFilters.get(ATSConstants.APPLICATION_ID) + .contains(applicationId.toString())); + Assert.assertTrue(primaryFilters.get(EntityTypes.TEZ_DAG_ID.name()) + .contains(tezDAGID.toString())); + + TimelineEvent evt = timelineEntity.getEvents().get(0); + Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType()); + Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS)); + Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS)); + Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS)); + + Map<String, Object> updatedEdgeMgrs = (Map<String, Object>) + evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS); + Assert.assertEquals(1, updatedEdgeMgrs.size()); + Assert.assertTrue(updatedEdgeMgrs.containsKey("a")); + Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a"); + + Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY)); + + Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)); + + } + + +}
http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java new file mode 100644 index 0000000..18ec43e --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.records.TezDAGID; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestATSHistoryLoggingService { + + private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class); + + private ATSHistoryLoggingService atsHistoryLoggingService; + private AppContext appContext; + private Configuration conf; + private int atsInvokeCounter; + private int atsEntitiesCounter; + private SystemClock clock = new SystemClock(); + + @Before + public void setup() throws Exception { + appContext = mock(AppContext.class); + atsHistoryLoggingService = new ATSHistoryLoggingService(); + atsHistoryLoggingService.setAppContext(appContext); + conf = new Configuration(false); + conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, + 1000l); + conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2); + conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + atsInvokeCounter = 0; + atsEntitiesCounter = 0; + atsHistoryLoggingService.init(conf); + atsHistoryLoggingService.timelineClient = mock(TimelineClient.class); + atsHistoryLoggingService.start(); + when(appContext.getClock()).thenReturn(clock); + when(appContext.getCurrentDAGID()).thenReturn(null); + when(atsHistoryLoggingService.timelineClient.putEntities( + Matchers.<TimelineEntity[]>anyVararg())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ++atsInvokeCounter; + atsEntitiesCounter += invocation.getArguments().length; + try { + Thread.sleep(500l); + } catch (InterruptedException e) { + // do nothing + } + return null; + } + } + ); + } + + @After + public void teardown() { + atsHistoryLoggingService.stop(); + atsHistoryLoggingService = null; + } + + @Test(timeout=20000) + public void testATSHistoryLoggingServiceShutdown() { + TezDAGID tezDAGID = TezDAGID.getInstance( + ApplicationId.newInstance(100l, 1), 1); + DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, + new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); + + for (int i = 0; i < 100; ++i) { + atsHistoryLoggingService.handle(historyEvent); + } + + try { + Thread.sleep(2500l); + } catch (InterruptedException e) { + // Do nothing + } + atsHistoryLoggingService.stop(); + + LOG.info("ATS entitiesSent=" + atsEntitiesCounter + + ", timelineInvocations=" + atsInvokeCounter); + + Assert.assertTrue(atsEntitiesCounter >= 4); + Assert.assertTrue(atsEntitiesCounter < 20); + + } + + @Test(timeout=20000) + public void testATSEventBatching() { + TezDAGID tezDAGID = TezDAGID.getInstance( + ApplicationId.newInstance(100l, 1), 1); + DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, + new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); + + for (int i = 0; i < 100; ++i) { + atsHistoryLoggingService.handle(historyEvent); + } + + try { + Thread.sleep(1000l); + } catch (InterruptedException e) { + // Do nothing + } + LOG.info("ATS entitiesSent=" + atsEntitiesCounter + + ", timelineInvocations=" + atsInvokeCounter); + + Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter); + Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6692c514/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java new file mode 100644 index 0000000..9c4f721 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; + +import java.io.IOException; +import java.util.Random; + +import javax.ws.rs.core.MediaType; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.runtime.library.processor.SleepProcessor; +import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; +import org.apache.tez.tests.MiniTezClusterWithTimeline; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +public class TestATSHistoryWithMiniCluster { + + private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class); + + protected static MiniTezClusterWithTimeline mrrTezCluster = null; + protected static MiniDFSCluster dfsCluster = null; + private static String timelineAddress; + private Random random = new Random(); + + private static Configuration conf = new Configuration(); + private static FileSystem remoteFs; + + private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir"; + + @BeforeClass + public static void setup() throws IOException { + try { + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) + .build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + + if (mrrTezCluster == null) { + try { + mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(), + 1, 1, 1, true); + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000); + mrrTezCluster.init(conf); + mrrTezCluster.start(); + } catch (Throwable e) { + LOG.info("Failed to start Mini Tez Cluster", e); + } + } + timelineAddress = mrrTezCluster.getConfig().get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS); + if (timelineAddress != null) { + // Hack to handle bug in MiniYARNCluster handling of webapp address + timelineAddress = timelineAddress.replace("0.0.0.0", "localhost"); + } + } + + @AfterClass + public static void tearDown() throws InterruptedException { + LOG.info("Shutdown invoked"); + Thread.sleep(10000); + if (mrrTezCluster != null) { + mrrTezCluster.stop(); + mrrTezCluster = null; + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + // To be replaced after Timeline has java APIs for domains + private <K> K getTimelineData(String url, Class<K> clazz) { + Client client = new Client(); + WebResource resource = client.resource(url); + + ClientResponse response = resource.accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + + K entity = response.getEntity(clazz); + Assert.assertNotNull(entity); + return entity; + } + + @Test (timeout=50000) + public void testSimpleAMACls() throws Exception { + TezClient tezSession = null; + ApplicationId applicationId; + try { + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + tezSession.start(); + + applicationId = tezSession.getAppMasterApplicationId(); + + DAGClient dagClient = tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + } finally { + if (tezSession != null) { + tezSession.stop(); + } + } + +// verifyEntityExistence(applicationId); + } + + @Test (timeout=50000) + public void testDAGACls() throws Exception { + TezClient tezSession = null; + ApplicationId applicationId; + try { + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + tezSession.start(); + + applicationId = tezSession.getAppMasterApplicationId(); + + DAGClient dagClient = tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + } finally { + if (tezSession != null) { + tezSession.stop(); + } + } +// verifyEntityExistence(applicationId); + } + + private void verifyEntityExistence(ApplicationId applicationId) { + Assert.assertNotNull(timelineAddress); + + String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/" + + "tez_" + applicationId.toString() + "?fields=otherinfo"; + LOG.info("Getting timeline entity for tez application: " + appUrl); + TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class); + Assert.assertNotNull(appEntity); + + TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1); + String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/" + + tezDAGID.toString() + "?fields=otherinfo"; + LOG.info("Getting timeline entity for tez dag: " + dagUrl); + TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class); + Assert.assertNotNull(dagEntity); + } + + +}
