http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index e17a4d4..95ff0cd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -64,7 +64,10 @@ public class HistoryEventHandler extends CompositeService { addService(historyLoggingService); if (recoveryEnabled) { - recoveryService = new RecoveryService(context); + String recoveryServiceClass = conf.get(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS, + TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT); + recoveryService = ReflectionUtils.createClazzInstance(recoveryServiceClass, + new Class[]{AppContext.class}, new Object[] {context}); addService(recoveryService); } super.serviceInit(conf);
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index d791d9e..4e56e9f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -29,7 +29,7 @@ public enum HistoryEventType { DAG_KILL_REQUEST, VERTEX_INITIALIZED, VERTEX_STARTED, - VERTEX_PARALLELISM_UPDATED, + VERTEX_CONFIGURE_DONE, VERTEX_FINISHED, TASK_STARTED, TASK_FINISHED, @@ -37,7 +37,6 @@ public enum HistoryEventType { TASK_ATTEMPT_FINISHED, CONTAINER_LAUNCHED, CONTAINER_STOPPED, - VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, DAG_COMMIT_STARTED, VERTEX_COMMIT_STARTED, VERTEX_GROUP_COMMIT_STARTED, http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java b/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java new file mode 100644 index 0000000..bab713d --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.history; + +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.recovery.records.RecoveryProtos; +import org.apache.tez.runtime.api.impl.EventMetaData; + +public class RecoveryConverters { + + +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 7d83db2..21b8719 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -34,10 +34,13 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.utils.TezEventUtils; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto; +import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto; +import 
org.apache.tez.runtime.api.impl.TezEvent; public class TaskAttemptFinishedEvent implements HistoryEvent { @@ -55,6 +58,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private TezCounters tezCounters; private TaskAttemptTerminationCause error; private List<DataEventDependencyInfo> dataEvents; + private List<TezEvent> taGeneratedEvents; public TaskAttemptFinishedEvent(TezTaskAttemptID taId, String vertexName, @@ -63,7 +67,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { TaskAttemptState state, TaskAttemptTerminationCause error, String diagnostics, TezCounters counters, - List<DataEventDependencyInfo> dataEvents, + List<DataEventDependencyInfo> dataEvents, + List<TezEvent> taGeneratedEvents, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) { @@ -79,6 +84,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.tezCounters = counters; this.error = error; this.dataEvents = dataEvents; + this.taGeneratedEvents = taGeneratedEvents; } public TaskAttemptFinishedEvent() { @@ -103,7 +109,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { return dataEvents; } - public TaskAttemptFinishedProto toProto() { + public TaskAttemptFinishedProto toProto() throws IOException { TaskAttemptFinishedProto.Builder builder = TaskAttemptFinishedProto.newBuilder(); builder.setTaskAttemptId(taskAttemptId.toString()) @@ -129,10 +135,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { builder.addDataEvents(DataEventDependencyInfo.toProto(info)); } } + if (taGeneratedEvents != null && !taGeneratedEvents.isEmpty()) { + for (TezEvent event : taGeneratedEvents) { + builder.addTaGeneratedEvents(TezEventUtils.toProto(event)); + } + } return builder.build(); } - public void fromProto(TaskAttemptFinishedProto proto) { + public void fromProto(TaskAttemptFinishedProto proto) throws IOException { this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); this.state = 
TaskAttemptState.values()[proto.getState()]; this.creationTime = proto.getCreationTime(); @@ -158,6 +169,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent)); } } + if (proto.getTaGeneratedEventsCount() > 0) { + this.taGeneratedEvents = Lists.newArrayListWithCapacity(proto.getTaGeneratedEventsCount()); + for (TezEventProto eventProto : proto.getTaGeneratedEventsList()) { + this.taGeneratedEvents.add(TezEventUtils.fromProto(eventProto)); + } + } } @Override @@ -236,4 +253,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { return creationCausalTA; } + public List<TezEvent> getTAGeneratedEvents() { + return taGeneratedEvents; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java new file mode 100644 index 0000000..4ad1c63 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.history.events; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto; +import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto; +import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexConfigurationDoneProto; +import org.apache.tez.runtime.api.InputSpecUpdate; + +import com.google.common.collect.Maps; + +public class VertexConfigurationDoneEvent implements HistoryEvent { + + private TezVertexID vertexID; + private long reconfigureDoneTime; + private int numTasks; + private VertexLocationHint vertexLocationHint; + private Map<String, EdgeProperty> sourceEdgeProperties; + private Map<String, InputSpecUpdate> rootInputSpecUpdates; + private boolean setParallelismCalledFlag; + + public VertexConfigurationDoneEvent() { + } + + public VertexConfigurationDoneEvent(TezVertexID vertexID, + long reconfigureDoneTime, int numTasks, + VertexLocationHint vertexLocationHint, + Map<String, EdgeProperty> sourceEdgeProperties, + Map<String, InputSpecUpdate> rootInputSpecUpdates, + boolean 
setParallelismCalledFlag) { + this.vertexID = vertexID; + this.reconfigureDoneTime = reconfigureDoneTime; /* was never stored: field stayed 0 and toProto() wrote 0 */ + this.numTasks = numTasks; + this.vertexLocationHint = vertexLocationHint; + this.sourceEdgeProperties = sourceEdgeProperties; + this.rootInputSpecUpdates = rootInputSpecUpdates; + this.setParallelismCalledFlag = setParallelismCalledFlag; + } + + @Override + public HistoryEventType getEventType() { + return HistoryEventType.VERTEX_CONFIGURE_DONE; + } + + @Override + public boolean isRecoveryEvent() { + return true; + } + + @Override + public boolean isHistoryEvent() { + return true; + } + + public VertexConfigurationDoneProto toProto() { + VertexConfigurationDoneProto.Builder builder = + VertexConfigurationDoneProto.newBuilder(); + builder.setVertexId(vertexID.toString()) + .setReconfigureDoneTime(reconfigureDoneTime) + .setSetParallelismCalledFlag(setParallelismCalledFlag) + .setNumTasks(numTasks); + + if (vertexLocationHint != null) { + builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto( + this.vertexLocationHint)); + } + if (sourceEdgeProperties != null) { + for (Entry<String, EdgeProperty> entry : + sourceEdgeProperties.entrySet()) { + EdgeManagerDescriptorProto.Builder edgeMgrBuilder = + EdgeManagerDescriptorProto.newBuilder(); + edgeMgrBuilder.setEdgeName(entry.getKey()); + edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue())); + builder.addEdgeManagerDescriptors(edgeMgrBuilder.build()); + } + } + if (rootInputSpecUpdates != null) { + for (Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) { + RootInputSpecUpdateProto.Builder rootInputSpecUpdateBuilder = RootInputSpecUpdateProto + .newBuilder(); + rootInputSpecUpdateBuilder.setInputName(entry.getKey()); + rootInputSpecUpdateBuilder.setForAllWorkUnits(entry.getValue().isForAllWorkUnits()); + rootInputSpecUpdateBuilder.addAllNumPhysicalInputs(entry.getValue() + .getAllNumPhysicalInputs()); + builder.addRootInputSpecUpdates(rootInputSpecUpdateBuilder.build()); + } +
} + return builder.build(); + } + + public void fromProto(VertexConfigurationDoneProto proto) { + this.vertexID = TezVertexID.fromString(proto.getVertexId()); + this.reconfigureDoneTime = proto.getReconfigureDoneTime(); + this.setParallelismCalledFlag = proto.getSetParallelismCalledFlag(); + this.numTasks = proto.getNumTasks(); + if (proto.hasVertexLocationHint()) { + this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto( + proto.getVertexLocationHint()); + } + if (proto.getEdgeManagerDescriptorsCount() > 0) { + this.sourceEdgeProperties = new HashMap<String, EdgeProperty>( + proto.getEdgeManagerDescriptorsCount()); + for (EdgeManagerDescriptorProto edgeManagerProto : + proto.getEdgeManagerDescriptorsList()) { + EdgeProperty edgeProperty = + DagTypeConverters.convertFromProto( + edgeManagerProto.getEdgeProperty()); + sourceEdgeProperties.put(edgeManagerProto.getEdgeName(), + edgeProperty); + } + } + if (proto.getRootInputSpecUpdatesCount() > 0) { + this.rootInputSpecUpdates = Maps.newHashMap(); + for (RootInputSpecUpdateProto rootInputSpecUpdateProto : proto.getRootInputSpecUpdatesList()) { + InputSpecUpdate specUpdate; + if (rootInputSpecUpdateProto.getForAllWorkUnits()) { + specUpdate = InputSpecUpdate + .createAllTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputs(0)); + } else { + specUpdate = InputSpecUpdate + .createPerTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputsList()); + } + this.rootInputSpecUpdates.put(rootInputSpecUpdateProto.getInputName(), specUpdate); + } + } + } + + @Override + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); + } + + @Override + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexConfigurationDoneProto proto = VertexConfigurationDoneProto.parseDelimitedFrom(inputStream); + if (proto == null) { + throw new IOException("No data found in stream"); + } + fromProto(proto); + } + + 
@Override + public String toString() { + return "vertexId=" + vertexID + + ", reconfigureDoneTime=" + reconfigureDoneTime + + ", numTasks=" + numTasks + + ", vertexLocationHint=" + + (vertexLocationHint == null? "null" : vertexLocationHint) + + ", edgeManagersCount=" + + (sourceEdgeProperties == null? "null" : sourceEdgeProperties.size()) + + ", rootInputSpecUpdateCount=" + + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size()) + + ", setParallelismCalledFlag=" + setParallelismCalledFlag; + } + + public TezVertexID getVertexID() { + return this.vertexID; + } + + public int getNumTasks() { + return numTasks; + } + + public VertexLocationHint getVertexLocationHint() { + return vertexLocationHint; + } + + public Map<String, EdgeProperty> getSourceEdgeProperties() { + return sourceEdgeProperties; + } + + public Map<String, InputSpecUpdate> getRootInputSpecUpdates() { + return rootInputSpecUpdates; + } + + public long getReconfigureDoneTime() { + return reconfigureDoneTime; + } + + public boolean isSetParallelismCalled() { + return setParallelismCalledFlag; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java index 8947b5f..ec8f3e1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java @@ -21,28 +21,35 @@ package org.apache.tez.dag.history.events; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Collection; import org.apache.tez.dag.history.HistoryEvent; import 
org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; + public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent { private TezDAGID dagID; private String vertexGroupName; + private Collection<TezVertexID> vertexIds; private long commitFinishTime; public VertexGroupCommitFinishedEvent() { } public VertexGroupCommitFinishedEvent(TezDAGID dagID, - String vertexGroupName, long commitFinishTime) { + String vertexGroupName, Collection<TezVertexID> vertexIds, long commitFinishTime) { this.dagID = dagID; this.vertexGroupName = vertexGroupName; + this.vertexIds = vertexIds; this.commitFinishTime = commitFinishTime; } @@ -62,15 +69,28 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven } public VertexGroupCommitFinishedProto toProto() { + Collection<String> vertexIdsStr = Collections2.transform(vertexIds, new Function<TezVertexID, String>(){ + @Override + public String apply(TezVertexID vertexId) { + return vertexId.toString(); + } + }); return VertexGroupCommitFinishedProto.newBuilder() .setDagId(dagID.toString()) .setVertexGroupName(vertexGroupName) + .addAllVertexIds(vertexIdsStr) .build(); } public void fromProto(VertexGroupCommitFinishedProto proto) { this.dagID = TezDAGID.fromString(proto.getDagId()); this.vertexGroupName = proto.getVertexGroupName(); + this.vertexIds = Collections2.transform(proto.getVertexIdsList(), new Function<String, TezVertexID>() { + @Override + public TezVertexID apply(String input) { + return TezVertexID.fromString(input); + } + }); } @Override @@ 
-124,4 +144,8 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven return dagID; } + public Collection<TezVertexID> getVertexIds() { + return vertexIds; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java index c388957..3de355c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java @@ -21,28 +21,35 @@ package org.apache.tez.dag.history.events; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Collection; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; + public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent { private TezDAGID dagID; private String vertexGroupName; + private Collection<TezVertexID> vertexIds; private long commitStartTime; public VertexGroupCommitStartedEvent() { } public VertexGroupCommitStartedEvent(TezDAGID dagID, - String vertexGroupName, long commitStartTime) { + String vertexGroupName, Collection<TezVertexID> vertexIds, long 
commitStartTime) { this.dagID = dagID; this.vertexGroupName = vertexGroupName; + this.vertexIds = vertexIds; this.commitStartTime = commitStartTime; } @@ -62,15 +69,28 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent } public VertexGroupCommitStartedProto toProto() { + Collection<String> vertexIdsStr = Collections2.transform(vertexIds, new Function<TezVertexID, String>(){ + @Override + public String apply(TezVertexID vertexId) { + return vertexId.toString(); + } + }); return VertexGroupCommitStartedProto.newBuilder() .setDagId(dagID.toString()) .setVertexGroupName(vertexGroupName) + .addAllVertexIds(vertexIdsStr) .build(); } public void fromProto(VertexGroupCommitStartedProto proto) { this.dagID = TezDAGID.fromString(proto.getDagId()); this.vertexGroupName = proto.getVertexGroupName(); + this.vertexIds = Collections2.transform(proto.getVertexIdsList(), new Function<String, TezVertexID>() { + @Override + public TezVertexID apply(String input) { + return TezVertexID.fromString(input); + } + }); } @Override @@ -124,4 +144,7 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent return dagID; } + public Collection<TezVertexID> getVertexIds() { + return vertexIds; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java index 01e0d3c..052908b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.LinkedHashMap; +import 
java.util.List; import java.util.Map; import org.apache.tez.dag.api.DagTypeConverters; @@ -31,9 +32,14 @@ import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.utils.TezEventUtils; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; +import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto; +import org.apache.tez.runtime.api.impl.TezEvent; + +import com.google.common.collect.Lists; public class VertexInitializedEvent implements HistoryEvent { @@ -44,6 +50,7 @@ public class VertexInitializedEvent implements HistoryEvent { private int numTasks; private String processorName; private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs; + private List<TezEvent> initGeneratedEvents; public VertexInitializedEvent() { } @@ -51,7 +58,8 @@ public class VertexInitializedEvent implements HistoryEvent { public VertexInitializedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime, long initedTime, int numTasks, String processorName, - Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs) { + Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs, + List<TezEvent> initGeneratedEvents) { this.vertexName = vertexName; this.vertexID = vertexId; this.initRequestedTime = initRequestedTime; @@ -59,6 +67,7 @@ public class VertexInitializedEvent implements HistoryEvent { this.numTasks = numTasks; this.processorName = processorName; this.additionalInputs = additionalInputs; + this.initGeneratedEvents = initGeneratedEvents; } @Override @@ -76,7 +85,7 @@ public class VertexInitializedEvent implements 
HistoryEvent { return true; } - public RecoveryProtos.VertexInitializedProto toProto() { + public RecoveryProtos.VertexInitializedProto toProto() throws IOException { VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder(); if (additionalInputs != null && !additionalInputs.isEmpty()) { @@ -94,6 +103,11 @@ public class VertexInitializedEvent implements HistoryEvent { builder.addInputs(inputBuilder.build()); } } + if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) { + for (TezEvent event : initGeneratedEvents) { + builder.addInitGeneratedEvents(TezEventUtils.toProto(event)); + } + } return builder.setVertexId(vertexID.toString()) .setVertexName(vertexName) .setInitRequestedTime(initRequestedTime) @@ -102,7 +116,7 @@ public class VertexInitializedEvent implements HistoryEvent { .build(); } - public void fromProto(RecoveryProtos.VertexInitializedProto proto) { + public void fromProto(RecoveryProtos.VertexInitializedProto proto) throws IOException { this.vertexID = TezVertexID.fromString(proto.getVertexId()); this.vertexName = proto.getVertexName(); this.initRequestedTime = proto.getInitRequestedTime(); @@ -123,6 +137,14 @@ public class VertexInitializedEvent implements HistoryEvent { additionalInputs.put(input.getName(), input); } } + int eventCount = proto.getInitGeneratedEventsCount(); + if (eventCount > 0) { + this.initGeneratedEvents = Lists.newArrayListWithCapacity(eventCount); + } + for (TezEventProto eventProto : + proto.getInitGeneratedEventsList()) { + this.initGeneratedEvents.add(TezEventUtils.fromProto(eventProto)); + } } @Override @@ -149,7 +171,9 @@ public class VertexInitializedEvent implements HistoryEvent { + ", numTasks=" + numTasks + ", processorName=" + processorName + ", additionalInputsCount=" - + (additionalInputs != null ? additionalInputs.size() : 0); + + (additionalInputs != null ? additionalInputs.size() : 0) + + ", initGeneratedEventsCount=" + + (initGeneratedEvents != null ? 
initGeneratedEvents.size() : 0); } public TezVertexID getVertexID() { @@ -181,4 +205,7 @@ public class VertexInitializedEvent implements HistoryEvent { return vertexName; } + public List<TezEvent> getInitGeneratedEvents() { + return initGeneratedEvents; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java deleted file mode 100644 index 456e2a5..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tez.dag.history.events; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.tez.dag.api.DagTypeConverters; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.history.HistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto; -import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto; -import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto; -import org.apache.tez.runtime.api.InputSpecUpdate; - -import com.google.common.collect.Maps; - -public class VertexParallelismUpdatedEvent implements HistoryEvent { - - private TezVertexID vertexID; - private int numTasks; - private int oldNumTasks; - private VertexLocationHint vertexLocationHint; - private Map<String, EdgeProperty> sourceEdgeProperties; - private Map<String, InputSpecUpdate> rootInputSpecUpdates; - private long updateTime; - - public VertexParallelismUpdatedEvent() { - } - - public VertexParallelismUpdatedEvent(TezVertexID vertexID, - int numTasks, VertexLocationHint vertexLocationHint, - Map<String, EdgeProperty> sourceEdgeProperties, - Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldNumTasks) { - this.vertexID = vertexID; - this.numTasks = numTasks; - this.vertexLocationHint = vertexLocationHint; - this.sourceEdgeProperties = sourceEdgeProperties; - this.rootInputSpecUpdates = rootInputSpecUpdates; - this.updateTime = System.currentTimeMillis(); - this.oldNumTasks = oldNumTasks; - } - - @Override - public HistoryEventType getEventType() { - return HistoryEventType.VERTEX_PARALLELISM_UPDATED; - } - - @Override - public boolean isRecoveryEvent() { - return true; - } - - 
@Override - public boolean isHistoryEvent() { - return true; - } - - public VertexParallelismUpdatedProto toProto() { - VertexParallelismUpdatedProto.Builder builder = - VertexParallelismUpdatedProto.newBuilder(); - builder.setVertexId(vertexID.toString()) - .setNumTasks(numTasks); - if (vertexLocationHint != null) { - builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto( - this.vertexLocationHint)); - } - if (sourceEdgeProperties != null) { - for (Entry<String, EdgeProperty> entry : - sourceEdgeProperties.entrySet()) { - EdgeManagerDescriptorProto.Builder edgeMgrBuilder = - EdgeManagerDescriptorProto.newBuilder(); - edgeMgrBuilder.setEdgeName(entry.getKey()); - edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue())); - builder.addEdgeManagerDescriptors(edgeMgrBuilder.build()); - } - } - if (rootInputSpecUpdates != null) { - for (Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) { - RootInputSpecUpdateProto.Builder rootInputSpecUpdateBuilder = RootInputSpecUpdateProto - .newBuilder(); - rootInputSpecUpdateBuilder.setInputName(entry.getKey()); - rootInputSpecUpdateBuilder.setForAllWorkUnits(entry.getValue().isForAllWorkUnits()); - rootInputSpecUpdateBuilder.addAllNumPhysicalInputs(entry.getValue() - .getAllNumPhysicalInputs()); - builder.addRootInputSpecUpdates(rootInputSpecUpdateBuilder.build()); - } - } - return builder.build(); - } - - public void fromProto(VertexParallelismUpdatedProto proto) { - this.vertexID = TezVertexID.fromString(proto.getVertexId()); - this.numTasks = proto.getNumTasks(); - if (proto.hasVertexLocationHint()) { - this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto( - proto.getVertexLocationHint()); - } - if (proto.getEdgeManagerDescriptorsCount() > 0) { - this.sourceEdgeProperties = new HashMap<String, EdgeProperty>( - proto.getEdgeManagerDescriptorsCount()); - for (EdgeManagerDescriptorProto edgeManagerProto : - 
proto.getEdgeManagerDescriptorsList()) { - EdgeProperty edgeProperty = - DagTypeConverters.convertFromProto( - edgeManagerProto.getEdgeProperty()); - sourceEdgeProperties.put(edgeManagerProto.getEdgeName(), - edgeProperty); - } - } - if (proto.getRootInputSpecUpdatesCount() > 0) { - this.rootInputSpecUpdates = Maps.newHashMap(); - for (RootInputSpecUpdateProto rootInputSpecUpdateProto : proto.getRootInputSpecUpdatesList()) { - InputSpecUpdate specUpdate; - if (rootInputSpecUpdateProto.getForAllWorkUnits()) { - specUpdate = InputSpecUpdate - .createAllTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputs(0)); - } else { - specUpdate = InputSpecUpdate - .createPerTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputsList()); - } - this.rootInputSpecUpdates.put(rootInputSpecUpdateProto.getInputName(), specUpdate); - } - } - } - - @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); - } - - @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream); - if (proto == null) { - throw new IOException("No data found in stream"); - } - fromProto(proto); - } - - @Override - public String toString() { - return "vertexId=" + vertexID - + ", numTasks=" + numTasks - + ", vertexLocationHint=" + - (vertexLocationHint == null? "null" : vertexLocationHint) - + ", edgeManagersCount=" + - (sourceEdgeProperties == null? "null" : sourceEdgeProperties.size() - + ", rootInputSpecUpdateCount=" - + (rootInputSpecUpdates == null ? 
"null" : rootInputSpecUpdates.size())); - } - - public TezVertexID getVertexID() { - return this.vertexID; - } - - public int getNumTasks() { - return numTasks; - } - - public VertexLocationHint getVertexLocationHint() { - return vertexLocationHint; - } - - public Map<String, EdgeProperty> getSourceEdgeProperties() { - return sourceEdgeProperties; - } - - public Map<String, InputSpecUpdate> getRootInputSpecUpdates() { - return rootInputSpecUpdates; - } - - public long getUpdateTime() { - return updateTime; - } - - public int getOldNumTasks() { - return oldNumTasks; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java deleted file mode 100644 index 6f44f33..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.events; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.EnumSet; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.tez.common.ProtoConverters; -import org.apache.tez.dag.history.HistoryEvent; -import org.apache.tez.dag.history.HistoryEventType; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.dag.recovery.records.RecoveryProtos; -import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto; -import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.DataMovementEvent; -import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.events.InputInitializerEvent; -import org.apache.tez.runtime.api.impl.EventMetaData; -import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.runtime.api.impl.TezEvent; - -import com.google.common.collect.Lists; - -// TODO PreCommit - rename this to VertexRecoverableEventGeneratedEvent -public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent { - - private static final Logger LOG = LoggerFactory.getLogger( - VertexRecoverableEventsGeneratedEvent.class); - private List<TezEvent> events; - private TezVertexID vertexID; - - public VertexRecoverableEventsGeneratedEvent(TezVertexID vertexID, - List<TezEvent> events) { - this.vertexID = vertexID; - this.events = Lists.newArrayListWithCapacity(events.size()); - for (TezEvent event : events) { - if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT, - 
EventType.COMPOSITE_DATA_MOVEMENT_EVENT, - EventType.ROOT_INPUT_DATA_INFORMATION_EVENT, - EventType.ROOT_INPUT_INITIALIZER_EVENT) - .contains(event.getEventType())) { - this.events.add(event); - } - } - if (events.isEmpty()) { - throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent" - + ", no data movement/information events provided"); - } - } - - public VertexRecoverableEventsGeneratedEvent() { - } - - @Override - public HistoryEventType getEventType() { - return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED; - } - - @Override - public boolean isRecoveryEvent() { - return true; - } - - @Override - public boolean isHistoryEvent() { - return false; - } - - static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto( - EventMetaData eventMetaData) { - RecoveryProtos.EventMetaDataProto.Builder builder = - RecoveryProtos.EventMetaDataProto.newBuilder() - .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal()) - .setEdgeVertexName(eventMetaData.getEdgeVertexName()) - .setTaskVertexName(eventMetaData.getTaskVertexName()); - if (eventMetaData.getTaskAttemptID() != null) { - builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString()); - } - return builder.build(); - } - - static EventMetaData convertEventMetaDataFromProto( - RecoveryProtos.EventMetaDataProto proto) { - TezTaskAttemptID attemptID = null; - if (proto.hasTaskAttemptId()) { - attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); - } - return new EventMetaData( - EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()], - proto.getTaskVertexName(), - proto.getEdgeVertexName(), - attemptID); - } - - public VertexDataMovementEventsGeneratedProto toProto() { - List<TezDataMovementEventProto> tezEventProtos = null; - if (events != null) { - tezEventProtos = Lists.newArrayListWithCapacity(events.size()); - for (TezEvent event : events) { - TezDataMovementEventProto.Builder evtBuilder = - 
TezDataMovementEventProto.newBuilder(); - if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) { - evtBuilder.setCompositeDataMovementEvent( - ProtoConverters.convertCompositeDataMovementEventToProto( - (CompositeDataMovementEvent) event.getEvent())); - } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) { - evtBuilder.setDataMovementEvent( - ProtoConverters.convertDataMovementEventToProto( - (DataMovementEvent) event.getEvent())); - } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) { - evtBuilder.setRootInputDataInformationEvent( - ProtoConverters.convertRootInputDataInformationEventToProto( - (InputDataInformationEvent) event.getEvent())); - } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) { - evtBuilder.setInputInitializerEvent(ProtoConverters - .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent())); - } - if (event.getSourceInfo() != null) { - evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo())); - } - if (event.getDestinationInfo() != null) { - evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo())); - } - evtBuilder.setEventTime(event.getEventReceivedTime()); - tezEventProtos.add(evtBuilder.build()); - } - } - return VertexDataMovementEventsGeneratedProto.newBuilder() - .setVertexId(vertexID.toString()) - .addAllTezDataMovementEvent(tezEventProtos) - .build(); - } - - public void fromProto(VertexDataMovementEventsGeneratedProto proto) { - this.vertexID = TezVertexID.fromString(proto.getVertexId()); - int eventCount = proto.getTezDataMovementEventCount(); - if (eventCount > 0) { - this.events = Lists.newArrayListWithCapacity(eventCount); - } - for (TezDataMovementEventProto eventProto : - proto.getTezDataMovementEventList()) { - Event evt = null; - if (eventProto.hasCompositeDataMovementEvent()) { - evt = ProtoConverters.convertCompositeDataMovementEventFromProto( - 
eventProto.getCompositeDataMovementEvent()); - } else if (eventProto.hasDataMovementEvent()) { - evt = ProtoConverters.convertDataMovementEventFromProto( - eventProto.getDataMovementEvent()); - } else if (eventProto.hasRootInputDataInformationEvent()) { - evt = ProtoConverters.convertRootInputDataInformationEventFromProto( - eventProto.getRootInputDataInformationEvent()); - } else if (eventProto.hasInputInitializerEvent()) { - evt = ProtoConverters.convertRootInputInitializerEventFromProto( - eventProto.getInputInitializerEvent()); - } - EventMetaData sourceInfo = null; - EventMetaData destinationInfo = null; - if (eventProto.hasSourceInfo()) { - sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo()); - } - if (eventProto.hasDestinationInfo()) { - destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo()); - } - TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime()); - tezEvent.setDestinationInfo(destinationInfo); - this.events.add(tezEvent); - } - } - - @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); - } - - @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexDataMovementEventsGeneratedProto proto = - VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream); - if (proto == null) { - throw new IOException("No data found in stream"); - } - fromProto(proto); - } - - @Override - public String toString() { - return "vertexId=" + vertexID.toString() - + ", eventCount=" + (events != null ? 
events.size() : "null"); - - } - - public TezVertexID getVertexID() { - return this.vertexID; - } - - public List<TezEvent> getTezEvents() { - return this.events; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index bf63045..c4e7e5b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -42,7 +42,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; -import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; @@ -108,13 +108,12 @@ public class HistoryEventJsonConversion { case TASK_ATTEMPT_FINISHED: jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); break; - case VERTEX_PARALLELISM_UPDATED: - jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent); + case VERTEX_CONFIGURE_DONE: + jsonObject = convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent); break; case DAG_RECOVERED: jsonObject = convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent); break; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: 
case VERTEX_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_STARTED: case VERTEX_GROUP_COMMIT_FINISHED: @@ -662,7 +661,6 @@ public class HistoryEventJsonConversion { JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.START_TIME, event.getStartTime()); otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); - jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; @@ -705,6 +703,41 @@ public class HistoryEventJsonConversion { return jsonObject; } + private static JSONObject convertVertexReconfigureDoneEvent(VertexConfigurationDoneEvent event) throws JSONException { + JSONObject jsonObject = new JSONObject(); + jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString()); + jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name()); + + // Events + JSONArray events = new JSONArray(); + JSONObject updateEvent = new JSONObject(); + updateEvent.put(ATSConstants.TIMESTAMP, event.getReconfigureDoneTime()); + updateEvent.put(ATSConstants.EVENT_TYPE, + HistoryEventType.VERTEX_CONFIGURE_DONE.name()); + + JSONObject eventInfo = new JSONObject(); + eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); + if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) { + JSONObject updatedEdgeManagers = new JSONObject(); + for (Entry<String, EdgeProperty> entry : + event.getSourceEdgeProperties().entrySet()) { + updatedEdgeManagers.put(entry.getKey(), + new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue()))); + } + eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); + } + updateEvent.put(ATSConstants.EVENT_INFO, eventInfo); + events.put(updateEvent); + jsonObject.put(ATSConstants.EVENTS, events); + + // Other info + JSONObject otherInfo = new JSONObject(); + jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); + + // TODO add more on all other updated information + return jsonObject; + } + private static JSONObject 
convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException { JSONObject jsonObject = new JSONObject(); jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString()); @@ -773,42 +806,4 @@ public class HistoryEventJsonConversion { return jsonObject; } - private static JSONObject convertVertexParallelismUpdatedEvent( - VertexParallelismUpdatedEvent event) throws JSONException { - JSONObject jsonObject = new JSONObject(); - jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString()); - jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name()); - - // Events - JSONArray events = new JSONArray(); - JSONObject updateEvent = new JSONObject(); - updateEvent.put(ATSConstants.TIMESTAMP, event.getUpdateTime()); - updateEvent.put(ATSConstants.EVENT_TYPE, - HistoryEventType.VERTEX_PARALLELISM_UPDATED.name()); - - JSONObject eventInfo = new JSONObject(); - eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); - eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); - if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) { - JSONObject updatedEdgeManagers = new JSONObject(); - for (Entry<String, EdgeProperty> entry : - event.getSourceEdgeProperties().entrySet()) { - updatedEdgeManagers.put(entry.getKey(), - new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue()))); - } - eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); - } - updateEvent.put(ATSConstants.EVENT_INFO, eventInfo); - events.put(updateEvent); - jsonObject.put(ATSConstants.EVENTS, events); - - // Other info - JSONObject otherInfo = new JSONObject(); - otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); - jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); - - // TODO add more on all other updated information - return jsonObject; - } - } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 585050d..fed4f3d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting; public class RecoveryService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(RecoveryService.class); - private final AppContext appContext; + protected final AppContext appContext; public static final String RECOVERY_FATAL_OCCURRED_DIR = "RecoveryFatalErrorOccurred"; @@ -73,7 +73,7 @@ public class RecoveryService extends AbstractService { private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>(); private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>(); - private Thread eventHandlingThread; + public Thread eventHandlingThread; private AtomicBoolean stopped = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false); private int eventCounter = 0; @@ -374,7 +374,7 @@ public class RecoveryService extends AbstractService { } } - private void handleSummaryEvent(TezDAGID dagID, + protected void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException { if (LOG.isDebugEnabled()) { @@ -506,4 +506,8 @@ public class RecoveryService extends AbstractService { Thread.yield(); } } + + public void setStopped(boolean stopped) { + this.stopped.set(stopped); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java 
b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java new file mode 100644 index 0000000..cc89b9f --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.dag.history.utils; + +import java.io.IOException; +import org.apache.tez.common.ProtoConverters; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.recovery.records.RecoveryProtos; +import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.runtime.api.impl.TezEvent; + +public class TezEventUtils { + + public static TezEventProto toProto(TezEvent event) throws IOException { + TezEventProto.Builder evtBuilder = + TezEventProto.newBuilder(); + if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) { + evtBuilder.setCompositeDataMovementEvent( + ProtoConverters.convertCompositeDataMovementEventToProto( + (CompositeDataMovementEvent) event.getEvent())); + } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) { + evtBuilder.setDataMovementEvent( + ProtoConverters.convertDataMovementEventToProto( + (DataMovementEvent) event.getEvent())); + } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) { + evtBuilder.setInputInitializerEvent(ProtoConverters + .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent())); + } else if (event.getEventType().equals(EventType.VERTEX_MANAGER_EVENT)) { + evtBuilder.setVmEvent(ProtoConverters + .convertVertexManagerEventToProto((VertexManagerEvent)event.getEvent())); + } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) { + 
evtBuilder.setRootInputDataInformationEvent( + ProtoConverters.convertRootInputDataInformationEventToProto( + (InputDataInformationEvent) event.getEvent())); + } else { + throw new IOException("Unsupported TezEvent type:" + event.getEventType()); + } + + if (event.getSourceInfo() != null) { + evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo())); + } + if (event.getDestinationInfo() != null) { + evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo())); + } + evtBuilder.setEventTime(event.getEventReceivedTime()); + return evtBuilder.build(); + } + + public static TezEvent fromProto(TezEventProto eventProto) throws IOException { + Event evt = null; + if (eventProto.hasCompositeDataMovementEvent()) { + evt = ProtoConverters.convertCompositeDataMovementEventFromProto( + eventProto.getCompositeDataMovementEvent()); + } else if (eventProto.hasDataMovementEvent()) { + evt = ProtoConverters.convertDataMovementEventFromProto( + eventProto.getDataMovementEvent()); + } else if (eventProto.hasInputInitializerEvent()) { + evt = ProtoConverters.convertRootInputInitializerEventFromProto( + eventProto.getInputInitializerEvent()); + } else if (eventProto.hasVmEvent()) { + evt = ProtoConverters.convertVertexManagerEventFromProto( + eventProto.getVmEvent()); + } else if (eventProto.hasRootInputDataInformationEvent()) { + evt = ProtoConverters.convertRootInputDataInformationEventFromProto( + eventProto.getRootInputDataInformationEvent()); + } else { + throw new IOException("Unsupported TezEvent type"); + } + + EventMetaData sourceInfo = null; + EventMetaData destinationInfo = null; + if (eventProto.hasSourceInfo()) { + sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo()); + } + if (eventProto.hasDestinationInfo()) { + destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo()); + } + TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime()); + 
tezEvent.setDestinationInfo(destinationInfo); + return tezEvent; + } + + public static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto( + EventMetaData eventMetaData) { + RecoveryProtos.EventMetaDataProto.Builder builder = + RecoveryProtos.EventMetaDataProto.newBuilder() + .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal()) + .setEdgeVertexName(eventMetaData.getEdgeVertexName()) + .setTaskVertexName(eventMetaData.getTaskVertexName()); + if (eventMetaData.getTaskAttemptID() != null) { + builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString()); + } + return builder.build(); + } + + public static EventMetaData convertEventMetaDataFromProto( + RecoveryProtos.EventMetaDataProto proto) { + TezTaskAttemptID attemptID = null; + if (proto.hasTaskAttemptId()) { + attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId()); + } + return new EventMetaData( + EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()], + proto.getTaskVertexName(), + proto.getEdgeVertexName(), + attemptID); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index 5392d8a..b9e4507 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -90,6 +90,8 @@ message VertexInitializedProto { optional int64 init_time = 4; optional int32 num_tasks = 5; repeated RootInputLeafOutputProto inputs = 6; + repeated TezEventProto init_generated_events = 7; + optional bool isReconfigurePlanned = 8; } message VertexStartedProto { @@ -99,6 +101,16 @@ message VertexStartedProto { optional int64 start_time = 4; } +message VertexConfigurationDoneProto { + optional string vertex_id = 1; + optional int64 reconfigure_done_time = 2; + optional int32 num_tasks = 3; + optional 
VertexLocationHintProto vertex_location_hint = 4; + repeated EdgeManagerDescriptorProto edge_manager_descriptors = 5; + repeated RootInputSpecUpdateProto root_input_spec_updates = 6; + optional bool setParallelismCalled_flag = 7; +} + message EdgeManagerDescriptorProto { optional string edge_name = 1; optional PlanEdgeProperty edge_property = 2; @@ -110,14 +122,6 @@ message RootInputSpecUpdateProto { repeated int32 num_physical_inputs = 3; } -message VertexParallelismUpdatedProto { - optional string vertex_id = 1; - optional int32 num_tasks = 2; - optional VertexLocationHintProto vertex_location_hint = 3; - repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4; - repeated RootInputSpecUpdateProto root_input_spec_updates = 5; -} - message VertexCommitStartedProto { optional string vertex_id = 1; } @@ -129,11 +133,13 @@ message VertexCommitFinishedProto { message VertexGroupCommitStartedProto { optional string dag_id = 1; optional string vertex_group_name = 2; + repeated string vertex_ids = 3; } message VertexGroupCommitFinishedProto { optional string dag_id = 1; optional string vertex_group_name = 2; + repeated string vertex_ids = 3; } message VertexFinishedProto { @@ -183,6 +189,7 @@ message TaskAttemptFinishedProto { optional TezCountersProto counters = 9; optional string error_enum = 10; repeated DataEventDependencyInfoProto data_events = 11; + repeated TezEventProto ta_generated_events = 12; } message EventMetaDataProto { @@ -192,7 +199,7 @@ message EventMetaDataProto { optional string task_attempt_id = 4; } -message TezDataMovementEventProto { +message TezEventProto { optional EventMetaDataProto source_info = 1; optional EventMetaDataProto destination_info = 2; optional DataMovementEventProto data_movement_event = 3; @@ -200,11 +207,7 @@ message TezDataMovementEventProto { optional RootInputDataInformationEventProto root_input_data_information_event = 5; optional RootInputInitializerEventProto input_initializer_event = 6; optional int64 event_time = 
7; -} - -message VertexDataMovementEventsGeneratedProto { - optional string vertex_id = 1; - repeated TezDataMovementEventProto tez_data_movement_event = 2; + optional VertexManagerEventProto vm_event = 8; } message SummaryEventProto { http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java index 521ed50..0159372 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java @@ -33,12 +33,7 @@ public class TestVertexStatusBuilder { VertexStatusBuilder.getProtoState(state); VertexStatus.State clientState = VertexStatus.getState(stateProto); - if (state.equals(VertexState.RECOVERING)) { - Assert.assertEquals(clientState.name(), - State.NEW.name()); - } else { - Assert.assertEquals(state.name(), clientState.name()); - } + Assert.assertEquals(state.name(), clientState.name()); } }
