Repository: tez Updated Branches: refs/heads/master c6e400e2d -> ec45c510c
TEZ-2330. Create reconfigureVertex() API for input based initialization (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec45c510 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec45c510 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec45c510 Branch: refs/heads/master Commit: ec45c510c04eead59799813521f5ce0c6868960f Parents: c6e400e Author: Bikas Saha <[email protected]> Authored: Tue Apr 21 19:24:00 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Apr 21 19:24:00 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/api/VertexManagerPluginContext.java | 26 ++++++++++++++++++++ .../java/org/apache/tez/dag/app/dag/Vertex.java | 4 +++ .../app/dag/impl/RootInputVertexManager.java | 9 +++++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++++++--- .../tez/dag/app/dag/impl/VertexManager.java | 12 +++++++++ .../tez/dag/app/dag/impl/TestVertexImpl.java | 2 +- 7 files changed, 59 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0c83c08..35cf312 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2330. Create reconfigureVertex() API for input based initialization TEZ-2292. Add e2e test for error reporting when vertex manager invokes plugin APIs TEZ-2308. 
Add set/get of record counts in task/vertex statistics http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index 8b0e89e..345ea43 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -147,6 +147,7 @@ public interface VertexManagerPluginContext { * @param rootInputSpecUpdate Updated Root Input specifications, if any. * If none specified, a default of 1 physical input is used */ + @Deprecated public void setVertexParallelism(int parallelism, @Nullable VertexLocationHint locationHint, @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, @@ -178,6 +179,31 @@ public interface VertexManagerPluginContext { @Nullable Map<String, EdgeProperty> sourceEdgeProperties); /** + * API to reconfigure a {@link Vertex} that is reading root inputs based on + * the data read from the root inputs. Root inputs are external data sources + * that provide the initial data for the DAG and are added to the + * {@link Vertex} using the + * {@link Vertex#addDataSource(String, DataSourceDescriptor)} API. Typically, + * the parallelism of such vertices is determined at runtime by gathering + * information about the data source. This API may be used to set the + * parallelism of the vertex at runtime based on the data sources, as well as + * to change the specification for those inputs. + * @param rootInputSpecUpdate + * The key of the map is the name of the data source and the value is + * the updated {@link InputSpecUpdate} for that data source. If none is + * specified, a default value is used. See {@link InputSpecUpdate} + * for details.
+ * @param locationHint + * The placement policy for the tasks, specified as a + * {@link VertexLocationHint} + * @param parallelism + * New number of tasks in the vertex + */ + public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate, + @Nullable VertexLocationHint locationHint, + int parallelism); + + /** * Allows a VertexManagerPlugin to assign Events for Root Inputs * * For regular Event Routing changes - the EdgeManager should be configured http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 6c85b85..77ef6e0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -101,6 +101,10 @@ public interface Vertex extends Comparable<Vertex> { public void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint, @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException; + + public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate, + int parallelism, + @Nullable VertexLocationHint locationHint) throws AMUserCodeException; void setVertexLocationHint(VertexLocationHint vertexLocationHint); void vertexReconfigurationPlanned(); http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java index e850286..c1e96f3 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java @@ -30,12 +30,17 @@ import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; public class RootInputVertexManager extends ImmediateStartVertexManager { + private static final Logger LOG = + LoggerFactory.getLogger(RootInputVertexManager.class); + private String configuredInputName; public RootInputVertexManager(VertexManagerPluginContext context) { @@ -66,8 +71,8 @@ public class RootInputVertexManager extends ImmediateStartVertexManager { inputName, cEvent.getInputSpecUpdate() == null ? InputSpecUpdate .getDefaultSinglePhysicalInputSpecUpdate() : cEvent.getInputSpecUpdate()); - getContext().setVertexParallelism(cEvent.getNumTasks(), - cEvent.getLocationHint(), null, rootInputSpecUpdate); + getContext().reconfigureVertex(rootInputSpecUpdate, cEvent.getLocationHint(), + cEvent.getNumTasks()); } if (event instanceof InputUpdatePayloadEvent) { // No tasks should have been started yet. Checked by initial state check. 
http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 5dfcb8e..e22343b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -82,7 +81,6 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.client.ProgressBuilder; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; -import org.apache.tez.dag.api.client.VertexStatus.State; import org.apache.tez.dag.api.client.VertexStatusBuilder; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated; @@ -96,7 +94,6 @@ import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; -import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.RootInputInitializerManager; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; @@ -1433,7 +1430,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws AMUserCodeException { setParallelism(parallelism, locationHint, sourceEdgeProperties, null, false, true); } - + + @Override + public void reconfigureVertex(@Nullable 
Map<String, InputSpecUpdate> rootInputSpecUpdate, + int parallelism, + @Nullable VertexLocationHint locationHint) throws AMUserCodeException { + setParallelism(parallelism, locationHint, null, rootInputSpecUpdate, false, true); + } + @Override public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers, http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 2ac1acf..1ed42fc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -185,6 +185,18 @@ public class VertexManager { throw new TezUncheckedException(e); } } + + @Override + public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate, + @Nullable VertexLocationHint locationHint, + int parallelism) { + checkAndThrowIfDone(); + try { + managedVertex.reconfigureVertex(rootInputSpecUpdate, parallelism, locationHint); + } catch (AMUserCodeException e) { + throw new TezUncheckedException(e); + } + } @Override public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) { http://git-wip-us.apache.org/repos/asf/tez/blob/ec45c510/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 2403599..3147093 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -5566,7 +5566,7 @@ public class TestVertexImpl { } map.put("input4", InputSpecUpdate.createPerTaskInputSpecUpdate(pInputList)); } - getContext().setVertexParallelism(NUM_TASKS, null, null, map); + getContext().reconfigureVertex(map, null, NUM_TASKS); } }
