Repository: tez
Updated Branches:
  refs/heads/master 871ea80c0 -> 24b872a7f


http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
new file mode 100644
index 0000000..92d3e30
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.VersionInfo;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.app.web.AMWebController;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGKillRequestEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskFailureType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestHistoryEventProtoConverter {
+  // Ids and plan objects; setup() assigns all of these before each test.
+  private ApplicationId applicationId;
+  private ApplicationAttemptId applicationAttemptId;
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  // Constant inputs reused across tests.
+  private String user = "user";
+  private String containerLogs = "containerLogs";
+
+  // Source of arbitrary ids and timestamps.
+  private Random random = new Random();
+
+  // Converter under test.
+  private HistoryEventProtoConverter converter =
+      new HistoryEventProtoConverter();
+
+  /**
+   * Creates the application, DAG, vertex, task, attempt, container and node
+   * identifiers used by the tests, along with a DAG plan named "DAGPlanMock"
+   * that carries a populated caller context.
+   */
+  @Before
+  public void setup() {
+    // Upper-case 'L' suffix: a lower-case 'l' is easily misread as '1'.
+    applicationId = ApplicationId.newInstance(9999L, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+
+    // Each id is derived from the previous one with an arbitrary int.
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID =
+        TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+
+    CallerContextProto.Builder callerContext = CallerContextProto.newBuilder()
+        .setContext("ctxt")
+        .setCallerId("Caller_ID")
+        .setCallerType("Caller_Type")
+        .setBlob("Desc_1");
+    dagPlan = DAGPlan.newBuilder()
+        .setName("DAGPlanMock")
+        .setCallerContext(callerContext)
+        .build();
+
+    containerId = ContainerId.newContainerId(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  /**
+   * Builds one event per {@link HistoryEventType} and passes every event that
+   * reports itself as a history event to the converter. The test fails for
+   * any enum constant that has no case in the switch below.
+   */
+  @Test(timeout = 5000)
+  public void testHandlerExists() {
+    for (HistoryEventType type : HistoryEventType.values()) {
+      HistoryEvent evt = null;
+      switch (type) {
+        case APP_LAUNCHED:
+          evt = new AppLaunchedEvent(
+              applicationId, random.nextInt(), random.nextInt(), user,
+              new Configuration(false), null);
+          break;
+        case AM_LAUNCHED:
+          evt = new AMLaunchedEvent(
+              applicationAttemptId, random.nextInt(), random.nextInt(), user);
+          break;
+        case AM_STARTED:
+          evt = new AMStartedEvent(
+              applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          evt = new DAGSubmittedEvent(
+              tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user, null, containerLogs, null);
+          break;
+        case DAG_INITIALIZED:
+          evt = new DAGInitializedEvent(
+              tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
+          break;
+        case DAG_STARTED:
+          evt = new DAGStartedEvent(
+              tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          evt = new DAGFinishedEvent(
+              tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName(), null,
+              applicationAttemptId, dagPlan);
+          break;
+        case VERTEX_INITIALIZED:
+          evt = new VertexInitializedEvent(
+              tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null, null, null);
+          break;
+        case VERTEX_STARTED:
+          evt = new VertexStartedEvent(
+              tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_CONFIGURE_DONE:
+          evt = new VertexConfigurationDoneEvent(
+              tezVertexID, 0L, 1, null, null, null, true);
+          break;
+        case VERTEX_FINISHED:
+          evt = new VertexFinishedEvent(
+              tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(),
+              VertexState.ERROR, null, null, null, null, null);
+          break;
+        case TASK_STARTED:
+          evt = new TaskStartedEvent(
+              tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          evt = new TaskFinishedEvent(
+              tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null, null, 0);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          evt = new TaskAttemptStartedEvent(
+              tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null, "nodeHttpAddress");
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          evt = new TaskAttemptFinishedEvent(
+              tezTaskAttemptID, "v1", random.nextInt(), random.nextInt(),
+              TaskAttemptState.FAILED, TaskFailureType.NON_FATAL,
+              TaskAttemptTerminationCause.OUTPUT_LOST,
+              null, null, null, null, 0, null, 0,
+              containerId, nodeId, null, null, "nodeHttpAddress");
+          break;
+        case CONTAINER_LAUNCHED:
+          evt = new ContainerLaunchedEvent(
+              containerId, random.nextInt(), applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          evt = new ContainerStoppedEvent(
+              containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        case DAG_COMMIT_STARTED:
+          evt = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          evt = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          evt = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          evt = new VertexGroupCommitFinishedEvent();
+          break;
+        case DAG_RECOVERED:
+          evt = new DAGRecoveredEvent(
+              applicationAttemptId, tezDAGID, dagPlan.getName(), user,
+              random.nextLong(), containerLogs);
+          break;
+        case DAG_KILL_REQUEST:
+          evt = new DAGKillRequestEvent();
+          break;
+        default:
+          Assert.fail("Unhandled event type " + type);
+      }
+      // Only events flagged as history events are handed to the converter.
+      if (evt != null && evt.isHistoryEvent()) {
+        converter.convert(evt);
+      }
+    }
+  }
+
+  // VersionInfo subclass built from fixed literal arguments, so tests can
+  // hand a version object to AppLaunchedEvent.
+  // NOTE(review): the meaning of each super(...) argument is defined by
+  // VersionInfo, which is not visible here - confirm against that class.
+  static class MockVersionInfo extends VersionInfo {
+    MockVersionInfo() {
+      super("component", "1.1.0", "rev1", "20120101", "git.apache.org");
+    }
+  }
+
+  /**
+   * Looks up an event-data entry by key.
+   *
+   * @param proto converted event whose event-data list is searched
+   * @param key key to look for
+   * @return the value of the first entry whose key equals {@code key}, or
+   *         {@code null} when no entry matches
+   */
+  private String findEventData(HistoryEventProto proto, String key) {
+    for (KVPair pair : proto.getEventDataList()) {
+      String candidate = pair.getKey();
+      if (!candidate.equals(key)) {
+        continue;
+      }
+      return pair.getValue();
+    }
+    return null;
+  }
+
+  /**
+   * Asserts that the event carries an entry for {@code key}.
+   *
+   * @param proto converted event to inspect
+   * @param key key that must be present
+   * @param value expected value; pass {@code null} to check presence only
+   */
+  private void assertEventData(HistoryEventProto proto, String key,
+      String value) {
+    String actual = findEventData(proto, key);
+    if (actual == null) {
+      Assert.fail("Cannot find kv pair: " + key);
+    }
+    if (value == null) {
+      // Caller only cares that the key exists.
+      return;
+    }
+    Assert.assertEquals(value, actual);
+  }
+
+  /**
+   * Asserts that the event carries no entry for {@code key}.
+   *
+   * @param proto converted event to inspect
+   * @param key key that must be absent from the event-data list
+   */
+  private void assertNoEventData(HistoryEventProto proto, String key) {
+    for (KVPair data : proto.getEventDataList()) {
+      if (data.getKey().equals(key)) {
+        // Message previously read "Found find kv pair"; duplicate word removed.
+        Assert.fail("Found kv pair: " + key);
+      }
+    }
+  }
+
+  /**
+   * Null-tolerant {@code toString()}.
+   *
+   * @param obj object to render, may be {@code null}
+   * @return the empty string for {@code null}, otherwise
+   *         {@code obj.toString()}
+   */
+  private String safeToString(Object obj) {
+    if (obj == null) {
+      return "";
+    }
+    return obj.toString();
+  }
+
+  /**
+   * Checks the fields shared by every converted event.
+   *
+   * @param proto converted event to inspect
+   * @param type expected event type; compared by enum name
+   * @param eventTime expected event time
+   * @param entityType most specific id expected on the proto; that id and
+   *        every coarser one are compared with this class's fixture ids.
+   *        {@code null} skips the id checks
+   * @param appAttemptId expected attempt id; {@code null} expects ""
+   * @param user expected user; {@code null} expects ""
+   * @param numData expected number of event-data entries
+   */
+  private void assertCommon(HistoryEventProto proto, HistoryEventType type,
+      long eventTime, EntityTypes entityType,
+      ApplicationAttemptId appAttemptId, String user, int numData) {
+    Assert.assertEquals(type.name(), proto.getEventType());
+    Assert.assertEquals(eventTime, proto.getEventTime());
+    // The application id is checked below, and only for a non-null
+    // entityType.
+    String expectedAttempt = safeToString(appAttemptId);
+    Assert.assertEquals(expectedAttempt, proto.getAppAttemptId());
+    String expectedUser = safeToString(user);
+    Assert.assertEquals(expectedUser, proto.getUser());
+    if (entityType != null) {
+      // Cases are ordered from most to least specific and deliberately fall
+      // through, so a finer-grained id also checks all coarser ids.
+      switch (entityType) {
+      case TEZ_TASK_ATTEMPT_ID:
+        Assert.assertEquals(tezTaskAttemptID.toString(),
+            proto.getTaskAttemptId());
+        // fall through
+      case TEZ_TASK_ID:
+        Assert.assertEquals(tezTaskID.toString(), proto.getTaskId());
+        // fall through
+      case TEZ_VERTEX_ID:
+        Assert.assertEquals(tezVertexID.toString(), proto.getVertexId());
+        // fall through
+      case TEZ_DAG_ID:
+        Assert.assertEquals(tezDAGID.toString(), proto.getDagId());
+        // fall through
+      case TEZ_APPLICATION:
+        Assert.assertEquals(applicationId.toString(), proto.getAppId());
+        break;
+      default:
+        Assert.fail("Invalid type: " + entityType.name());
+      }
+    }
+    Assert.assertEquals(numData, proto.getEventDataCount());
+  }
+
+  /**
+   * Converts an APP_LAUNCHED event and expects three data entries: the
+   * config, the Tez version and the AM web service version.
+   */
+  @Test(timeout = 5000)
+  public void testConvertAppLaunchedEvent() {
+    long launchTime = random.nextLong();
+    long submitTime = random.nextLong();
+    Configuration appConf = new Configuration(false);
+    appConf.set("foo", "bar");
+    appConf.set("applicationId", "1234");
+
+    AppLaunchedEvent launched = new AppLaunchedEvent(
+        applicationId, launchTime, submitTime, user, appConf,
+        new MockVersionInfo());
+    HistoryEventProto converted = converter.convert(launched);
+
+    assertCommon(converted, HistoryEventType.APP_LAUNCHED, launchTime,
+        EntityTypes.TEZ_APPLICATION, null, user, 3);
+    // A null expected value checks only that the key is present.
+    assertEventData(converted, ATSConstants.CONFIG, null);
+    assertEventData(converted, ATSConstants.TEZ_VERSION, null);
+    assertEventData(converted, ATSConstants.DAG_AM_WEB_SERVICE_VERSION,
+        AMWebController.VERSION);
+  }
+
+  /**
+   * Converts an AM_LAUNCHED event and expects the application submit time
+   * as its single data entry.
+   */
+  @Test(timeout = 5000)
+  public void testConvertAMLaunchedEvent() {
+    long launchTime = random.nextLong();
+    long submitTime = random.nextLong();
+
+    AMLaunchedEvent launched = new AMLaunchedEvent(
+        applicationAttemptId, launchTime, submitTime, user);
+    HistoryEventProto converted = converter.convert(launched);
+
+    assertCommon(converted, HistoryEventType.AM_LAUNCHED, launchTime,
+        EntityTypes.TEZ_APPLICATION, applicationAttemptId, user, 1);
+    assertEventData(converted, ATSConstants.APP_SUBMIT_TIME,
+        String.valueOf(submitTime));
+  }
+
+  /**
+   * Converts an AM_STARTED event and expects only the common fields, with
+   * no data entries.
+   */
+  @Test(timeout = 5000)
+  public void testConvertAMStartedEvent() {
+    long startTime = random.nextLong();
+
+    AMStartedEvent started =
+        new AMStartedEvent(applicationAttemptId, startTime, user);
+    HistoryEventProto converted = converter.convert(started);
+
+    assertCommon(converted, HistoryEventType.AM_STARTED, startTime,
+        EntityTypes.TEZ_APPLICATION, applicationAttemptId, user, 0);
+  }
+
+  /**
+   * Converts a CONTAINER_LAUNCHED event and expects the container id as its
+   * single data entry, with an empty user.
+   */
+  @Test(timeout = 5000)
+  public void testConvertContainerLaunchedEvent() {
+    long launchTime = random.nextLong();
+
+    ContainerLaunchedEvent launched = new ContainerLaunchedEvent(
+        containerId, launchTime, applicationAttemptId);
+    HistoryEventProto converted = converter.convert(launched);
+
+    assertCommon(converted, HistoryEventType.CONTAINER_LAUNCHED, launchTime,
+        EntityTypes.TEZ_APPLICATION, applicationAttemptId, null, 1);
+    assertEventData(converted, ATSConstants.CONTAINER_ID,
+        containerId.toString());
+  }
+
+  /**
+   * Converts a CONTAINER_STOPPED event and expects the container id, exit
+   * status and finish time as data entries.
+   */
+  @Test(timeout = 5000)
+  public void testConvertContainerStoppedEvent() {
+    long stopTime = random.nextLong();
+    int exitStatus = random.nextInt();
+
+    ContainerStoppedEvent stopped = new ContainerStoppedEvent(
+        containerId, stopTime, exitStatus, applicationAttemptId);
+    HistoryEventProto converted = converter.convert(stopped);
+
+    assertCommon(converted, HistoryEventType.CONTAINER_STOPPED, stopTime,
+        EntityTypes.TEZ_APPLICATION, applicationAttemptId, null, 3);
+    assertEventData(converted, ATSConstants.CONTAINER_ID,
+        containerId.toString());
+    assertEventData(converted, ATSConstants.EXIT_STATUS,
+        String.valueOf(exitStatus));
+    assertEventData(converted, ATSConstants.FINISH_TIME,
+        String.valueOf(stopTime));
+  }
+
+  /**
+   * Converts a DAG_STARTED event and expects the DAG name plus a RUNNING
+   * status as data entries.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGStartedEvent() {
+    long startTime = random.nextLong();
+    String dagName = "testDagName";
+
+    DAGStartedEvent started =
+        new DAGStartedEvent(tezDAGID, startTime, user, dagName);
+    HistoryEventProto converted = converter.convert(started);
+
+    assertCommon(converted, HistoryEventType.DAG_STARTED, startTime,
+        EntityTypes.TEZ_DAG_ID, null, user, 2);
+    assertEventData(converted, ATSConstants.DAG_NAME, dagName);
+    assertEventData(converted, ATSConstants.STATUS, DAGState.RUNNING.name());
+  }
+
+  /**
+   * Converts a DAG_SUBMITTED event and expects eight data entries covering
+   * the DAG name, queue, web service version, in-progress logs URL, caller
+   * context fields and the DAG plan.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGSubmittedEvent() {
+    long submitTime = random.nextLong();
+    final String queueName = "TEST_DAG_SUBMITTED";
+
+    DAGSubmittedEvent submitted = new DAGSubmittedEvent(
+        tezDAGID, submitTime, dagPlan, applicationAttemptId, null, user,
+        null, containerLogs, queueName);
+    HistoryEventProto converted = converter.convert(submitted);
+
+    assertCommon(converted, HistoryEventType.DAG_SUBMITTED, submitTime,
+        EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 8);
+
+    assertEventData(converted, ATSConstants.DAG_NAME, dagPlan.getName());
+    assertEventData(converted, ATSConstants.DAG_QUEUE_NAME,
+        submitted.getQueueName());
+    assertEventData(converted, ATSConstants.DAG_AM_WEB_SERVICE_VERSION,
+        AMWebController.VERSION);
+    // The logs URL key is suffixed with the attempt number.
+    String logsKey = ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+        + applicationAttemptId.getAttemptId();
+    assertEventData(converted, logsKey, containerLogs);
+    assertEventData(converted, ATSConstants.CALLER_CONTEXT_ID,
+        dagPlan.getCallerContext().getCallerId());
+    assertEventData(converted, ATSConstants.CALLER_CONTEXT_TYPE,
+        dagPlan.getCallerContext().getCallerType());
+    assertEventData(converted, ATSConstants.CALLER_CONTEXT,
+        dagPlan.getCallerContext().getContext());
+    // Presence check only.
+    assertEventData(converted, ATSConstants.DAG_PLAN, null);
+  }
+
+  /**
+   * Converts a TASK_ATTEMPT_FINISHED event with a randomly chosen state and
+   * termination cause and expects sixteen data entries. A second event built
+   * with a null failure type must not produce a TASK_FAILURE_TYPE entry.
+   */
+  @Test(timeout = 5000)
+  public void testConvertTaskAttemptFinishedEvent() {
+    String vertexName = "testVertex";
+    long creationTime = random.nextLong();
+    long startTime = creationTime + 1000;
+    long allocationTime = creationTime + 1001;
+    long finishTime = startTime + 1002;
+
+    TaskAttemptState[] states = TaskAttemptState.values();
+    TaskAttemptState state = states[random.nextInt(states.length)];
+    TaskAttemptTerminationCause[] causes =
+        TaskAttemptTerminationCause.values();
+    TaskAttemptTerminationCause error =
+        causes[random.nextInt(causes.length)];
+
+    String diagnostics = "random diagnostics message";
+    TezCounters counters = new TezCounters();
+    long lastDataEventTime = finishTime - 1;
+    List<DataEventDependencyInfo> events = Lists.newArrayList();
+    for (int i = 0; i < 2; i++) {
+      events.add(
+          new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
+    }
+
+    TaskAttemptFinishedEvent finished = new TaskAttemptFinishedEvent(
+        tezTaskAttemptID, vertexName, startTime, finishTime, state,
+        TaskFailureType.FATAL, error, diagnostics, counters, events, null,
+        creationTime, tezTaskAttemptID, allocationTime, containerId, nodeId,
+        "inProgressURL", "logsURL", "nodeHttpAddress");
+    HistoryEventProto converted = converter.convert(finished);
+    assertCommon(converted, HistoryEventType.TASK_ATTEMPT_FINISHED,
+        finishTime, EntityTypes.TEZ_DAG_ID, null, null, 16);
+
+    assertEventData(converted, ATSConstants.STATUS, state.name());
+    assertEventData(converted, ATSConstants.CREATION_CAUSAL_ATTEMPT,
+        tezTaskAttemptID.toString());
+    assertEventData(converted, ATSConstants.CREATION_TIME,
+        String.valueOf(creationTime));
+    assertEventData(converted, ATSConstants.ALLOCATION_TIME,
+        String.valueOf(allocationTime));
+    assertEventData(converted, ATSConstants.START_TIME,
+        String.valueOf(startTime));
+    assertEventData(converted, ATSConstants.TIME_TAKEN,
+        String.valueOf(finishTime - startTime));
+    assertEventData(converted, ATSConstants.TASK_FAILURE_TYPE,
+        TaskFailureType.FATAL.name());
+    assertEventData(converted, ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
+        error.name());
+    assertEventData(converted, ATSConstants.DIAGNOSTICS, diagnostics);
+    // Presence checks only for the two entries below.
+    assertEventData(converted, ATSConstants.LAST_DATA_EVENTS, null);
+    assertEventData(converted, ATSConstants.COUNTERS, null);
+    assertEventData(converted, ATSConstants.IN_PROGRESS_LOGS_URL,
+        "inProgressURL");
+    assertEventData(converted, ATSConstants.COMPLETED_LOGS_URL, "logsURL");
+    assertEventData(converted, ATSConstants.NODE_ID, nodeId.toString());
+    assertEventData(converted, ATSConstants.CONTAINER_ID,
+        containerId.toString());
+    assertEventData(converted, ATSConstants.NODE_HTTP_ADDRESS,
+        "nodeHttpAddress");
+
+    // Same event, but without a failure type.
+    TaskAttemptFinishedEvent withoutFailureType = new TaskAttemptFinishedEvent(
+        tezTaskAttemptID, vertexName, startTime, finishTime, state, null,
+        error, diagnostics, counters, events, null, creationTime,
+        tezTaskAttemptID, allocationTime, containerId, nodeId,
+        "inProgressURL", "logsURL", "nodeHttpAddress");
+    converted = converter.convert(withoutFailureType);
+    assertNoEventData(converted, ATSConstants.TASK_FAILURE_TYPE);
+  }
+
+  /**
+   * Converts a DAG_INITIALIZED event and expects the DAG name and the
+   * vertex name-to-id mapping as data entries.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGInitializedEvent() {
+    long initTime = random.nextLong();
+    Map<String, TezVertexID> vertexIdsByName =
+        new HashMap<String, TezVertexID>();
+    vertexIdsByName.put("foo", tezVertexID);
+
+    DAGInitializedEvent initialized = new DAGInitializedEvent(
+        tezDAGID, initTime, "user", "dagName", vertexIdsByName);
+    HistoryEventProto converted = converter.convert(initialized);
+
+    assertCommon(converted, HistoryEventType.DAG_INITIALIZED, initTime,
+        EntityTypes.TEZ_DAG_ID, null, user, 2);
+    assertEventData(converted, ATSConstants.DAG_NAME, "dagName");
+    // Presence check only.
+    assertEventData(converted, ATSConstants.VERTEX_NAME_ID_MAPPING, null);
+  }
+
+  /**
+   * Converts a DAG_FINISHED event in the ERROR state and expects eleven data
+   * entries, including one entry per task-stats key.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGFinishedEvent() {
+    long finishTime = random.nextLong();
+    long startTime = random.nextLong();
+    Map<String, Integer> stats = new HashMap<String, Integer>();
+    stats.put("FOO", 100);
+    stats.put("BAR", 200);
+
+    DAGFinishedEvent finished = new DAGFinishedEvent(
+        tezDAGID, startTime, finishTime, DAGState.ERROR, "diagnostics", null,
+        user, dagPlan.getName(), stats, applicationAttemptId, dagPlan);
+    HistoryEventProto converted = converter.convert(finished);
+    assertCommon(converted, HistoryEventType.DAG_FINISHED, finishTime,
+        EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 11);
+
+    assertEventData(converted, ATSConstants.DAG_NAME, dagPlan.getName());
+    assertEventData(converted, ATSConstants.STATUS, DAGState.ERROR.name());
+    assertEventData(converted, ATSConstants.CALLER_CONTEXT_ID,
+        dagPlan.getCallerContext().getCallerId());
+    assertEventData(converted, ATSConstants.CALLER_CONTEXT_TYPE,
+        dagPlan.getCallerContext().getCallerType());
+    assertEventData(converted, ATSConstants.START_TIME,
+        String.valueOf(startTime));
+    assertEventData(converted, ATSConstants.TIME_TAKEN,
+        String.valueOf(finishTime - startTime));
+    assertEventData(converted, ATSConstants.DIAGNOSTICS, "diagnostics");
+    assertEventData(converted,
+        ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
+        applicationAttemptId.toString());
+    // Task stats are written under their own keys.
+    assertEventData(converted, "FOO", String.valueOf(100));
+    assertEventData(converted, "BAR", String.valueOf(200));
+    // Presence check only.
+    assertEventData(converted, ATSConstants.COUNTERS, null);
+  }
+
+  /**
+   * Converts a VERTEX_INITIALIZED event and expects five data entries: the
+   * vertex name, processor class, init-requested time, task count and the
+   * service plugin info.
+   */
+  @Test(timeout = 5000)
+  public void testConvertVertexInitializedEvent() {
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    int numTasks = random.nextInt();
+    ServicePluginInfo pluginInfo = new ServicePluginInfo()
+        .setContainerLauncherName("abc")
+        .setTaskSchedulerName("def")
+        .setTaskCommunicatorName("ghi")
+        .setContainerLauncherClassName("abc1")
+        .setTaskSchedulerClassName("def1")
+        .setTaskCommunicatorClassName("ghi1");
+
+    VertexInitializedEvent initialized = new VertexInitializedEvent(
+        tezVertexID, "v1", initRequestedTime, initedTime, numTasks, "proc",
+        null, null, pluginInfo);
+    HistoryEventProto converted = converter.convert(initialized);
+    assertCommon(converted, HistoryEventType.VERTEX_INITIALIZED, initedTime,
+        EntityTypes.TEZ_VERTEX_ID, null, null, 5);
+
+    assertEventData(converted, ATSConstants.VERTEX_NAME, "v1");
+    assertEventData(converted, ATSConstants.PROCESSOR_CLASS_NAME, "proc");
+    assertEventData(converted, ATSConstants.INIT_REQUESTED_TIME,
+        String.valueOf(initRequestedTime));
+    assertEventData(converted, ATSConstants.NUM_TASKS,
+        String.valueOf(numTasks));
+    assertEventData(converted, ATSConstants.SERVICE_PLUGIN, null);
+
+    // Only the presence of the SERVICE_PLUGIN entry is asserted above.
+    // TODO: decode the entry and assert that CONTAINER_LAUNCHER_NAME,
+    // TASK_SCHEDULER_NAME and TASK_COMMUNICATOR_NAME are "abc", "def" and
+    // "ghi", and that the matching *_CLASS_NAME keys are "abc1", "def1" and
+    // "ghi1". This replaces commented-out assertions that referenced a
+    // timelineEntity variable which does not exist in this test.
+  }
+
+  /**
+   * Converts a VERTEX_STARTED event and expects the start-requested time
+   * plus a RUNNING status as data entries.
+   */
+  @Test(timeout = 5000)
+  public void testConvertVertexStartedEvent() {
+    long startRequestedTime = random.nextLong();
+    long startTime = random.nextLong();
+
+    VertexStartedEvent started =
+        new VertexStartedEvent(tezVertexID, startRequestedTime, startTime);
+    HistoryEventProto converted = converter.convert(started);
+
+    assertCommon(converted, HistoryEventType.VERTEX_STARTED, startTime,
+        EntityTypes.TEZ_VERTEX_ID, null, null, 2);
+    assertEventData(converted, ATSConstants.START_REQUESTED_TIME,
+        String.valueOf(startRequestedTime));
+    assertEventData(converted, ATSConstants.STATUS,
+        VertexState.RUNNING.name());
+  }
+
+  /**
+   * Converts a VERTEX_FINISHED event in the ERROR state and expects nine
+   * data entries, including one entry per task-stats key and the service
+   * plugin info.
+   */
+  @Test(timeout = 5000)
+  public void testConvertVertexFinishedEvent() {
+    String vertexName = "v1";
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    long startRequestedTime = random.nextLong();
+    long startTime = random.nextLong();
+    long finishTime = random.nextLong();
+    Map<String, Integer> stats = new HashMap<String, Integer>();
+    stats.put("FOO", 100);
+    stats.put("BAR", 200);
+    VertexStats vertexStats = new VertexStats();
+    ServicePluginInfo pluginInfo = new ServicePluginInfo()
+        .setContainerLauncherName("abc")
+        .setTaskSchedulerName("def")
+        .setTaskCommunicatorName("ghi")
+        .setContainerLauncherClassName("abc1")
+        .setTaskSchedulerClassName("def1")
+        .setTaskCommunicatorClassName("ghi1");
+
+    VertexFinishedEvent finished = new VertexFinishedEvent(
+        tezVertexID, vertexName, 1, initRequestedTime, initedTime,
+        startRequestedTime, startTime, finishTime, VertexState.ERROR,
+        "diagnostics", null, vertexStats, stats, pluginInfo);
+    HistoryEventProto converted = converter.convert(finished);
+    assertCommon(converted, HistoryEventType.VERTEX_FINISHED, finishTime,
+        EntityTypes.TEZ_VERTEX_ID, null, null, 9);
+
+    assertEventData(converted, ATSConstants.VERTEX_NAME, vertexName);
+    assertEventData(converted, ATSConstants.STATUS,
+        VertexState.ERROR.name());
+
+    assertEventData(converted, ATSConstants.TIME_TAKEN,
+        String.valueOf(finishTime - startTime));
+    assertEventData(converted, ATSConstants.DIAGNOSTICS, "diagnostics");
+    // Presence checks only for the two entries below.
+    assertEventData(converted, ATSConstants.COUNTERS, null);
+    assertEventData(converted, ATSConstants.STATS, null);
+    assertEventData(converted, "FOO", "100");
+    assertEventData(converted, "BAR", "200");
+
+    assertEventData(converted, ATSConstants.SERVICE_PLUGIN, null);
+    // Only the presence of the SERVICE_PLUGIN entry is asserted above.
+    // TODO: decode the entry and assert that CONTAINER_LAUNCHER_NAME,
+    // TASK_SCHEDULER_NAME and TASK_COMMUNICATOR_NAME are "abc", "def" and
+    // "ghi", and that the matching *_CLASS_NAME keys are "abc1", "def1" and
+    // "ghi1". This replaces commented-out assertions that referenced a
+    // timelineEntity variable which does not exist in this test.
+  }
+
+  /**
+   * Converts a TASK_STARTED event and expects the scheduled time plus a
+   * SCHEDULED status as data entries.
+   */
+  @Test(timeout = 5000)
+  public void testConvertTaskStartedEvent() {
+    long scheduleTime = random.nextLong();
+    long startTime = random.nextLong();
+
+    TaskStartedEvent started =
+        new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
+    HistoryEventProto converted = converter.convert(started);
+    assertCommon(converted, HistoryEventType.TASK_STARTED, startTime,
+        EntityTypes.TEZ_TASK_ID, null, null, 2);
+
+    assertEventData(converted, ATSConstants.SCHEDULED_TIME,
+        String.valueOf(scheduleTime));
+    assertEventData(converted, ATSConstants.STATUS,
+        TaskState.SCHEDULED.name());
+  }
+
+  /**
+   * Converts a TASK_ATTEMPT_STARTED event and expects six data entries: a
+   * RUNNING status, both log URLs, the node id, the container id and the
+   * node HTTP address.
+   */
+  @Test(timeout = 5000)
+  public void testConvertTaskAttemptStartedEvent() {
+    long startTime = random.nextLong();
+
+    TaskAttemptStartedEvent started = new TaskAttemptStartedEvent(
+        tezTaskAttemptID, "v1", startTime, containerId, nodeId,
+        "inProgressURL", "logsURL", "nodeHttpAddress");
+    HistoryEventProto converted = converter.convert(started);
+    assertCommon(converted, HistoryEventType.TASK_ATTEMPT_STARTED, startTime,
+        EntityTypes.TEZ_TASK_ATTEMPT_ID, null, null, 6);
+
+    assertEventData(converted, ATSConstants.STATUS,
+        TaskAttemptState.RUNNING.name());
+    assertEventData(converted, ATSConstants.IN_PROGRESS_LOGS_URL,
+        "inProgressURL");
+    assertEventData(converted, ATSConstants.COMPLETED_LOGS_URL, "logsURL");
+    assertEventData(converted, ATSConstants.NODE_ID, nodeId.toString());
+    assertEventData(converted, ATSConstants.CONTAINER_ID,
+        containerId.toString());
+    assertEventData(converted, ATSConstants.NODE_HTTP_ADDRESS,
+        "nodeHttpAddress");
+  }
+
+  /**
+   * Converts a TASK_FINISHED event with a randomly chosen task state and
+   * expects six data entries: status, time taken, successful attempt id,
+   * failed attempt count, diagnostics and counters.
+   */
+  @Test(timeout = 5000)
+  public void testConvertTaskFinishedEvent() {
+    String vertexName = "testVertexName";
+    long startTime = random.nextLong();
+    long finishTime = random.nextLong();
+    TaskState[] states = TaskState.values();
+    TaskState state = states[random.nextInt(states.length)];
+    String diagnostics = "diagnostics message";
+    TezCounters counters = new TezCounters();
+
+    TaskFinishedEvent finished = new TaskFinishedEvent(
+        tezTaskID, vertexName, startTime, finishTime, tezTaskAttemptID,
+        state, diagnostics, counters, 3);
+    HistoryEventProto converted = converter.convert(finished);
+    assertCommon(converted, HistoryEventType.TASK_FINISHED, finishTime,
+        EntityTypes.TEZ_TASK_ID, null, null, 6);
+
+    assertEventData(converted, ATSConstants.STATUS, state.name());
+    assertEventData(converted, ATSConstants.TIME_TAKEN,
+        String.valueOf(finishTime - startTime));
+    assertEventData(converted, ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+        tezTaskAttemptID.toString());
+    assertEventData(converted, ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, "3");
+    assertEventData(converted, ATSConstants.DIAGNOSTICS, diagnostics);
+    // Presence check only.
+    assertEventData(converted, ATSConstants.COUNTERS, null);
+  }
+
+  /**
+   * Converts a {@link VertexConfigurationDoneEvent} carrying a single custom edge manager
+   * and verifies the task count and the updated-edge-managers entry of the event data.
+   */
+  @Test(timeout = 5000)
+  public void testConvertVertexReconfigreDoneEvent() {
+    EdgeProperty customEdge = EdgeProperty.create(
+        EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"),
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("Out"), InputDescriptor.create("In"));
+    Map<String, EdgeProperty> edgesByName = new HashMap<>();
+    edgesByName.put("a", customEdge);
+
+    VertexConfigurationDoneEvent configured =
+        new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, edgesByName, null, true);
+    HistoryEventProto converted = converter.convert(configured);
+
+    assertCommon(converted, HistoryEventType.VERTEX_CONFIGURE_DONE, 0L,
+        EntityTypes.TEZ_VERTEX_ID, null, null, 2);
+    assertEventData(converted, ATSConstants.NUM_TASKS, "1");
+    assertEventData(converted, ATSConstants.UPDATED_EDGE_MANAGERS, null);
+
+    // The per-edge assertions below are disabled; they reference an `evt` object that
+    // this test does not create. Kept for reference.
+    /*
+    Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
+        evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
+    Assert.assertEquals(1, updatedEdgeMgrs.size());
+    Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
+    Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
+
+    Assert.assertEquals(DataMovementType.CUSTOM.name(),
+        updatedEdgeMgr.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
+    Assert.assertEquals("In", updatedEdgeMgr.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
+    Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
+    */
+  }
+
+  /**
+   * Converts a {@link DAGRecoveredEvent} created without a recovery failure and verifies
+   * the per-attempt in-progress logs URL and the DAG name in the event data.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGRecoveredEvent() {
+    final long recoveredAt = random.nextLong();
+    final String dagName = dagPlan.getName();
+
+    DAGRecoveredEvent recovered = new DAGRecoveredEvent(
+        applicationAttemptId, tezDAGID, dagName, user, recoveredAt, containerLogs);
+    HistoryEventProto converted = converter.convert(recovered);
+
+    assertCommon(converted, HistoryEventType.DAG_RECOVERED, recoveredAt,
+        EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 2);
+
+    // The logs URL key is suffixed with the application attempt number.
+    final String logsKey =
+        ATSConstants.IN_PROGRESS_LOGS_URL + "_" + applicationAttemptId.getAttemptId();
+    assertEventData(converted, logsKey, containerLogs);
+    assertEventData(converted, ATSConstants.DAG_NAME, dagPlan.getName());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGRecoveredEvent2() {
+    long recoverTime = random.nextLong();
+
+    DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, 
tezDAGID,
+        dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason", 
containerLogs);
+    HistoryEventProto proto = converter.convert(event);
+    assertCommon(proto, HistoryEventType.DAG_RECOVERED, recoverTime,
+        EntityTypes.TEZ_DAG_ID, applicationAttemptId, user, 4);
+    assertEventData(proto, ATSConstants.DAG_STATE, DAGState.ERROR.name());
+    assertEventData(proto, ATSConstants.RECOVERY_FAILURE_REASON, "mock 
reason");
+    assertEventData(proto, ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+        + applicationAttemptId.getAttemptId(), containerLogs);
+    assertEventData(proto, ATSConstants.DAG_NAME, dagPlan.getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
 
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
new file mode 100644
index 0000000..4bd5d4e
--- /dev/null
+++ 
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.VersionInfo;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Exercises {@link ProtoHistoryLoggingService} end to end: history events are handed to
+ * the service, then the app, DAG and manifest files it produced are read back through
+ * {@link TezProtoLoggers} and compared with the independently converted events.
+ */
+public class TestProtoHistoryLoggingService {
+  private static final ApplicationId appId = ApplicationId.newInstance(1000L, 1);
+  private static final ApplicationAttemptId attemptId =
+      ApplicationAttemptId.newInstance(appId, 1);
+  private static final String user = "TEST_USER";
+
+  /** Clock shared by the service under test and the loggers used for verification. */
+  private Clock clock;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  /**
+   * Logs a sequence of history events and verifies the DAG events file, the app events
+   * file, the manifest entry (including the offsets it records) and the manifest scanner.
+   *
+   * @throws Exception if the service or any reader fails
+   */
+  @Test
+  public void testService() throws Exception {
+    ProtoHistoryLoggingService service = createService();
+    service.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 0);
+    List<HistoryEventProto> protos = new ArrayList<>();
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) {
+      protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
+      service.handle(event);
+    }
+    service.stop();
+
+    TezProtoLoggers loggers = new TezProtoLoggers();
+    Assert.assertTrue(loggers.setup(service.getConfig(), clock));
+
+    // Verify dag events are logged. protos.get(0) is the app launched event, which is
+    // compared against the app events file below, so the comparison starts at index 1.
+    DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
+    Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString());
+    ProtoMessageReader<HistoryEventProto> dagReader = dagLogger.getReader(dagFilePath);
+    try {
+      int ind = 1;
+      HistoryEventProto evt = dagReader.readEvent();
+      while (evt != null) {
+        Assert.assertEquals(protos.get(ind), evt);
+        ind++;
+        evt = dagReader.readEvent();
+      }
+      // Guard against a vacuous pass when the file holds fewer events than were logged.
+      Assert.assertEquals(protos.size(), ind);
+    } finally {
+      dagReader.close();
+    }
+
+    // Verify app events are logged.
+    DatePartitionedLogger<HistoryEventProto> appLogger = loggers.getAppEventsLogger();
+    Path appFilePath = appLogger.getPathForDate(LocalDate.ofEpochDay(0), attemptId.toString());
+    ProtoMessageReader<HistoryEventProto> appReader = appLogger.getReader(appFilePath);
+    long appOffset;
+    try {
+      appOffset = appReader.getOffset();
+      Assert.assertEquals(protos.get(0), appReader.readEvent());
+    } finally {
+      // Close the app reader itself; the dag reader was already closed above.
+      appReader.close();
+    }
+
+    // Verify manifest events are logged.
+    DatePartitionedLogger<ManifestEntryProto> manifestLogger =
+        loggers.getManifestEventsLogger();
+    Path manifestFilePath = manifestLogger.getPathForDate(
+        LocalDate.ofEpochDay(0), attemptId.toString());
+    ProtoMessageReader<ManifestEntryProto> manifestReader =
+        manifestLogger.getReader(manifestFilePath);
+    ManifestEntryProto manifest;
+    try {
+      manifest = manifestReader.readEvent();
+    } finally {
+      manifestReader.close();
+    }
+    Assert.assertNotNull(manifest);
+    Assert.assertEquals(appId.toString(), manifest.getAppId());
+    Assert.assertEquals(dagId.toString(), manifest.getDagId());
+    Assert.assertEquals(dagFilePath.toString(), manifest.getDagFilePath());
+    Assert.assertEquals(appFilePath.toString(), manifest.getAppFilePath());
+    Assert.assertEquals(appOffset, manifest.getAppLaunchedEventOffset());
+
+    // Verify offsets in manifest logger: seeking to each recorded offset must land on
+    // the matching event.
+    ProtoMessageReader<HistoryEventProto> offsetReader =
+        dagLogger.getReader(new Path(manifest.getDagFilePath()));
+    try {
+      offsetReader.setOffset(manifest.getDagSubmittedEventOffset());
+      HistoryEventProto submitted = offsetReader.readEvent();
+      Assert.assertNotNull(submitted);
+      Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), submitted.getEventType());
+
+      offsetReader.setOffset(manifest.getDagFinishedEventOffset());
+      HistoryEventProto finished = offsetReader.readEvent();
+      Assert.assertNotNull(finished);
+      Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), finished.getEventType());
+    } finally {
+      offsetReader.close();
+    }
+
+    // Verify manifest file scanner.
+    DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
+    try {
+      Assert.assertEquals(manifest, scanner.getNext());
+      Assert.assertNull(scanner.getNext());
+    } finally {
+      scanner.close();
+    }
+  }
+
+  /**
+   * Builds the events fed to the service: one app launched event (no DAG id) followed by
+   * DAG submitted, vertex started, task started, task attempt started and DAG finished
+   * events for {@code dagId}.
+   *
+   * @param dagId the DAG the DAG-scoped events belong to
+   * @param service the service whose configuration is copied into the events
+   * @return the events in the order they should be handled
+   */
+  private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+      ProtoHistoryLoggingService service) {
+    List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+    DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+
+    long time = System.currentTimeMillis();
+    Configuration conf = new Configuration(service.getConfig());
+    historyEvents.add(new DAGHistoryEvent(null, new AppLaunchedEvent(appId, time, time,
+        user, conf,
+        new VersionInfo("component", "1.1.0", "rev1", "20120101", "git.apache.org") {})));
+    historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
+        DAGPlan.getDefaultInstance(), attemptId, null, user, conf, null, "default")));
+    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+    historyEvents.add(
+        new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
+    TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+    historyEvents.add(
+        new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, "test", time, time)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+            ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765),
+            null, null, null)));
+    historyEvents.add(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, time, time,
+        DAGState.ERROR, "diagnostics", null, user, dagPlan.getName(),
+        new HashMap<String, Integer>(), attemptId, dagPlan)));
+    return historyEvents;
+  }
+
+  /**
+   * Clock shifted by a constant offset so that it reads {@code startTime} at construction
+   * and then advances together with the system clock.
+   */
+  private static class FixedClock implements Clock {
+    final Clock clock = new SystemClock();
+    final long diff;
+
+    public FixedClock(long startTime) {
+      diff = clock.getTime() - startTime;
+    }
+
+    @Override
+    public long getTime() {
+      return clock.getTime() - diff;
+    }
+  }
+
+  /**
+   * Creates and initialises the service under test with a mocked {@link AppContext} and
+   * a fresh temporary folder as the proto logging base directory.
+   *
+   * @return the initialised (not yet started) service
+   * @throws IOException if the temporary folder cannot be created
+   */
+  private ProtoHistoryLoggingService createService() throws IOException {
+    ProtoHistoryLoggingService service = new ProtoHistoryLoggingService();
+    // Start time is always first day, easier to write tests.
+    clock = new FixedClock(0);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getApplicationAttemptId()).thenReturn(attemptId);
+    when(appContext.getUser()).thenReturn(user);
+    when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
+    when(appContext.getClock()).thenReturn(clock);
+    service.setAppContext(appContext);
+    Configuration conf = new Configuration(false);
+    String basePath = tempFolder.newFolder().getAbsolutePath();
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath);
+    service.init(conf);
+    return service;
+  }
+}

Reply via email to