Repository: tez Updated Branches: refs/heads/master b6ce703c6 -> c636dc220
TEZ-2233. Allow EdgeProperty of an edge to be changed by VertexManager (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c636dc22 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c636dc22 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c636dc22 Branch: refs/heads/master Commit: c636dc2206a1f3a9261b923a0e02ade0d23a19ec Parents: b6ce703 Author: Bikas Saha <[email protected]> Authored: Wed Apr 8 15:16:43 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Wed Apr 8 15:16:43 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/DagTypeConverters.java | 31 +++++- .../org/apache/tez/dag/api/EdgeProperty.java | 23 +++-- .../tez/dag/api/VertexManagerPluginContext.java | 29 +++++- tez-api/src/main/proto/DAGApiRecords.proto | 9 ++ .../java/org/apache/tez/dag/app/dag/Vertex.java | 6 ++ .../org/apache/tez/dag/app/dag/impl/Edge.java | 29 ++++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 74 +++++++++----- .../tez/dag/app/dag/impl/VertexManager.java | 14 +++ .../events/VertexParallelismUpdatedEvent.java | 35 ++++--- .../impl/HistoryEventJsonConversion.java | 11 +- .../apache/tez/dag/history/utils/DAGUtils.java | 35 ++++++- tez-dag/src/main/proto/HistoryEvents.proto | 2 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 72 +++++++------ .../TestHistoryEventsProtoConversion.java | 59 ++++++----- .../impl/TestHistoryEventJsonConversion.java | 18 +++- .../ats/HistoryEventTimelineConversion.java | 10 +- .../ats/TestHistoryEventTimelineConversion.java | 18 +++- .../vertexmanager/InputReadyVertexManager.java | 8 +- .../vertexmanager/ShuffleVertexManager.java | 18 +++- .../TestInputReadyVertexManager.java | 10 +- .../vertexmanager/TestShuffleVertexManager.java | 102 +++++-------------- 22 files changed, 393 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f660feb..45c1541 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2233. Allow EdgeProperty of an edge to be changed by VertexManager TEZ-2293. When running in "mr" mode, always use MR config settings. TEZ-2273. Tez UI: Support client side searching & sorting for dag tasks page TEZ-1909. Remove need to copy over all events from attempt 1 to attempt 2 dir http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 227897f..b4185b1 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -20,7 +20,6 @@ package org.apache.tez.dag.api; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -59,6 +58,7 @@ import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType; +import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeProperty; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import 
org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource; @@ -159,6 +159,7 @@ public class DagTypeConverters { case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE; case BROADCAST : return DataMovementType.BROADCAST; case SCATTER_GATHER : return DataMovementType.SCATTER_GATHER; + case CUSTOM : return DataMovementType.CUSTOM; default : throw new IllegalArgumentException("unknown 'dataMovementType': " + type); } } @@ -263,6 +264,34 @@ public class DagTypeConverters { } return edgePlanMap; } + + public static PlanEdgeProperty convertToProto(EdgeProperty prop) { + PlanEdgeProperty.Builder edgePropBuilder = PlanEdgeProperty.newBuilder(); + edgePropBuilder.setDataMovementType(convertToDAGPlan(prop.getDataMovementType())); + edgePropBuilder.setDataSourceType(convertToDAGPlan(prop.getDataSourceType())); + edgePropBuilder.setSchedulingType(convertToDAGPlan(prop.getSchedulingType())); + edgePropBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(prop.getEdgeSource())); + edgePropBuilder + .setEdgeDestination(DagTypeConverters.convertToDAGPlan(prop.getEdgeDestination())); + if (prop.getEdgeManagerDescriptor() != null) { + edgePropBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(prop + .getEdgeManagerDescriptor())); + } + + return edgePropBuilder.build(); + } + + public static EdgeProperty convertFromProto(PlanEdgeProperty edge) { + return EdgeProperty.create( + (edge.hasEdgeManager() ? 
+ convertEdgeManagerPluginDescriptorFromDAGPlan(edge.getEdgeManager()) : null), + convertFromDAGPlan(edge.getDataMovementType()), + convertFromDAGPlan(edge.getDataSourceType()), + convertFromDAGPlan(edge.getSchedulingType()), + convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()), + convertInputDescriptorFromDAGPlan(edge.getEdgeDestination()) + ); + } public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) { if (edge.getDataMovementType() == PlanEdgeDataMovementType.CUSTOM) { http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java index c6ac25f..07fb2c1 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java @@ -155,20 +155,22 @@ public class EdgeProperty { edgeDestination); } + @Private + public static EdgeProperty create(EdgeManagerPluginDescriptor edgeManagerDescriptor, + DataMovementType dataMovementType, DataSourceType dataSourceType, + SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) { + return new EdgeProperty(edgeManagerDescriptor, dataMovementType, dataSourceType, + schedulingType, edgeSource, edgeDestination); + } private EdgeProperty(DataMovementType dataMovementType, DataSourceType dataSourceType, SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) { + this(null, dataMovementType, dataSourceType, schedulingType, edgeSource, edgeDestination); Preconditions.checkArgument(dataMovementType != DataMovementType.CUSTOM, DataMovementType.CUSTOM + " cannot be used with this constructor"); - this.dataMovementType = dataMovementType; - this.dataSourceType = dataSourceType; - this.schedulingType = 
schedulingType; - this.outputDescriptor = edgeSource; - this.inputDescriptor = edgeDestination; - this.edgeManagerDescriptor = null; } @@ -177,7 +179,14 @@ public class EdgeProperty { SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) { - this.dataMovementType = DataMovementType.CUSTOM; + this(edgeManagerDescriptor, DataMovementType.CUSTOM, dataSourceType, schedulingType, + edgeSource, edgeDestination); + } + + private EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor, + DataMovementType dataMovementType, DataSourceType dataSourceType, + SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) { + this.dataMovementType = dataMovementType; this.edgeManagerDescriptor = edgeManagerDescriptor; this.dataSourceType = dataSourceType; this.schedulingType = schedulingType; http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index dfa9287..38ecbf6 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -129,8 +129,34 @@ public interface VertexManagerPluginContext { @Nullable VertexLocationHint locationHint, @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate); + /** + * API to reconfigure a {@link Vertex} by changing its task parallelism. Task + * parallelism is often accompanied by changing the {@link EdgeProperty} of + * the source {@link Edge} because event routing between source and + * destination tasks may need to be updated to account for the new task + * parallelism. 
This method can be called to update the parallelism multiple + * times until any of the tasks of the vertex have been scheduled (by invoking + * {@link #scheduleVertexTasks(List)}). If needed, the original source edge + * properties may be obtained via {@link #getInputVertexEdgeProperties()}. + * + * @param parallelism + * New number of tasks in the vertex + * @param locationHint + * the placement policy for tasks specified at + * {@link VertexLocationHint}s + * @param sourceEdgeProperties + * Map with Key=name of {@link Edge} to be updated and Value= + * {@link EdgeProperty}. The name of the Edge will be the + * corresponding source vertex name. + * @throws TezException Exception to indicate errors + */ + public void reconfigureVertex(int parallelism, + @Nullable VertexLocationHint locationHint, + @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws TezException; + + /** * Allows a VertexManagerPlugin to assign Events for Root Inputs * * For regular Event Routing changes - the EdgeManager should be configured @@ -189,8 +215,7 @@ public interface VertexManagerPluginContext { * reconfiguration. If the vertex is already fully defined, but the * {@link VertexManagerPlugin} wants to reconfigure the vertex, then it must * use this API to inform Tez about its intention. Without invoking this - * method, it is invalid to re-configure the vertex, e.g. via the - * {@link #setVertexParallelism(int, VertexLocationHint, Map, Map)} method if + * method, it is invalid to re-configure the vertex if * the vertex is already fully defined. This can be invoked at any time until * {@link VertexManagerPlugin#initialize()} has completed. 
Its invalid to * invoke this method after {@link VertexManagerPlugin#initialize()} has http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index dbd569a..9ac08b2 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -141,6 +141,15 @@ message VertexPlan { optional ConfigurationProto vertexConf = 11; } +message PlanEdgeProperty { + required PlanEdgeDataMovementType dataMovementType = 1; + required PlanEdgeDataSourceType dataSourceType = 2; + required PlanEdgeSchedulingType schedulingType = 3; + optional TezEntityDescriptorProto edge_source = 4; + optional TezEntityDescriptorProto edge_destination = 5; + optional TezEntityDescriptorProto edge_manager = 6; +} + message EdgePlan { required string id = 1; required string inputVertexName = 2; http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 291a0c5..44df6cb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputCommitterDescriptor; @@ -93,6 +94,11 @@ public interface Vertex extends 
Comparable<Vertex> { Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, Map<String, InputSpecUpdate> rootInputSpecUpdate, boolean fromVertexManager) throws AMUserCodeException; + + public void reconfigureVertex(int parallelism, + @Nullable VertexLocationHint locationHint, + @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException; + void setVertexLocationHint(VertexLocationHint vertexLocationHint); void vertexReconfigurationPlanned(); void doneReconfiguringVertex(); http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index ffdff80..f5fef67 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -52,6 +52,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -162,7 +163,22 @@ public class Edge { null); } - public synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) + public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException { + this.edgeProperty = newEdgeProperty; + boolean wasUnInitialized = (edgeManager == null); + createEdgeManager(); + initialize(); + if (wasUnInitialized) { + sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this, + destinationVertex)); + sendEvent(new VertexEventNullEdgeInitialized(destinationVertex.getVertexId(), this, + sourceVertex)); + } + } + + // Test only method for 
creating specific scenarios + @VisibleForTesting + synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) throws AMUserCodeException { EdgeProperty modifiedEdgeProperty = EdgeProperty.create(descriptor, @@ -170,17 +186,10 @@ public class Edge { edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(), edgeProperty.getEdgeDestination()); - this.edgeProperty = modifiedEdgeProperty; - boolean wasUnInitialized = (edgeManager == null); - createEdgeManager(); - initialize(); - if (wasUnInitialized) { - sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this, destinationVertex)); - sendEvent(new VertexEventNullEdgeInitialized(destinationVertex.getVertexId(), this, sourceVertex)); - } + setEdgeProperty(modifiedEdgeProperty); } - public EdgeProperty getEdgeProperty() { + public synchronized EdgeProperty getEdgeProperty() { return this.edgeProperty; } http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 183e780..2d892b0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -63,6 +63,7 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputCommitterDescriptor; @@ -707,7 +708,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @VisibleForTesting 
boolean hasCommitter = false; private boolean vertexCompleteSeen = false; - private Map<String,EdgeManagerPluginDescriptor> recoveredSourceEdgeManagers = null; + private Map<String,EdgeProperty> recoveredSourceEdgeProperties = null; private Map<String, InputSpecUpdate> recoveredRootInputSpecUpdates = null; // Recovery related flags @@ -1117,7 +1118,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } private void handleParallelismUpdate(int newParallelism, - Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, + Map<String, EdgeProperty> sourceEdgeProperties, Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) { // initial parallelism must have been set by this time // parallelism update is recorded in history only for change from an initialized value @@ -1128,7 +1129,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, removeTasks(newParallelism); } Preconditions.checkState(this.numTasks == newParallelism, getLogIdentifier()); - this.recoveredSourceEdgeManagers = sourceEdgeManagers; + this.recoveredSourceEdgeProperties = sourceEdgeProperties; this.recoveredRootInputSpecUpdates = rootInputSpecUpdates; } @@ -1167,7 +1168,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, (VertexParallelismUpdatedEvent) historyEvent; int oldNumTasks = numTasks; int newNumTasks = updatedEvent.getNumTasks(); - handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeManagers(), + handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(), updatedEvent.getRootInputSpecUpdates(), oldNumTasks); Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier()); if (updatedEvent.getVertexLocationHint() != null) { @@ -1300,32 +1301,59 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, writeLock.unlock(); } } + + @Override + public void reconfigureVertex(int parallelism, + @Nullable VertexLocationHint locationHint, + @Nullable Map<String, 
EdgeProperty> sourceEdgeProperties) throws AMUserCodeException { + setParallelism(parallelism, locationHint, sourceEdgeProperties, null, false, true); + } @Override public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, - Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager) - throws AMUserCodeException { - setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, + Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager) + throws AMUserCodeException { + // temporarily support conversion of edge manager to edge property + Map<String, EdgeProperty> sourceEdgeProperties = Maps.newHashMap(); + readLock.lock(); + try { + if (sourceEdgeManagers != null && !sourceEdgeManagers.isEmpty()) { + for (Edge e : sourceVertices.values()) { + EdgeManagerPluginDescriptor newEdge = sourceEdgeManagers.get(e.getSourceVertexName()); + EdgeProperty oldEdge = e.getEdgeProperty(); + if (newEdge != null) { + sourceEdgeProperties.put( + e.getSourceVertexName(), + EdgeProperty.create(newEdge, oldEdge.getDataSourceType(), + oldEdge.getSchedulingType(), oldEdge.getEdgeSource(), + oldEdge.getEdgeDestination())); + } + } + } + } finally { + readLock.unlock(); + } + setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, false, fromVertexManager); } private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, - Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, + Map<String, EdgeProperty> sourceEdgeProperties, Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean recovering, boolean fromVertexManager) throws AMUserCodeException { if (recovering) { writeLock.lock(); try { - if (sourceEdgeManagers != null) { - for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : - sourceEdgeManagers.entrySet()) { + if (sourceEdgeProperties != null) { + for(Map.Entry<String, 
EdgeProperty> entry : + sourceEdgeProperties.entrySet()) { LOG.info("Recovering edge manager for source:" + entry.getKey() + " destination: " + getLogIdentifier()); Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); Edge edge = sourceVertices.get(sourceVertex); try { - edge.setCustomEdgeManager(entry.getValue()); + edge.setEdgeProperty(entry.getValue()); } catch (Exception e) { throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge," + "sourceVertex:" + edge.getSourceVertexName() @@ -1378,16 +1406,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + " for vertex: " + logIdentifier); } - if(sourceEdgeManagers != null) { - for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) { + if(sourceEdgeProperties != null) { + for(Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) { LOG.info("Replacing edge manager for source:" + entry.getKey() + " destination: " + getLogIdentifier()); Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); Edge edge = sourceVertices.get(sourceVertex); try { - edge.setCustomEdgeManager(entry.getValue()); + edge.setEdgeProperty(entry.getValue()); } catch (Exception e) { - throw new TezUncheckedException("Fail to setCustomEdgeManage for Edge," + throw new TezUncheckedException("Fail to update EdgeProperty for Edge," + "sourceVertex:" + edge.getSourceVertexName() + "destinationVertex:" + edge.getDestinationVertexName(), e); } @@ -1438,7 +1466,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (parallelism == numTasks) { LOG.info("setParallelism same as current value: " + parallelism + " for vertex: " + logIdentifier); - Preconditions.checkArgument(sourceEdgeManagers != null, + Preconditions.checkArgument(sourceEdgeProperties != null, "Source edge managers or RootInputSpecs must be set when not changing parallelism"); } else { LOG.info("Resetting vertex location hints due to change 
in parallelism for vertex: " @@ -1465,14 +1493,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, assert tasks.size() == numTasks; // set new edge managers - if(sourceEdgeManagers != null) { - for(Map.Entry<String, EdgeManagerPluginDescriptor> entry : sourceEdgeManagers.entrySet()) { + if(sourceEdgeProperties != null) { + for(Map.Entry<String, EdgeProperty> entry : sourceEdgeProperties.entrySet()) { LOG.info("Replacing edge manager for source:" + entry.getKey() + " destination: " + getLogIdentifier()); Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey()); Edge edge = sourceVertices.get(sourceVertex); try { - edge.setCustomEdgeManager(entry.getValue()); + edge.setEdgeProperty(entry.getValue()); } catch (Exception e) { throw new TezUncheckedException(e); } @@ -1481,7 +1509,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // update history VertexParallelismUpdatedEvent parallelismUpdatedEvent = new VertexParallelismUpdatedEvent( - vertexId, numTasks, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, + vertexId, numTasks, vertexLocationHint, sourceEdgeProperties, rootInputSpecUpdates, oldNumTasks); appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGId(), parallelismUpdatedEvent)); @@ -2739,7 +2767,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, try { // recovering only edge manager vertex.setParallelism(0, - null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false); + null, vertex.recoveredSourceEdgeProperties, vertex.recoveredRootInputSpecUpdates, true, false); successSetParallelism = true; } catch (Exception e) { successSetParallelism = false; @@ -2796,7 +2824,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, break; } try { - vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, + vertex.setParallelism(vertex.numTasks, null, vertex.recoveredSourceEdgeProperties, 
vertex.recoveredRootInputSpecUpdates, true, false); successSetParallelism = true; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 95d714b..4bf51a1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -44,6 +44,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -156,6 +157,19 @@ public class VertexManager { throw new TezUncheckedException(e); } } + + @Override + public synchronized void reconfigureVertex(int parallelism, + @Nullable VertexLocationHint locationHint, + @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws TezException { + checkAndThrowIfDone(); + try { + managedVertex.reconfigureVertex(parallelism, locationHint, sourceEdgeProperties); + } catch (AMUserCodeException e) { + // convert it to TezException which would be caught in VM + throw new TezException(e); + } + } @Override public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) { http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java index ef21537..456e2a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.tez.dag.api.DagTypeConverters; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -44,7 +44,7 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent { private int numTasks; private int oldNumTasks; private VertexLocationHint vertexLocationHint; - private Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers; + private Map<String, EdgeProperty> sourceEdgeProperties; private Map<String, InputSpecUpdate> rootInputSpecUpdates; private long updateTime; @@ -53,12 +53,12 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent { public VertexParallelismUpdatedEvent(TezVertexID vertexID, int numTasks, VertexLocationHint vertexLocationHint, - Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, + Map<String, EdgeProperty> sourceEdgeProperties, Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldNumTasks) { this.vertexID = vertexID; this.numTasks = numTasks; this.vertexLocationHint = vertexLocationHint; - this.sourceEdgeManagers = sourceEdgeManagers; + this.sourceEdgeProperties = sourceEdgeProperties; this.rootInputSpecUpdates = rootInputSpecUpdates; this.updateTime = System.currentTimeMillis(); this.oldNumTasks = oldNumTasks; @@ -88,14 +88,13 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent { 
builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto( this.vertexLocationHint)); } - if (sourceEdgeManagers != null) { - for (Entry<String, EdgeManagerPluginDescriptor> entry : - sourceEdgeManagers.entrySet()) { + if (sourceEdgeProperties != null) { + for (Entry<String, EdgeProperty> entry : + sourceEdgeProperties.entrySet()) { EdgeManagerDescriptorProto.Builder edgeMgrBuilder = EdgeManagerDescriptorProto.newBuilder(); edgeMgrBuilder.setEdgeName(entry.getKey()); - edgeMgrBuilder.setEntityDescriptor( - DagTypeConverters.convertToDAGPlan(entry.getValue())); + edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue())); builder.addEdgeManagerDescriptors(edgeMgrBuilder.build()); } } @@ -121,15 +120,15 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent { proto.getVertexLocationHint()); } if (proto.getEdgeManagerDescriptorsCount() > 0) { - this.sourceEdgeManagers = new HashMap<String, EdgeManagerPluginDescriptor>( + this.sourceEdgeProperties = new HashMap<String, EdgeProperty>( proto.getEdgeManagerDescriptorsCount()); for (EdgeManagerDescriptorProto edgeManagerProto : proto.getEdgeManagerDescriptorsList()) { - EdgeManagerPluginDescriptor edgeManagerDescriptor = - DagTypeConverters.convertEdgeManagerPluginDescriptorFromDAGPlan( - edgeManagerProto.getEntityDescriptor()); - sourceEdgeManagers.put(edgeManagerProto.getEdgeName(), - edgeManagerDescriptor); + EdgeProperty edgeProperty = + DagTypeConverters.convertFromProto( + edgeManagerProto.getEdgeProperty()); + sourceEdgeProperties.put(edgeManagerProto.getEdgeName(), + edgeProperty); } } if (proto.getRootInputSpecUpdatesCount() > 0) { @@ -169,7 +168,7 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent { + ", vertexLocationHint=" + (vertexLocationHint == null? "null" : vertexLocationHint) + ", edgeManagersCount=" + - (sourceEdgeManagers == null? "null" : sourceEdgeManagers.size() + (sourceEdgeProperties == null? 
"null" : sourceEdgeProperties.size() + ", rootInputSpecUpdateCount=" + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size())); } @@ -186,8 +185,8 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent { return vertexLocationHint; } - public Map<String, EdgeManagerPluginDescriptor> getSourceEdgeManagers() { - return sourceEdgeManagers; + public Map<String, EdgeProperty> getSourceEdgeProperties() { + return sourceEdgeProperties; } public Map<String, InputSpecUpdate> getRootInputSpecUpdates() { http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 79a0c34..22d95d8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -20,11 +20,10 @@ package org.apache.tez.dag.history.logging.impl; import java.util.Map; import java.util.Map.Entry; - import java.util.TreeMap; import org.apache.tez.common.ATSConstants; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.AMLaunchedEvent; @@ -722,12 +721,12 @@ public class HistoryEventJsonConversion { JSONObject eventInfo = new JSONObject(); eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks()); eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); - if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) { + if (event.getSourceEdgeProperties() != 
null && !event.getSourceEdgeProperties().isEmpty()) { JSONObject updatedEdgeManagers = new JSONObject(); - for (Entry<String, EdgeManagerPluginDescriptor> entry : - event.getSourceEdgeManagers().entrySet()) { + for (Entry<String, EdgeProperty> entry : + event.getSourceEdgeProperties().entrySet()) { updatedEdgeManagers.put(entry.getKey(), - new JSONObject(DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue()))); + new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue()))); } eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); } http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 0f34811..3ec9900 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -36,14 +36,13 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; -import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -234,7 +233,7 @@ public class DAGUtils { edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT, 
DagTypeConverters.getHistoryTextFromProto( edgePlan.getEdgeDestination())); - } + } // TEZ-2286 this is missing edgemanager descriptor for custom edge edgesList.add(edgeMap); } putInto(dagMap, EDGES_KEY, edgesList); @@ -352,6 +351,36 @@ public class DAGUtils { return vertexStatsMap; } + public static Map<String,Object> convertEdgeProperty( + EdgeProperty edge) { + Map<String, Object> jsonDescriptor = new HashMap<String, Object>(); + + jsonDescriptor.put(DATA_MOVEMENT_TYPE_KEY, + edge.getDataMovementType().name()); + jsonDescriptor.put(DATA_SOURCE_TYPE_KEY, edge.getDataSourceType().name()); + jsonDescriptor.put(SCHEDULING_TYPE_KEY, edge.getSchedulingType().name()); + jsonDescriptor.put(EDGE_SOURCE_CLASS_KEY, + edge.getEdgeSource().getClassName()); + jsonDescriptor.put(EDGE_DESTINATION_CLASS_KEY, + edge.getEdgeDestination().getClassName()); + String history = edge.getEdgeSource().getHistoryText(); + if (history != null) { + jsonDescriptor.put(OUTPUT_USER_PAYLOAD_AS_TEXT, history); + } + history = edge.getEdgeDestination().getHistoryText(); + if (history != null) { + jsonDescriptor.put(INPUT_USER_PAYLOAD_AS_TEXT, history); + } + EdgeManagerPluginDescriptor descriptor = edge.getEdgeManagerDescriptor(); + if (descriptor != null) { + jsonDescriptor.put(EDGE_MANAGER_CLASS_KEY, descriptor.getClassName()); + if (descriptor.getHistoryText() != null && !descriptor.getHistoryText().isEmpty()) { + jsonDescriptor.put(USER_PAYLOAD_AS_TEXT, descriptor.getHistoryText()); + } + } + return jsonDescriptor; + } + public static Map<String,Object> convertEdgeManagerPluginDescriptor( EdgeManagerPluginDescriptor descriptor) { Map<String, Object> jsonDescriptor = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index 45e9582..617a644 100644 
--- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -95,7 +95,7 @@ message VertexStartedProto { message EdgeManagerDescriptorProto { optional string edge_name = 1; - optional TezEntityDescriptorProto entity_descriptor = 2; + optional PlanEdgeProperty edge_property = 2; } message RootInputSpecUpdateProto { http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index e643a5b..5ad320e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2441,7 +2441,7 @@ public class TestVertexImpl { startVertex(v1); try { // cannot reconfigure a fully configured vertex without first notifying - v3.setParallelism(1, null, null, null, true); + v3.reconfigureVertex(1, null, null); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("context.vertexReconfigurationPlanned() before re-configuring")); @@ -2496,10 +2496,13 @@ public class TestVertexImpl { EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()); - Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors = + EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), + InputDescriptor.create("In")); + Map<String, EdgeProperty> edgeManagerDescriptors = Collections.singletonMap( - v1.getName(), mockEdgeManagerDescriptor); - v3.setParallelism(1, null, edgeManagerDescriptors, null, true); + v1.getName(), edgeProp); + v3.reconfigureVertex(1, null, 
edgeManagerDescriptors); v3.doneReconfiguringVertex(); assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof EdgeManagerForTest); @@ -2520,11 +2523,13 @@ public class TestVertexImpl { EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()); - - Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors = + EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), + InputDescriptor.create("In")); + Map<String, EdgeProperty> edgeManagerDescriptors = Collections.singletonMap( - v1.getName(), mockEdgeManagerDescriptor); - v3.setParallelism(10, null, edgeManagerDescriptors, null, true); + v1.getName(), edgeProp); + v3.reconfigureVertex(10, null, edgeManagerDescriptors); v3.doneReconfiguringVertex(); assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof EdgeManagerForTest); @@ -2543,10 +2548,10 @@ public class TestVertexImpl { VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); - v3.setParallelism(10, null, null, null, true); + v3.reconfigureVertex(10, null, null); checkTasks(v3, 10); - v3.setParallelism(5, null, null, null, true); + v3.reconfigureVertex(5, null, null); checkTasks(v3, 5); v3.doneReconfiguringVertex(); } @@ -2563,12 +2568,12 @@ public class TestVertexImpl { VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); - v3.setParallelism(10, null, null, null, true); + v3.reconfigureVertex(10, null, null); checkTasks(v3, 10); v3.doneReconfiguringVertex(); try { - v3.setParallelism(5, null, null, null, true); + v3.reconfigureVertex(5, null, null); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("Vertex is fully configured but still")); @@ -2587,11 +2592,11 @@ public class TestVertexImpl { VertexImpl v1 = vertices.get("vertex1"); 
startVertex(vertices.get("vertex2")); startVertex(v1); - v3.setParallelism(10, null, null, null, true); + v3.reconfigureVertex(10, null, null); checkTasks(v3, 10); v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null))); try { - v3.setParallelism(5, null, null, null, true); + v3.reconfigureVertex(5, null, null); Assert.fail(); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling")); @@ -2612,7 +2617,7 @@ public class TestVertexImpl { startVertex(v1); v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null))); try { - v3.setParallelism(5, null, null, null, true); + v3.reconfigureVertex(5, null, null); Assert.fail(); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling")); @@ -2671,12 +2676,15 @@ public class TestVertexImpl { EdgeManagerPluginDescriptor edgeManagerDescriptor = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()); edgeManagerDescriptor.setUserPayload(userPayload); + EdgeProperty edgeProp = EdgeProperty.create(edgeManagerDescriptor, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), + InputDescriptor.create("In")); Vertex v3 = vertices.get("vertex3"); - Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors = - Collections.singletonMap(v3.getName(), edgeManagerDescriptor); - v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null, true); // Must decrease. 
+ Map<String, EdgeProperty> edgeManagerDescriptors = + Collections.singletonMap(v3.getName(), edgeProp); + v5.reconfigureVertex(v5.getTotalTasks() - 1, null, edgeManagerDescriptors); v5.doneReconfiguringVertex(); VertexImpl v5Impl = (VertexImpl) v5; @@ -3411,8 +3419,8 @@ public class TestVertexImpl { Assert.assertEquals(-1, v1.getTotalTasks()); Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); // set the parallelism - v1.setParallelism(numTasks, null, null, null, true); - v2.setParallelism(numTasks, null, null, null, true); + v1.reconfigureVertex(numTasks, null, null); + v2.reconfigureVertex(numTasks, null, null); dispatcher.await(); // parallelism set and vertex starts with pending start event Assert.assertEquals(numTasks, v1.getTotalTasks()); @@ -3427,7 +3435,7 @@ public class TestVertexImpl { // v3 still initializing with source vertex started. So should start running // once num tasks is defined Assert.assertEquals(VertexState.INITIALIZING, v3.getState()); - v3.setParallelism(numTasks, null, null, null, false); + v3.reconfigureVertex(numTasks, null, null); dispatcher.await(); Assert.assertEquals(numTasks, v3.getTotalTasks()); Assert.assertEquals(VertexState.RUNNING, v3.getState()); @@ -3550,7 +3558,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState()); // change parallelism int newNumTasks = 3; - v1.setParallelism(newNumTasks, null, null, null, true); + v1.reconfigureVertex(newNumTasks, null, null); v1.doneReconfiguringVertex(); dispatcher.await(); Assert.assertEquals(newNumTasks, vertices.get("vertex2").getTotalTasks()); @@ -3584,7 +3592,7 @@ public class TestVertexImpl { Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks()); // change parallelism int newNumTasks = 3; - v1.setParallelism(newNumTasks, null, null, null, true); + v1.reconfigureVertex(newNumTasks, null, null); v1.doneReconfiguringVertex(); dispatcher.await(); Assert.assertEquals(newNumTasks, 
vertices.get("vertex2").getTotalTasks()); @@ -5039,9 +5047,12 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITIALIZING, vB.getState()); Assert.assertEquals(VertexState.INITIALIZING, vC.getState()); - Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap(); - edges.put("B", mockEdgeManagerDescriptor); - vC.setParallelism(2, vertexLocationHint, edges, null, true); + EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), + InputDescriptor.create("In")); + Map<String, EdgeProperty> edges = Maps.newHashMap(); + edges.put("B", edgeProp); + vC.reconfigureVertex(2, vertexLocationHint, edges); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, vA.getState()); @@ -5105,9 +5116,12 @@ public class TestVertexImpl { Assert.assertEquals(0, listener.events.size()); // complete configuration and verify getting configured signal from vB - Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap(); - edges.put("B", mockEdgeManagerDescriptor); - vC.setParallelism(2, vertexLocationHint, edges, null, true); + EdgeProperty edgeProp = EdgeProperty.create(mockEdgeManagerDescriptor, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), + InputDescriptor.create("In")); + Map<String, EdgeProperty> edges = Maps.newHashMap(); + edges.put("B", edgeProp); + vC.reconfigureVertex(2, vertexLocationHint, edges); dispatcher.await(); Assert.assertEquals(1, listener.events.size()); http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index a91c0f8..bf61ff0 
100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -33,6 +33,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; @@ -302,8 +308,8 @@ public class TestHistoryEventsProtoConversion { testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks()); - Assert.assertEquals(event.getSourceEdgeManagers(), - deserializedEvent.getSourceEdgeManagers()); + Assert.assertEquals(event.getSourceEdgeProperties(), + deserializedEvent.getSourceEdgeProperties()); Assert.assertEquals(event.getVertexLocationHint(), deserializedEvent.getVertexLocationHint()); Assert.assertEquals(event.getRootInputSpecUpdates().size(), deserializedEvent @@ -321,12 +327,17 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } { - Map<String,EdgeManagerPluginDescriptor> sourceEdgeManagers - = new LinkedHashMap<String, EdgeManagerPluginDescriptor>(); - sourceEdgeManagers.put("foo", EdgeManagerPluginDescriptor.create("bar")); - sourceEdgeManagers.put("foo1", EdgeManagerPluginDescriptor.create("bar1") + Map<String, EdgeProperty> sourceEdgeManagers + = new LinkedHashMap<String, EdgeProperty>(); + 
// add standard and custom edge + sourceEdgeManagers.put("foo", EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out1"), InputDescriptor.create("in1"))); + sourceEdgeManagers.put("foo1", EdgeProperty.create(EdgeManagerPluginDescriptor.create("bar1") .setUserPayload( - UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100))); + UserPayload.create(ByteBuffer.wrap(new String("payload").getBytes()), 100)), + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out1"), InputDescriptor.create("in1"))); VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent( TezVertexID.getInstance( @@ -336,24 +347,26 @@ public class TestHistoryEventsProtoConversion { new HashSet<String>(Arrays.asList("r1"))))), sourceEdgeManagers, null, 1); - VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent) - testProtoConversion(event); + VertexParallelismUpdatedEvent deserializedEvent = + (VertexParallelismUpdatedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks()); - Assert.assertEquals(event.getSourceEdgeManagers().size(), - deserializedEvent.getSourceEdgeManagers().size()); - Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(), - deserializedEvent.getSourceEdgeManagers().get("foo").getClassName()); - Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload()); - Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(), - deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName()); - Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(), - deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion()); - Assert.assertArrayEquals( - 
event.getSourceEdgeManagers().get("foo1").getUserPayload().deepCopyAsArray(), - deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().deepCopyAsArray()); - Assert.assertEquals(event.getVertexLocationHint(), - deserializedEvent.getVertexLocationHint()); + Assert.assertEquals(event.getSourceEdgeProperties().size(), deserializedEvent + .getSourceEdgeProperties().size()); + Assert.assertEquals(event.getSourceEdgeProperties().get("foo").getDataMovementType(), + deserializedEvent.getSourceEdgeProperties().get("foo").getDataMovementType()); + Assert.assertNull(deserializedEvent.getSourceEdgeProperties().get("foo") + .getEdgeManagerDescriptor()); + Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getDataMovementType(), + deserializedEvent.getSourceEdgeProperties().get("foo1").getDataMovementType()); + Assert.assertEquals(event.getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor() + .getUserPayload().getVersion(), deserializedEvent.getSourceEdgeProperties().get("foo1") + .getEdgeManagerDescriptor().getUserPayload().getVersion()); + Assert.assertArrayEquals(event.getSourceEdgeProperties().get("foo1") + .getEdgeManagerDescriptor().getUserPayload().deepCopyAsArray(), deserializedEvent + .getSourceEdgeProperties().get("foo1").getEdgeManagerDescriptor().getUserPayload() + .deepCopyAsArray()); + Assert.assertEquals(event.getVertexLocationHint(), deserializedEvent.getVertexLocationHint()); logEvents(event, deserializedEvent); } } http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 0eeba0d..bbf29e3 100644 --- 
a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -29,6 +29,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ATSConstants; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -195,9 +201,12 @@ public class TestHistoryEventJsonConversion { TezVertexID vId = TezVertexID.getInstance( TezDAGID.getInstance( ApplicationId.newInstance(1l, 1), 1), 1); - Map<String, EdgeManagerPluginDescriptor> edgeMgrs = - new HashMap<String, EdgeManagerPluginDescriptor>(); - edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text")); + Map<String, EdgeProperty> edgeMgrs = + new HashMap<String, EdgeProperty>(); + + edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class") + .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out"), InputDescriptor.create("In"))); VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, edgeMgrs, null, 10); @@ -223,6 +232,9 @@ public class TestHistoryEventJsonConversion { Assert.assertNotNull(updatedEdgeMgrs.getJSONObject("a")); JSONObject updatedEdgeMgr = updatedEdgeMgrs.getJSONObject("a"); + Assert.assertEquals(DataMovementType.CUSTOM.name(), + 
updatedEdgeMgr.getString(DAGUtils.DATA_MOVEMENT_TYPE_KEY)); + Assert.assertEquals("In", updatedEdgeMgr.getString(DAGUtils.EDGE_DESTINATION_CLASS_KEY)); Assert.assertEquals("a.class", updatedEdgeMgr.getString(DAGUtils.EDGE_MANAGER_CLASS_KEY)); JSONObject otherInfo = jsonObject.getJSONObject(ATSConstants.OTHER_INFO); http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index e1c2a72..fdd8f19 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -27,7 +27,7 @@ import java.util.TreeMap; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.tez.common.ATSConstants; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; @@ -603,12 +603,12 @@ public class HistoryEventTimelineConversion { updateEvt.setTimestamp(event.getUpdateTime()); Map<String,Object> eventInfo = new HashMap<String, Object>(); - if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) { + if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) { 
Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>(); - for (Entry<String, EdgeManagerPluginDescriptor> entry : - event.getSourceEdgeManagers().entrySet()) { + for (Entry<String, EdgeProperty> entry : + event.getSourceEdgeProperties().entrySet()) { updatedEdgeManagers.put(entry.getKey(), - DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())); + DAGUtils.convertEdgeProperty(entry.getValue())); } eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); } http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index a2b0f89..14330ba 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -34,6 +34,12 @@ import org.apache.tez.common.ATSConstants; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import 
org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -799,9 +805,12 @@ public class TestHistoryEventTimelineConversion { @Test(timeout = 5000) public void testConvertVertexParallelismUpdatedEvent() { TezVertexID vId = tezVertexID; - Map<String, EdgeManagerPluginDescriptor> edgeMgrs = - new HashMap<String, EdgeManagerPluginDescriptor>(); - edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text")); + Map<String, EdgeProperty> edgeMgrs = + new HashMap<String, EdgeProperty>(); + + edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class") + .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out"), InputDescriptor.create("In"))); VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null, edgeMgrs, null, 10); @@ -829,6 +838,9 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue(updatedEdgeMgrs.containsKey("a")); Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a"); + Assert.assertEquals(DataMovementType.CUSTOM.name(), + updatedEdgeMgr.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY)); + Assert.assertEquals("In", updatedEdgeMgr.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY)); Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY)); Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)); http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java index 98a5873..8671161 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.VertexManagerPlugin; @@ -123,7 +124,12 @@ public class InputReadyVertexManager extends VertexManagerPlugin { // must change parallelism to make them the same LOG.info("Update parallelism of vertex: " + getContext().getVertexName() + " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices."); - getContext().setVertexParallelism(oneToOneSrcTaskCount, null, null, null); + try { + getContext().reconfigureVertex(oneToOneSrcTaskCount, null, null); + } catch (TezException e) { + // TODO fail vertex - TEZ-2292 + LOG.warn("Failed to change parallelism in: " + getContext().getVertexName(), e); + } } oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount]; oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount]; http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index b6d69dc..9be9986 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexManagerPlugin; @@ -510,11 +511,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin { if(finalTaskParallelism < currentParallelism) { // final parallelism is less than actual parallelism - Map<String, EdgeManagerPluginDescriptor> edgeManagers = - new HashMap<String, EdgeManagerPluginDescriptor>(bipartiteSources); + Map<String, EdgeProperty> edgeProperties = + new HashMap<String, EdgeProperty>(bipartiteSources); Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo(); for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) { String vertex = entry.getKey(); + EdgeProperty oldEdgeProp = entry.getValue().edgeProperty; // use currentParallelism for numSourceTasks to maintain original state // for the source tasks CustomShuffleEdgeManagerConfig edgeManagerConfig = @@ -525,10 +527,18 @@ public class ShuffleVertexManager extends VertexManagerPlugin { EdgeManagerPluginDescriptor edgeManagerDescriptor = EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName()); edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload()); - edgeManagers.put(vertex, edgeManagerDescriptor); + EdgeProperty newEdgeProp = EdgeProperty.create(edgeManagerDescriptor, + oldEdgeProp.getDataSourceType(), oldEdgeProp.getSchedulingType(), + oldEdgeProp.getEdgeSource(), oldEdgeProp.getEdgeDestination()); + edgeProperties.put(vertex, newEdgeProp); } - getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null); 
+ try { + getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties); + } catch (TezException e) { + // TODO fail vertex - TEZ-2292 + LOG.warn("Failed to change parallelism in: " + getContext().getVertexName(), e); + } updatePendingTasks(); } return true; http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java index 411ea71..b164a6d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java @@ -257,7 +257,7 @@ public class TestInputReadyVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(1)).setVertexParallelism(3, null, null, null); + verify(mockContext, times(1)).reconfigureVertex(3, null, null); verify(mockContext, times(1)).doneReconfiguringVertex(); manager.onVertexStarted(initialCompletions); @@ -275,8 +275,8 @@ public class TestInputReadyVertexManager { } catch (TezUncheckedException e) { e.getMessage().contains("1-1 source vertices must have identical concurrency"); } - verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(), - anyMap(), anyMap()); // not invoked + verify(mockContext, times(1)).reconfigureVertex(anyInt(), (VertexLocationHint) any(), + anyMap()); 
// not invoked when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); @@ -288,8 +288,8 @@ public class TestInputReadyVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(), - anyMap(), anyMap()); // not invoked + verify(mockContext, times(1)).reconfigureVertex(anyInt(), (VertexLocationHint) any(), + anyMap()); // not invoked verify(mockContext, times(2)).doneReconfiguringVertex(); manager.onVertexStarted(initialCompletions); // all 1-1 0's done but not scheduled because v1 is not done http://git-wip-us.apache.org/repos/asf/tez/blob/c636dc22/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 4d9302e..27cd292 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -30,6 +30,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -170,11 +171,11 @@ public class 
TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) throws Exception { when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); newEdgeManagers.clear(); - for (Entry<String, EdgeManagerPluginDescriptor> entry : - ((Map<String, EdgeManagerPluginDescriptor>)invocation.getArguments()[2]).entrySet()) { + for (Entry<String, EdgeProperty> entry : + ((Map<String, EdgeProperty>)invocation.getArguments()[2]).entrySet()) { - - final UserPayload userPayload = entry.getValue().getUserPayload(); + EdgeManagerPluginDescriptor pluginDesc = entry.getValue().getEdgeManagerDescriptor(); + final UserPayload userPayload = pluginDesc.getUserPayload(); EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() { @Override public UserPayload getUserPayload() { @@ -202,13 +203,13 @@ public class TestShuffleVertexManager { } }; EdgeManagerPlugin edgeManager = ReflectionUtils - .createClazzInstance(entry.getValue().getClassName(), + .createClazzInstance(pluginDesc.getClassName(), new Class[]{EdgeManagerPluginContext.class}, new Object[]{emContext}); edgeManager.initialize(); newEdgeManagers.put(entry.getKey(), edgeManager); } return null; - }}).when(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap()); + }}).when(mockContext).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig @@ -264,11 +265,11 @@ public class TestShuffleVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); - verify(mockContext, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class), anyMap(), anyMap()); + verify(mockContext, times(0)).reconfigureVertex(anyInt(), 
any(VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // trigger scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class), anyMap(), anyMap()); + verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled Assert.assertEquals(4, scheduledTasks.size()); @@ -320,9 +321,7 @@ public class TestShuffleVertexManager { vmEvent = VertexManagerEvent.create("Vertex", payload); manager.onVertexManagerEventReceived(vmEvent); Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined - verify(mockContext, times(1)).setVertexParallelism(eq(2), any(VertexLocationHint.class), - anyMap(), - anyMap()); + verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); @@ -357,17 +356,13 @@ public class TestShuffleVertexManager { manager.onVertexManagerEventReceived(vmEvent); //small payload manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i)); //should not change parallelism - verify(mockContext, times(0)).setVertexParallelism(eq(4), any(VertexLocationHint.class), - anyMap(), - anyMap()); + verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); } //send 8th event with payload size as 100 manager.onVertexManagerEventReceived(vmEvent); manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8)); //Since max 
threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism - verify(mockContext, times(1)).setVertexParallelism(eq(4), any(VertexLocationHint.class), - anyMap(), - anyMap()); + verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); //reset context for next test when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); @@ -409,9 +404,7 @@ public class TestShuffleVertexManager { manager.onVertexManagerEventReceived(vmEvent); manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); // managedVertex tasks reduced - verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class), - anyMap(), - anyMap()); + verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); // TODO improve tests for parallelism Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled @@ -424,9 +417,7 @@ public class TestShuffleVertexManager { // more completions dont cause recalculation of parallelism manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); - verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class), - anyMap(), - anyMap()); + verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next(); @@ -742,52 +733,6 @@ public class TestShuffleVertexManager { when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3); when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); - final Map<String, EdgeManagerPlugin> edgeManagerR2 = - new HashMap<String, EdgeManagerPlugin>(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) throws Exception { - when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(2); - edgeManagerR2.clear(); - for (Entry<String, 
EdgeManagerPluginDescriptor> entry : - ((Map<String, EdgeManagerPluginDescriptor>)invocation.getArguments()[2]).entrySet()) { - - - final UserPayload userPayload = entry.getValue().getUserPayload(); - EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() { - @Override - public UserPayload getUserPayload() { - return userPayload == null ? null : userPayload; - } - - @Override - public String getSourceVertexName() { - return null; - } - - @Override - public String getDestinationVertexName() { - return null; - } - - @Override - public int getSourceVertexNumTasks() { - return 2; - } - - @Override - public int getDestinationVertexNumTasks() { - return 2; - } - }; - EdgeManagerPlugin edgeManager = ReflectionUtils - .createClazzInstance(entry.getValue().getClassName(), - new Class[]{EdgeManagerPluginContext.class}, new Object[]{emContext}); - edgeManager.initialize(); - edgeManagerR2.put(entry.getKey(), edgeManager); - } - return null; - }}).when(mockContext_R2).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap()); - ByteBuffer payload = VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer(); VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload); @@ -834,15 +779,18 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); //Ensure that setVertexParallelism is not called for R2. 
- verify(mockContext_R2, times(0)).setVertexParallelism(anyInt(), any(VertexLocationHint.class), - anyMap(), - anyMap()); + try { + verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), + anyMap()); + // complete configuration of r1 triggers the scheduling + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class), + anyMap()); + } catch (TezException e) { + e.printStackTrace(); + Assert.fail(); // should not happen + } - // complete configuration of r1 triggers the scheduling - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - verify(mockContext_R2, times(1)).setVertexParallelism(eq(1), any(VertexLocationHint.class), - anyMap(), - anyMap()); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3);
