TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager to require constructors for creation, and remove the initialize methods. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2213c109 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2213c109 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2213c109 Branch: refs/heads/master Commit: 2213c109edce1380dfab9fe103d7441bb5e1e399 Parents: 770e305 Author: Siddharth Seth <[email protected]> Authored: Wed Jul 30 16:26:43 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Jul 30 16:26:43 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/tez/dag/api/EdgeManager.java | 36 +++++++-- .../apache/tez/dag/api/VertexManagerPlugin.java | 29 ++++++- .../tez/runtime/api/AbstractLogicalInput.java | 52 +++++++++---- .../tez/runtime/api/AbstractLogicalOutput.java | 43 +++++++---- .../runtime/api/InputFrameworkInterface.java | 9 +-- .../api/LogicalInputFrameworkInterface.java | 9 --- .../api/LogicalOutputFrameworkInterface.java | 9 --- .../tez/runtime/api/MergedLogicalInput.java | 27 +++++-- .../apache/tez/runtime/api/OutputCommitter.java | 28 ++++++- .../runtime/api/OutputFrameworkInterface.java | 10 +-- .../runtime/api/TezRootInputInitializer.java | 28 ++++++- .../app/dag/RootInputInitializerManager.java | 34 ++++---- .../dag/app/dag/impl/BroadcastEdgeManager.java | 16 ++-- .../org/apache/tez/dag/app/dag/impl/Edge.java | 29 +++---- .../dag/impl/ImmediateStartVertexManager.java | 16 ++-- .../dag/app/dag/impl/OneToOneEdgeManager.java | 8 +- .../app/dag/impl/RootInputVertexManager.java | 26 ++++--- .../app/dag/impl/ScatterGatherEdgeManager.java | 16 ++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 21 +++-- .../tez/dag/app/dag/impl/VertexManager.java | 17 +--- .../tez/dag/app/dag/impl/TestDAGImpl.java | 5 +- .../dag/impl/TestRootInputVertexManager.java | 8 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 65 +++++++++------- .../org/apache/tez/test/EdgeManagerForTest.java | 17 ++-- 
.../tez/test/VertexManagerPluginForTest.java | 6 +- .../mapreduce/committer/MROutputCommitter.java | 24 +++--- .../common/MRInputAMSplitGenerator.java | 18 ++--- .../common/MRInputSplitDistributor.java | 12 +-- .../org/apache/tez/mapreduce/input/MRInput.java | 7 +- .../tez/mapreduce/input/MRInputLegacy.java | 5 ++ .../tez/mapreduce/input/MultiMRInput.java | 6 +- .../tez/mapreduce/input/base/MRInputBase.java | 5 ++ .../apache/tez/mapreduce/output/MROutput.java | 7 +- .../tez/mapreduce/output/MROutputLegacy.java | 5 ++ .../common/TestMRInputSplitDistributor.java | 8 +- .../tez/mapreduce/input/TestMultiMRInput.java | 15 ++-- .../runtime/LogicalIOProcessorRuntimeTask.java | 81 ++++++++++++-------- .../runtime/api/impl/TezInputContextImpl.java | 29 +++---- .../api/impl/TezMergedInputContextImpl.java | 20 +++-- .../tez/runtime/TestInputReadyTracker.java | 32 ++++++-- .../TestLogicalIOProcessorRuntimeTask.java | 9 ++- .../vertexmanager/InputReadyVertexManager.java | 26 ++++--- .../vertexmanager/ShuffleVertexManager.java | 44 +++++------ .../input/ConcatenatedMergedKeyValueInput.java | 7 ++ .../input/ConcatenatedMergedKeyValuesInput.java | 7 ++ .../runtime/library/input/LocalMergedInput.java | 5 ++ .../library/input/ShuffledMergedInput.java | 5 ++ .../input/ShuffledMergedInputLegacy.java | 5 ++ .../library/input/ShuffledUnorderedKVInput.java | 4 +- .../library/input/SortedGroupedMergedInput.java | 5 ++ .../library/output/LocalOnFileSorterOutput.java | 4 + .../library/output/OnFileSortedOutput.java | 6 +- .../library/output/OnFileUnorderedKVOutput.java | 12 +-- .../OnFileUnorderedPartitionedKVOutput.java | 31 +++----- .../TestInputReadyVertexManager.java | 20 ++--- .../vertexmanager/TestShuffleVertexManager.java | 18 +++-- .../input/TestSortedGroupedMergedInput.java | 43 +++++------ .../library/output/TestOnFileSortedOutput.java | 40 +++++----- .../output/TestOnFileUnorderedKVOutput.java | 12 ++- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 13 +++- 
.../java/org/apache/tez/test/TestInput.java | 41 +++++----- .../java/org/apache/tez/test/TestOutput.java | 7 +- .../apache/tez/test/dag/MultiAttemptDAG.java | 60 ++++++++++----- 64 files changed, 763 insertions(+), 501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4b93e3f..203382e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -36,6 +36,8 @@ INCOMPATIBLE CHANGES includes Hadoop libs. TEZ-1278. TezClient#waitTillReady() should not swallow interrupts TEZ-1058. Replace user land interfaces with abstract classes + TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager + to require constructors for creation, and remove the initialize methods. Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java index 92188e5..c447ca5 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java @@ -34,7 +34,22 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; */ @InterfaceStability.Unstable public abstract class EdgeManager { - + + private final EdgeManagerContext context; + + /** + * Create an instance of the EdgeManager. Classes extending this one to create an + * EdgeManager must provide the same constructor so that Tez can create an instance of + * the class at runtime. + * + * @param context the context within which this EdgeManager will run.
Includes + * information like configuration which the user may have specified + * while setting up the edge. + */ + public EdgeManager(EdgeManagerContext context) { + this.context = context; + } + /** * Initializes the EdgeManager. This method is called in the following * circumstances </p> 1. when initializing an Edge Manager for the first time. @@ -42,13 +57,9 @@ public abstract class EdgeManager { * EdgeManager instance is created and setup by the user. The initialize * method will be called with the original {@link EdgeManagerContext} when the * edgeManager is replaced. - * - * @param edgeManagerContext - * the context within which this EdgeManager will run. Includes - * information like configuration which the user may have specified - * while setting up the edge. + * */ - public abstract void initialize(EdgeManagerContext edgeManagerContext); + public abstract void initialize(); /** * Get the number of physical inputs on the destination task @@ -124,5 +135,14 @@ public abstract class EdgeManager { */ public abstract int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex); - + + /** + * Return the {@link org.apache.tez.dag.api.EdgeManagerContext} for this specific instance of + * the edge manager.
+ * + * @return the {@link org.apache.tez.dag.api.EdgeManagerContext} for this edge manager + */ + public EdgeManagerContext getContext() { + return this.context; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java index b628e74..6aec78e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java @@ -33,12 +33,27 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent; */ @InterfaceStability.Unstable public abstract class VertexManagerPlugin { + + private final VertexManagerPluginContext context; + + /** + * Create an instance of the VertexManagerPlugin. Classes extending this one to create a + * VertexManagerPlugin must provide the same constructor so that Tez can create an instance of + * the class at runtime. + * + * @param context vertex manager plugin context which can be used to access the payload, + * vertex + * properties, etc + */ + public VertexManagerPlugin(VertexManagerPluginContext context) { + this.context = context; + } + /** * Initialize the plugin. Called when the vertex is initializing.
This happens * after all source vertices and inputs have initialized - * @param context */ - public abstract void initialize(VertexManagerPluginContext context); + public abstract void initialize(); /** * Notification that the vertex is ready to start running tasks @@ -67,4 +82,14 @@ public abstract class VertexManagerPlugin { */ public abstract void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events); + + /** + * Return ahe {@link org.apache.tez.dag.api.VertexManagerPluginContext} for this specific instance of + * the vertex manager. + * + * @return the {@link org.apache.tez.dag.api.VertexManagerPluginContext} for the input + */ + public final VertexManagerPluginContext getContext() { + return this.context; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java index 7afd61d..8b6edda 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java @@ -26,31 +26,55 @@ import java.util.List; * implemented by all LogicalInputs. * * This includes default implementations of a new method for convenience. - * + * + * <code>Input</code> classes must provide a 2 argument public constructor for Tez to create the + * Input. The parameters to this constructor are 1) an instance of + * {@link org.apache.tez.runtime.api.TezInputContext} and 2) an integer which is used to + * setup the number of physical inputs that the logical input will see. + * Tez will take care of initializing and closing the Input after a {@link Processor} completes. 
</p> + * <p/> + * */ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInputFrameworkInterface { - protected int numPhysicalInputs; - protected TezInputContext inputContext; + private final int numPhysicalInputs; + private final TezInputContext inputContext; - @Override - public void setNumPhysicalInputs(int numInputs) { - this.numPhysicalInputs = numInputs; + /** + * Construct an instance of the LogicalInput. Classes extending this one to create a + * LogicalInput must provide the same constructor so that Tez can create an instance of the + * class at runtime. + * + * @param inputContext the {@link org.apache.tez.runtime.api.TezInputContext} which provides + * the Input with context information within the running task. + * @param numPhysicalInputs the number of physical inputs that the logical input will + * receive. This is typically determined by Edge Routing. + */ + public AbstractLogicalInput(TezInputContext inputContext, int numPhysicalInputs) { + this.inputContext = inputContext; + this.numPhysicalInputs = numPhysicalInputs; } @Override - public List<Event> initialize(TezInputContext _inputContext) throws Exception { - this.inputContext = _inputContext; - return initialize(); - } - public abstract List<Event> initialize() throws Exception; - public int getNumPhysicalInputs() { + /** + * Get the number of physical inputs that this LogicalInput will receive.
This is + * typically determined by Edge routing and the number of upstream tasks. + * + * @return the number of physical inputs + */ + public final int getNumPhysicalInputs() { return numPhysicalInputs; } - public TezInputContext getContext() { + /** + * Return the {@link org.apache.tez.runtime.api.TezInputContext} for this specific instance of + * the LogicalInput. + * + * @return the {@link org.apache.tez.runtime.api.TezInputContext} for this input + */ + public final TezInputContext getContext() { return inputContext; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java index c9ae11d..d88e57f 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java @@ -30,27 +30,44 @@ import java.util.List; */ public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOutputFrameworkInterface { - protected int numPhysicalOutputs; - protected TezOutputContext outputContext; + private final int numPhysicalOutputs; + private final TezOutputContext outputContext; - @Override - public void setNumPhysicalOutputs(int numOutputs) { - this.numPhysicalOutputs = numOutputs; + /** + * Construct an instance of the LogicalOutput. Classes extending this one to create a + * LogicalOutput must provide the same constructor so that Tez can create an instance of the + * class at runtime. + * + * @param outputContext the {@link org.apache.tez.runtime.api.TezOutputContext} which + * provides + * the Output with context information within the running task.
+ * @param numPhysicalOutputs the number of physical outputs that the logical output will + * generate. This is typically determined by Edge Routing. + */ + public AbstractLogicalOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + this.outputContext = outputContext; + this.numPhysicalOutputs = numPhysicalOutputs; } - @Override - public List<Event> initialize(TezOutputContext _outputContext) throws Exception { - this.outputContext = _outputContext; - return initialize(); - } - public abstract List<Event> initialize() throws Exception; - public int getNumPhysicalOutputs() { + /** + * Get the number of physical outputs that this LogicalOutput is expected to generate. This is + * typically determined by Edge routing and the number of downstream tasks. + * + * @return the number of physical outputs + */ + public final int getNumPhysicalOutputs() { return numPhysicalOutputs; } - public TezOutputContext getContext() { + /** + * Return the {@link org.apache.tez.runtime.api.TezOutputContext} for this specific instance of + * the LogicalOutput. + * + * @return the {@link org.apache.tez.runtime.api.TezOutputContext} for the output + */ + public final TezOutputContext getContext() { return outputContext; } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java index bf5d373..5192252 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java @@ -34,10 +34,6 @@ import java.util.List; * {@link TezInputContext}.requestInitialMemory * <p/> * - * <code>Input</code> classes must have a 0 argument public constructor for Tez - * to construct
the <code>Input</code>. Tez will take care of initializing and - * closing the Input after a {@link Processor} completes. </p> - * <p/> * * Inputs must also inform the framework once they are ready to be consumed. * This typically means that the Processor will not block when reading from the @@ -48,14 +44,11 @@ public interface InputFrameworkInterface { /** * Initializes the <code>Input</code>. * - * @param inputContext - * the {@link TezInputContext} * @return list of events that were generated during initialization * @throws Exception * if an error occurs */ - public List<Event> initialize(TezInputContext inputContext) - throws Exception; + public List<Event> initialize() throws Exception; /** * Handles user and system generated {@link Event}s, which typically carry http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java index 96aa256..33a0f40 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java @@ -21,13 +21,4 @@ package org.apache.tez.runtime.api; public interface LogicalInputFrameworkInterface extends InputFrameworkInterface { - /** - * Sets the number of physical inputs that this <code>LogicalInput</code> will - * receive. This will be called by the Tez framework before initializing the - * <code>LogicalInput</code> - * - * @param numInputs - * the number of physical inputs. 
- */ - public void setNumPhysicalInputs(int numInputs); } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java index bc110c8..b5855de 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java @@ -21,13 +21,4 @@ package org.apache.tez.runtime.api; public interface LogicalOutputFrameworkInterface extends OutputFrameworkInterface { - /** - * Sets the number of physical outputs that this <code>LogicalOutput</code> - * will receive. This will be called by the Tez framework before initializing - * the <code>LogicalOutput</code> - * - * @param numOutputs - * the number of physical outputs - */ - public void setNumPhysicalOutputs(int numOutputs); } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java index 4503418..6f7b14c 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java @@ -23,23 +23,40 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** - * A LogicalInput that is used to merge the data from multiple inputs and provide a + * A LogicalInput that is used to merge the data from multiple inputs and provide a * single <code>Reader</code> to read that data. 
- * This Input is not initialized or closed. It is only expected to provide a + * This Input is not initialized or closed. It is only expected to provide a * merged view of the real inputs. It cannot send or receive events + * <p/> + * <code>MergedLogicalInput</code> implementations must provide a 2-argument public constructor for + * Tez to create the Input. The parameters to this constructor are 1) an instance of {@link + * org.apache.tez.runtime.api.TezMergedInputContext} and 2) a list of constituent inputs. Tez does + * not initialize or close this Input; it only provides a merged view of the constituent inputs. </p> + * <p/> */ public abstract class MergedLogicalInput implements LogicalInput { + private AtomicBoolean notifiedInputReady = new AtomicBoolean(false); private List<Input> inputs; private final AtomicBoolean isStarted = new AtomicBoolean(false); - private TezMergedInputContext context; + private final TezMergedInputContext context; - public final void initialize(List<Input> inputs, TezMergedInputContext context) { + /** + * Construct an instance of the MergedLogicalInput. Classes extending this one to create a + * MergedLogicalInput must provide the same constructor so that Tez can create an instance of + * the + * class at runtime. + * + * @param context the {@link org.apache.tez.runtime.api.TezMergedInputContext} which provides + * the Input with context information within the running task. + * @param inputs the list of constituent Inputs.
+ */ + public MergedLogicalInput(TezMergedInputContext context, List<Input> inputs) { this.inputs = Collections.unmodifiableList(inputs); this.context = context; } - + public final List<Input> getInputs() { return inputs; } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java index 445a69a..7e21345 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java @@ -28,14 +28,26 @@ import org.apache.tez.dag.api.client.VertexStatus; @InterfaceStability.Unstable public abstract class OutputCommitter { + private final OutputCommitterContext outputCommitterContext; + + /** + * Construct an instance of the OutputCommitter. Classes extending this one to create an + * OutputCommitter must provide the same constructor so that Tez can create an instance of + * the class at runtime. + * + * @param committerContext committer context which can be used to access the payload, vertex + * properties, etc + */ + public OutputCommitter(OutputCommitterContext committerContext) { + this.outputCommitterContext = committerContext; + } + /** * Setup up the Output committer. * - * @param context Context of the output that is being acted upon * @throws java.lang.Exception */ - public abstract void initialize(OutputCommitterContext context) - throws Exception; + public abstract void initialize() throws Exception; /** * For the framework to setup the output during initialization.
This is @@ -89,4 +101,14 @@ public abstract class OutputCommitter { public void recoverTask(int taskIndex, int previousDAGAttempt) throws Exception { } + /** + * Return the {@link org.apache.tez.runtime.api.OutputCommitterContext} for this specific instance of + * the Committer. + * + * @return the {@link org.apache.tez.runtime.api.OutputCommitterContext} for this committer + */ + public final OutputCommitterContext getContext() { + return this.outputCommitterContext; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java index ed37f6d..873fa0c 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java @@ -33,25 +33,17 @@ import java.util.List; * {@link TezOutputContext}.requestInitialMemory * <p/> * - * <code>Output</code> classes must have a 0 argument public constructor for Tez - * to construct the <code>Output</code>. Tez will take care of initializing and - * closing the Output after a {@link Processor} completes.
</p> - * <p/> - * */ public interface OutputFrameworkInterface { /** * Initializes the <code>Output</code> * - * @param outputContext - * the {@link TezOutputContext} * @return list of events that were generated during initialization * @throws Exception * if an error occurs */ - public List<Event> initialize(TezOutputContext outputContext) - throws Exception; + public List<Event> initialize() throws Exception; /** * Handles user and system generated {@link Event}s, which typically carry http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java index d73531a..cfd192e 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializer.java @@ -32,19 +32,31 @@ import org.apache.tez.runtime.api.events.RootInputInitializerEvent; @InterfaceStability.Unstable public abstract class TezRootInputInitializer { + private final TezRootInputInitializerContext initializerContext; + + /** + * Constructor an instance of the RootInputInitializer. Classes extending this one to create a + * RootInputInitializer, must provide the same constructor so that Tez can create an instance of + * the class at runtime. + * + * @param initializerContext initializer context which can be used to access the payload, vertex + * properties, etc + */ + public TezRootInputInitializer(TezRootInputInitializerContext initializerContext) { + this.initializerContext = initializerContext; + } + /** * Run the root input initializer. This is the main method where initialization takes place. 
If an * Initializer is written to accept events, a notification mechanism should be setup, with the * heavy lifting of processing the event being done via this method. The moment this method * returns a list of events, RootInputInitialization is considered to be complete. * - * @param inputVertexContext initializer context which can be used to access the payload, vertex - * properties, etc * @return a list of events which are eventually routed to a {@link org.apache.tez.dag.api.VertexManagerPlugin} * for routing * @throws Exception */ - public abstract List<Event> initialize(TezRootInputInitializerContext inputVertexContext) + public abstract List<Event> initialize() throws Exception; /** @@ -57,5 +69,15 @@ public abstract class TezRootInputInitializer { * @throws Exception */ public abstract void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception; + + /** + * Return the {@link org.apache.tez.runtime.api.TezRootInputInitializerContext} for this specific instance of + * the Initializer.
+ * + * @return the {@link org.apache.tez.runtime.api.TezRootInputInitializerContext} for this initializer + */ + public final TezRootInputInitializerContext getContext() { + return this.initializerContext; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 5c604cb..a1ae243 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -88,8 +88,12 @@ public class RootInputInitializerManager { public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) { for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) { - TezRootInputInitializer initializer = createInitializer(input); - InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, vertex, appContext); + + TezRootInputInitializerContext context = + new TezRootInputInitializerContextImpl(input, vertex, appContext); + TezRootInputInitializer initializer = createInitializer(input, context); + + InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, vertex); initializerMap.put(input.getName(), initializerWrapper); ListenableFuture<List<Event>> future = executor .submit(new InputInitializerCallable(initializerWrapper, dagUgi)); @@ -100,20 +104,10 @@ public class RootInputInitializerManager { @VisibleForTesting protected TezRootInputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> - input) { - String className = input.getControllerDescriptor().getClassName(); -
@SuppressWarnings("unchecked") - Class<? extends TezRootInputInitializer> clazz = - (Class<? extends TezRootInputInitializer>) ReflectionUtils - .getClazz(className); - TezRootInputInitializer initializer = null; - try { - initializer = clazz.newInstance(); - } catch (InstantiationException e) { - throw new TezUncheckedException("Failed to create input initializerWrapper", e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException("Failed to create input initializerWrapper", e); - } + input, TezRootInputInitializerContext context) { + TezRootInputInitializer initializer = ReflectionUtils + .createClazzInstance(input.getControllerDescriptor().getClassName(), + new Class[]{TezRootInputInitializerContext.class}, new Object[]{context}); return initializer; } @@ -178,7 +172,7 @@ public class RootInputInitializerManager { LOG.info( "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() + " on vertex " + initializerWrapper.getVertexLogIdentifier()); - return initializerWrapper.getInitializer().initialize(initializerWrapper.context); + return initializerWrapper.getInitializer().initialize(); } }); return events; @@ -234,11 +228,11 @@ public class RootInputInitializerManager { private final String vertexLogIdentifier; InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, - TezRootInputInitializer initializer, Vertex vertex, - AppContext appContext) { + TezRootInputInitializer initializer, TezRootInputInitializerContext context, + Vertex vertex) { this.input = input; this.initializer = initializer; - this.context = new TezRootInputInitializerContextImpl(input, vertex, appContext); + this.context = context; this.vertexLogIdentifier = vertex.getLogIdentifier(); } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java index 4650faa..074b1bc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java @@ -29,15 +29,17 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; public class BroadcastEdgeManager extends EdgeManager { - EdgeManagerContext context; + public BroadcastEdgeManager(EdgeManagerContext context) { + super(context); + } + @Override - public void initialize(EdgeManagerContext edgeManagerContext) { - this.context = edgeManagerContext; + public void initialize() { } @Override public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) { - return context.getSourceVertexNumTasks(); + return getContext().getSourceVertexNumTasks(); } @Override @@ -52,7 +54,7 @@ public class BroadcastEdgeManager extends EdgeManager { List<Integer> inputIndices = Collections.unmodifiableList(Collections.singletonList(sourceTaskIndex)); // for each task make the i-th source task as the i-th physical input - for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) { + for (int i=0; i<getContext().getDestinationVertexNumTasks(); ++i) { destinationTaskAndInputIndices.put(i, inputIndices); } } @@ -63,7 +65,7 @@ public class BroadcastEdgeManager extends EdgeManager { List<Integer> inputIndices = Collections.unmodifiableList(Collections.singletonList(sourceTaskIndex)); // for each task make the i-th source task as the i-th physical input - for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) { + for (int i=0; i<getContext().getDestinationVertexNumTasks(); ++i) { destinationTaskAndInputIndices.put(i, inputIndices); } } @@ -76,7 +78,7 @@ public class BroadcastEdgeManager extends EdgeManager { @Override public int getNumDestinationConsumerTasks(int sourceTaskIndex) { - return 
context.getDestinationVertexNumTasks(); + return getContext().getDestinationVertexNumTasks(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index e2a9a27..0907b28 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -33,7 +33,6 @@ import org.apache.tez.dag.api.EdgeManagerContext; import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed; @@ -114,18 +113,28 @@ public class Edge { private void createEdgeManager() { switch (edgeProperty.getDataMovementType()) { case ONE_TO_ONE: - edgeManager = new OneToOneEdgeManager(); + edgeManagerContext = new EdgeManagerContextImpl(null); + edgeManager = new OneToOneEdgeManager(edgeManagerContext); break; case BROADCAST: - edgeManager = new BroadcastEdgeManager(); + edgeManagerContext = new EdgeManagerContextImpl(null); + edgeManager = new BroadcastEdgeManager(edgeManagerContext); break; case SCATTER_GATHER: - edgeManager = new ScatterGatherEdgeManager(); + edgeManagerContext = new EdgeManagerContextImpl(null); + edgeManager = new ScatterGatherEdgeManager(edgeManagerContext); break; case CUSTOM: if (edgeProperty.getEdgeManagerDescriptor() != null) { + byte []bb = null; + if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) { + bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload(); + } + edgeManagerContext = new 
EdgeManagerContextImpl(bb); String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName(); - edgeManager = ReflectionUtils.createClazzInstance(edgeManagerClassName); + edgeManager = ReflectionUtils + .createClazzInstance(edgeManagerClassName, new Class[]{EdgeManagerContext.class}, + new Object[]{edgeManagerContext}); } break; default: @@ -136,16 +145,8 @@ public class Edge { } public void initialize() { - byte[] bb = null; - if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM) { - if (edgeProperty.getEdgeManagerDescriptor() != null && - edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) { - bb = edgeProperty.getEdgeManagerDescriptor().getUserPayload(); - } - } - edgeManagerContext = new EdgeManagerContextImpl(bb); if (edgeManager != null) { - edgeManager.initialize(edgeManagerContext); + edgeManager.initialize(); } destinationMetaInfo = new EventMetaData(EventProducerConsumerType.INPUT, destinationVertex.getName(), http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java index 2933d5a..b202d70 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java @@ -34,20 +34,19 @@ import com.google.common.collect.Lists; * Starts all tasks immediately on vertex start */ public class ImmediateStartVertexManager extends VertexManagerPlugin { - - private VertexManagerPluginContext context; - - ImmediateStartVertexManager() { + + public ImmediateStartVertexManager(VertexManagerPluginContext context) { + super(context); } - + @Override public void 
onVertexStarted(Map<String, List<Integer>> completions) { - int numTasks = context.getVertexNumTasks(context.getVertexName()); + int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); for (int i=0; i<numTasks; ++i) { scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null)); } - context.scheduleVertexTasks(scheduledTasks); + getContext().scheduleVertexTasks(scheduledTasks); } @Override @@ -55,8 +54,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin { } @Override - public void initialize(VertexManagerPluginContext context) { - this.context = context; + public void initialize() { } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java index 68ab0d3..db2804c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java @@ -30,10 +30,14 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; public class OneToOneEdgeManager extends EdgeManager { List<Integer> destinationInputIndices = - Collections.unmodifiableList(Collections.singletonList(0)); + Collections.unmodifiableList(Collections.singletonList(0)); + + public OneToOneEdgeManager(EdgeManagerContext context) { + super(context); + } @Override - public void initialize(EdgeManagerContext edgeManagerContext) { + public void initialize() { // Nothing to do. 
} http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java index 637deaa..5b773ba 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java @@ -39,22 +39,24 @@ import com.google.common.collect.Lists; public class RootInputVertexManager extends VertexManagerPlugin { - VertexManagerPluginContext context; private String configuredInputName; + public RootInputVertexManager(VertexManagerPluginContext context) { + super(context); + } + @Override - public void initialize(VertexManagerPluginContext context) { - this.context = context; + public void initialize() { } @Override public void onVertexStarted(Map<String, List<Integer>> completions) { - int numTasks = context.getVertexNumTasks(context.getVertexName()); + int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); for (int i=0; i<numTasks; ++i) { scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null)); } - context.scheduleVertexTasks(scheduledTasks); + getContext().scheduleVertexTasks(scheduledTasks); } @Override @@ -74,12 +76,12 @@ public class RootInputVertexManager extends VertexManagerPlugin { if (event instanceof RootInputConfigureVertexTasksEvent) { // No tasks should have been started yet. Checked by initial state check. 
Preconditions.checkState(dataInformationEventSeen == false); - Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) == -1, + Preconditions.checkState(getContext().getVertexNumTasks(getContext().getVertexName()) == -1, "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism" - + ", VertexName: " + context.getVertexName()); + + ", VertexName: " + getContext().getVertexName()); Preconditions.checkState(configuredInputName == null, "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager" - + ", VertexName: " + context.getVertexName() + ", ConfiguredInput: " + + ", VertexName: " + getContext().getVertexName() + ", ConfiguredInput: " + configuredInputName + ", CurrentInput: " + inputName); configuredInputName = inputName; RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event; @@ -88,7 +90,7 @@ public class RootInputVertexManager extends VertexManagerPlugin { inputName, cEvent.getRootInputSpecUpdate() == null ? RootInputSpecUpdate .getDefaultSinglePhysicalInputSpecUpdate() : cEvent.getRootInputSpecUpdate()); - context.setVertexParallelism(cEvent.getNumTasks(), + getContext().setVertexParallelism(cEvent.getNumTasks(), new VertexLocationHint(cEvent.getTaskLocationHints()), null, rootInputSpecUpdate); } if (event instanceof RootInputUpdatePayloadEvent) { @@ -99,11 +101,11 @@ public class RootInputVertexManager extends VertexManagerPlugin { } else if (event instanceof RootInputDataInformationEvent) { dataInformationEventSeen = true; // # Tasks should have been set by this point. - Preconditions.checkState(context.getVertexNumTasks(context.getVertexName()) != 0); + Preconditions.checkState(getContext().getVertexNumTasks(getContext().getVertexName()) != 0); Preconditions.checkState( configuredInputName == null || configuredInputName.equals(inputName), "RootInputVertexManager cannot configure multiple inputs. 
Use a custom VertexManager" - + ", VertexName:" + context.getVertexName() + ", ConfiguredInput: " + + ", VertexName:" + getContext().getVertexName() + ", ConfiguredInput: " + configuredInputName + ", CurrentInput: " + inputName); configuredInputName = inputName; @@ -112,6 +114,6 @@ public class RootInputVertexManager extends VertexManagerPlugin { riEvents.add(rEvent); } } - context.addRootInputEvents(inputName, riEvents); + getContext().addRootInputEvents(inputName, riEvents); } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java index fcc4347..e5e620c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java @@ -29,20 +29,22 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; public class ScatterGatherEdgeManager extends EdgeManager { - EdgeManagerContext context; + public ScatterGatherEdgeManager(EdgeManagerContext context) { + super(context); + } + @Override - public void initialize(EdgeManagerContext edgeManagerContext) { - this.context = edgeManagerContext; + public void initialize() { } @Override public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) { - return context.getSourceVertexNumTasks(); + return getContext().getSourceVertexNumTasks(); } @Override public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) { - return context.getDestinationVertexNumTasks(); + return getContext().getDestinationVertexNumTasks(); } @Override @@ -56,7 +58,7 @@ public class ScatterGatherEdgeManager extends EdgeManager { @Override public void 
routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) { - for (int i=0; i<context.getDestinationVertexNumTasks(); ++i) { + for (int i=0; i<getContext().getDestinationVertexNumTasks(); ++i) { destinationTaskAndInputIndices.put(i, Collections.singletonList(sourceTaskIndex)); } } @@ -69,7 +71,7 @@ public class ScatterGatherEdgeManager extends EdgeManager { @Override public int getNumDestinationConsumerTasks(int sourceTaskIndex) { - return context.getDestinationVertexNumTasks(); + return getContext().getDestinationVertexNumTasks(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 3a9f0fe..e520c2f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1662,8 +1662,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, dagUgi.doAs(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - OutputCommitter outputCommitter = ReflectionUtils.createClazzInstance( - od.getControllerDescriptor().getClassName()); OutputCommitterContext outputCommitterContext = new OutputCommitterContextImpl(appContext.getApplicationID(), appContext.getApplicationAttemptId().getAttemptId(), @@ -1671,10 +1669,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertexName, od, vertexId.getId()); - + OutputCommitter outputCommitter = ReflectionUtils + .createClazzInstance(od.getControllerDescriptor().getClassName(), + new Class[]{OutputCommitterContext.class}, + new Object[]{outputCommitterContext}); LOG.info("Invoking committer init for 
output=" + outputName + ", vertexId=" + logIdentifier); - outputCommitter.initialize(outputCommitterContext); + outputCommitter.initialize(); outputCommitters.put(outputName, outputCommitter); LOG.info("Invoking committer setup for output=" + outputName + ", vertexId=" + logIdentifier); @@ -1897,24 +1898,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (inputsWithInitializers != null) { LOG.info("Setting vertexManager to RootInputVertexManager for " + logIdentifier); - vertexManager = new VertexManager(new RootInputVertexManager(), + vertexManager = new VertexManager( + new VertexManagerPluginDescriptor(RootInputVertexManager.class.getName()), this, appContext); } else if (hasOneToOne && !hasCustom) { LOG.info("Setting vertexManager to InputReadyVertexManager for " + logIdentifier); - vertexManager = new VertexManager(new InputReadyVertexManager(), + vertexManager = new VertexManager( + new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()), this, appContext); } else if (hasBipartite && !hasCustom) { LOG.info("Setting vertexManager to ShuffleVertexManager for " + logIdentifier); - vertexManager = new VertexManager(new ShuffleVertexManager(), + vertexManager = new VertexManager( + new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName()), this, appContext); } else { // schedule all tasks upon vertex start. Default behavior. 
LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + logIdentifier); vertexManager = new VertexManager( - new ImmediateStartVertexManager(), this, appContext); + new VertexManagerPluginDescriptor(ImmediateStartVertexManager.class.getName()), + this, appContext); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 5ecc06b..645f440 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -205,17 +205,7 @@ public class VertexManager { return null; } } - - public VertexManager(VertexManagerPlugin plugin, - Vertex managedVertex, AppContext appContext) { - checkNotNull(plugin, "plugin is null"); - checkNotNull(managedVertex, "managedVertex is null"); - checkNotNull(appContext, "appContext is null"); - this.plugin = plugin; - this.managedVertex = managedVertex; - this.appContext = appContext; - } - + public VertexManager(VertexManagerPluginDescriptor pluginDesc, Vertex managedVertex, AppContext appContext) { checkNotNull(pluginDesc, "pluginDesc is null"); @@ -233,7 +223,8 @@ public class VertexManager { public void initialize() { pluginContext = new VertexManagerPluginContextImpl(); if (pluginDesc != null) { - plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName()); + plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), + new Class[]{VertexManagerPluginContext.class}, new Object[]{pluginContext}); payload = DagTypeConverters.convertToTezUserPayload(pluginDesc.getUserPayload()); } if (payload == null || payload.getPayload() == null) { @@ -246,7 +237,7 @@ public class VertexManager { throw new 
TezUncheckedException(e); } } - plugin.initialize(pluginContext); + plugin.initialize(); } public void onVertexStarted(List<TezTaskAttemptID> completions) { http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index a50771b..c7fb4b5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -95,6 +95,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.OutputCommitter; +import org.apache.tez.runtime.api.OutputCommitterContext; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -329,8 +330,8 @@ public class TestDAGImpl { public static class TotalCountingOutputCommitter extends CountingOutputCommitter { static int totalCommitCounter = 0; - public TotalCountingOutputCommitter() { - super(); + public TotalCountingOutputCommitter(OutputCommitterContext context) { + super(context); } @Override public void commitOutput() throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java index c37cc00..e55351e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java +++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java @@ -43,8 +43,8 @@ public class TestRootInputVertexManager { doReturn("vertex1").when(context).getVertexName(); doReturn(1).when(context).getVertexNumTasks(eq("vertex1")); - RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(); - rootInputVertexManager.initialize(context); + RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(context); + rootInputVertexManager.initialize(); InputDescriptor id1 = mock(InputDescriptor.class); List<Event> events1 = new LinkedList<Event>(); @@ -74,8 +74,8 @@ public class TestRootInputVertexManager { doReturn("vertex1").when(context).getVertexName(); doReturn(-1).when(context).getVertexNumTasks(eq("vertex1")); - RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(); - rootInputVertexManager.initialize(context); + RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(context); + rootInputVertexManager.initialize(); InputDescriptor id1 = mock(InputDescriptor.class); List<Event> events1 = new LinkedList<Event>(); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 7ca8c60..f2adae8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -79,6 +79,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import 
org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos; @@ -194,8 +195,7 @@ public class TestVertexImpl { private HistoryEventHandler historyEventHandler; private static JavaProfilerOptions javaProfilerOptions; - public static class CountingOutputCommitter extends - OutputCommitter { + public static class CountingOutputCommitter extends OutputCommitter { public int initCounter = 0; public int setupCounter = 0; @@ -205,23 +205,18 @@ public class TestVertexImpl { private boolean throwErrorOnAbort; private boolean throwRuntimeException; - public CountingOutputCommitter(boolean throwError, - boolean throwOnAbort, - boolean throwRuntimeException) { - this.throwError = throwError; - this.throwErrorOnAbort = throwOnAbort; - this.throwRuntimeException = throwRuntimeException; - } - - public CountingOutputCommitter() { - this(false, false, false); + public CountingOutputCommitter(OutputCommitterContext context) { + super(context); + this.throwError = false; + this.throwErrorOnAbort = false; + this.throwRuntimeException = false; } @Override - public void initialize(OutputCommitterContext context) throws IOException { - if (context.getUserPayload() != null) { + public void initialize() throws IOException { + if (getContext().getUserPayload() != null) { CountingOutputCommitterConfig conf = - new CountingOutputCommitterConfig(context.getUserPayload()); + new CountingOutputCommitterConfig(getContext().getUserPayload()); this.throwError = conf.throwError; this.throwErrorOnAbort = conf.throwErrorOnAbort; this.throwRuntimeException = conf.throwRuntimeException; @@ -2702,8 +2697,10 @@ public class TestVertexImpl { initAllVertices(VertexState.INITED); // fudge vertex manager so that tasks dont start running - v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(), + v1.vertexManager = new VertexManager( + new 
VertexManagerPluginDescriptor(VertexManagerPluginForTest.class.getName()), v1, appContext); + v1.vertexManager.initialize(); startVertex(v1); Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks()); @@ -2737,8 +2734,10 @@ public class TestVertexImpl { initAllVertices(VertexState.INITED); // fudge vertex manager so that tasks dont start running - v1.vertexManager = new VertexManager(new VertexManagerPluginForTest(), + v1.vertexManager = new VertexManager( + new VertexManagerPluginDescriptor(VertexManagerPluginForTest.class.getName()), v1, appContext); + v1.vertexManager.initialize(); Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks()); Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks()); @@ -2818,7 +2817,7 @@ public class TestVertexImpl { @Test(timeout = 10000) public void testRootInputInitializerEvent() throws Exception { useCustomInitializer = true; - customInitializer = new EventHandlingRootInputInitializer(); + customInitializer = new EventHandlingRootInputInitializer(null); EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer) customInitializer; setupPreDagCreation(); @@ -3168,7 +3167,8 @@ public class TestVertexImpl { @Override protected TezRootInputInitializer createInitializer( - RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) { + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, + TezRootInputInitializerContext context) { return presetInitializer; } } @@ -3201,11 +3201,12 @@ public class TestVertexImpl { @Override protected TezRootInputInitializer createInitializer( - RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) { + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, + TezRootInputInitializerContext context) { - return new TezRootInputInitializer() { + return new TezRootInputInitializer(context) { @Override - public List<Event> initialize(TezRootInputInitializerContext 
inputVertexContext) throws + public List<Event> initialize() throws Exception { return null; } @@ -3393,12 +3394,14 @@ public class TestVertexImpl { @InterfaceAudience.Private public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin { - private VertexManagerPluginContext context; private static final int NUM_TASKS = 5; + public RootInputSpecUpdaterVertexManager(VertexManagerPluginContext context) { + super(context); + } + @Override - public void initialize(VertexManagerPluginContext context) { - this.context = context; + public void initialize() { } @Override @@ -3417,7 +3420,7 @@ public class TestVertexImpl { public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) { Map<String, RootInputSpecUpdate> map = new HashMap<String, RootInputSpecUpdate>(); - if (context.getUserPayload()[0] == 0) { + if (getContext().getUserPayload()[0] == 0) { map.put("input3", RootInputSpecUpdate.createAllTaskRootInputSpecUpdate(4)); } else { List<Integer> pInputList = new LinkedList<Integer>(); @@ -3426,7 +3429,7 @@ public class TestVertexImpl { } map.put("input4", RootInputSpecUpdate.createPerTaskRootInputSpecUpdate(pInputList)); } - context.setVertexParallelism(NUM_TASKS, null, null, map); + getContext().setVertexParallelism(NUM_TASKS, null, null, map); } } @@ -3440,9 +3443,13 @@ public class TestVertexImpl { private final ReentrantLock lock = new ReentrantLock(); private final Condition eventCondition = lock.newCondition(); + public EventHandlingRootInputInitializer( + TezRootInputInitializerContext initializerContext) { + super(initializerContext); + } + @Override - public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws - Exception { + public List<Event> initialize() throws Exception { initStarted.set(true); lock.lock(); try { http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java index 222c2f5..a777022 100644 --- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java +++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java @@ -28,20 +28,14 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; public class EdgeManagerForTest extends EdgeManager { - private EdgeManagerContext edgeManagerContext = null; private byte[] userPayload; - public static EdgeManagerForTest createInstance() { - EdgeManagerForTest e = new EdgeManagerForTest(); - return e; + public EdgeManagerForTest(EdgeManagerContext context) { + super(context); } public EdgeManagerContext getEdgeManagerContext() { - return edgeManagerContext; - } - - - public EdgeManagerForTest() { + return getContext(); } public byte[] getUserPayload() { @@ -50,9 +44,8 @@ public class EdgeManagerForTest extends EdgeManager { // Overridden methods @Override - public void initialize(EdgeManagerContext edgeManagerContext) { - this.edgeManagerContext = edgeManagerContext; - this.userPayload = edgeManagerContext.getUserPayload(); + public void initialize() { + this.userPayload = getContext().getUserPayload(); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java index be0d408..422d785 100644 --- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java +++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java @@ -29,8 +29,12 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent; public 
class VertexManagerPluginForTest extends VertexManagerPlugin { + public VertexManagerPluginForTest(VertexManagerPluginContext context) { + super(context); + } + @Override - public void initialize(VertexManagerPluginContext context) {} + public void initialize() {} @Override public void onVertexStarted(Map<String, List<Integer>> completions) {} http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 9311754..1f6c8bd 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -47,30 +47,32 @@ public class MROutputCommitter extends OutputCommitter { private static final Log LOG = LogFactory.getLog(MROutputCommitter.class); - private OutputCommitterContext context; private org.apache.hadoop.mapreduce.OutputCommitter committer = null; private JobContext jobContext = null; private volatile boolean initialized = false; private JobConf jobConf = null; private boolean newApiCommitter; + public MROutputCommitter(OutputCommitterContext committerContext) { + super(committerContext); + } + @Override - public void initialize(OutputCommitterContext context) throws IOException { - byte[] userPayload = context.getOutputUserPayload(); + public void initialize() throws IOException { + byte[] userPayload = getContext().getOutputUserPayload(); if (userPayload == null) { jobConf = new JobConf(); } else { jobConf = new JobConf( - MRHelpers.createConfFromUserPayload(context.getOutputUserPayload())); + MRHelpers.createConfFromUserPayload(getContext().getOutputUserPayload())); } // Read all credentials into the 
credentials instance stored in JobConf. jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, - context.getDAGAttemptNumber()); - this.context = context; - committer = getOutputCommitter(this.context); - jobContext = getJobContextFromVertexContext(context); + getContext().getDAGAttemptNumber()); + committer = getOutputCommitter(getContext()); + jobContext = getJobContextFromVertexContext(getContext()); initialized = true; } @@ -196,9 +198,9 @@ public class MROutputCommitter extends OutputCommitter { throw new RuntimeException("Committer not initialized"); } TaskAttemptID taskAttemptID = new TaskAttemptID( - Long.toString(context.getApplicationId().getClusterTimestamp()) - + String.valueOf(context.getVertexIndex()), - context.getApplicationId().getId(), + Long.toString(getContext().getApplicationId().getClusterTimestamp()) + + String.valueOf(getContext().getVertexIndex()), + getContext().getApplicationId().getId(), ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ? 
TaskType.MAP : TaskType.REDUCE)), taskIndex, attemptId); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index ba3ee7a..0777c73 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -52,21 +52,21 @@ public class MRInputAMSplitGenerator extends TezRootInputInitializer { private boolean sendSerializedEvents; - private static final Log LOG = LogFactory - .getLog(MRInputAMSplitGenerator.class); + private static final Log LOG = LogFactory.getLog(MRInputAMSplitGenerator.class); - public MRInputAMSplitGenerator() { + public MRInputAMSplitGenerator( + TezRootInputInitializerContext initializerContext) { + super(initializerContext); } @Override - public List<Event> initialize(TezRootInputInitializerContext rootInputContext) - throws Exception { + public List<Event> initialize() throws Exception { Stopwatch sw = null; if (LOG.isDebugEnabled()) { sw = new Stopwatch().start(); } MRInputUserPayloadProto userPayloadProto = MRHelpers - .parseMRInputPayload(rootInputContext.getInputUserPayload()); + .parseMRInputPayload(getContext().getInputUserPayload()); if (LOG.isDebugEnabled()) { sw.stop(); LOG.debug("Time to parse MRInput payload into prot: " @@ -91,15 +91,15 @@ public class MRInputAMSplitGenerator extends TezRootInputInitializer { sw.reset().start(); } - int totalResource = rootInputContext.getTotalAvailableResource().getMemory(); - int taskResource = rootInputContext.getVertexTaskResource().getMemory(); + int totalResource = 
getContext().getTotalAvailableResource().getMemory(); + int taskResource = getContext().getVertexTaskResource().getMemory(); float waves = conf.getFloat( TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES, TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT); int numTasks = (int)((totalResource*waves)/taskResource); - LOG.info("Input " + rootInputContext.getInputName() + " asking for " + numTasks + LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks + " tasks. Headroom: " + totalResource + " Task Resource: " + taskResource + " waves: " + waves); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java index 87d88a6..68c3f05 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java @@ -47,19 +47,19 @@ public class MRInputSplitDistributor extends TezRootInputInitializer { private boolean sendSerializedEvents; - public MRInputSplitDistributor() { - } - private MRSplitsProto splitsProto; + public MRInputSplitDistributor(TezRootInputInitializerContext initializerContext) { + super(initializerContext); + } + @Override - public List<Event> initialize(TezRootInputInitializerContext rootInputContext) - throws IOException { + public List<Event> initialize() throws IOException { Stopwatch sw = null; if (LOG.isDebugEnabled()) { sw = new Stopwatch().start(); } - MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload()); + MRInputUserPayloadProto userPayloadProto = 
MRHelpers.parseMRInputPayload(getContext().getInputUserPayload()); if (LOG.isDebugEnabled()) { sw.stop(); LOG.debug("Time to parse MRInput payload into prot: " http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 2f7c13e..e36eb4d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -43,6 +43,7 @@ import org.apache.tez.mapreduce.lib.MRReaderMapred; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; import org.apache.tez.runtime.library.api.KeyValueReader; @@ -77,7 +78,11 @@ public class MRInput extends MRInputBase { @Private volatile boolean splitInfoViaEvents; - + + public MRInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + /** * Helper API to generate the user payload for the MRInput and * MRInputAMSplitGenerator (if used). 
The InputFormat will be invoked by Tez http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java index 9171492..d999780 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java @@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; @LimitedPrivate("Hive") @@ -41,6 +42,10 @@ public class MRInputLegacy extends MRInput { private ReentrantLock eventLock = new ReentrantLock(); private Condition eventCondition = eventLock.newCondition(); + public MRInputLegacy(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + @Private protected void initializeInternal() throws IOException { LOG.info("MRInputLegacy deferring initialization"); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 8a759f8..b4df3f0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java 
@@ -39,6 +39,7 @@ import org.apache.tez.mapreduce.lib.MRReaderMapred; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; import org.apache.tez.runtime.library.api.KeyValueReader; @@ -46,9 +47,8 @@ public class MultiMRInput extends MRInputBase { private static final Log LOG = LogFactory.getLog(MultiMRInput.class); - @Override - public int getNumPhysicalInputs() { - return super.getNumPhysicalInputs(); + public MultiMRInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); } private final ReentrantLock lock = new ReentrantLock(); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java index a6a0d83..115b45b 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java @@ -34,6 +34,7 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezInputContext; import java.io.IOException; import java.util.List; @@ -45,6 +46,10 @@ public abstract class MRInputBase extends AbstractLogicalInput { protected JobConf jobConf; protected TezCounter inputRecordCounter; + public MRInputBase(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + @Override 
public Reader getReader() throws Exception { return null; http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 3cca35c..ed473e7 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -48,6 +48,7 @@ import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.library.api.KeyValueWriter; public class MROutput extends AbstractLogicalOutput { @@ -79,7 +80,11 @@ public class MROutput extends AbstractLogicalOutput { private boolean isMapperOutput; protected OutputCommitter committer; - + + public MROutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + /** * Creates the user payload to be set on the OutputDescriptor for MROutput * @param conf Configuration for the OutputFormat http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java index 7c9f804..f6ac07f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java +++ 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java @@ -19,9 +19,14 @@ package org.apache.tez.mapreduce.output; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.tez.runtime.api.TezOutputContext; public class MROutputLegacy extends MROutput { + public MROutputLegacy(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + public OutputCommitter getOutputCommitter() { return committer; } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java index 84c945e..5573d77 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java @@ -68,9 +68,9 @@ public class TestMRInputSplitDistributor { byte[] userPayload = payloadProto.build().toByteArray(); TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload); - MRInputSplitDistributor splitDist = new MRInputSplitDistributor(); + MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context); - List<Event> events = splitDist.initialize(context); + List<Event> events = splitDist.initialize(); assertEquals(3, events.size()); assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent); @@ -116,9 +116,9 @@ public class TestMRInputSplitDistributor { byte[] userPayload = payloadProto.build().toByteArray(); TezRootInputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload); - MRInputSplitDistributor splitDist = new MRInputSplitDistributor(); + 
MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context); - List<Event> events = splitDist.initialize(context); + List<Event> events = splitDist.initialize(); assertEquals(3, events.size()); assertTrue(events.get(0) instanceof RootInputUpdatePayloadEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index 3aa5ddc..57501e3 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -96,9 +96,8 @@ public class TestMultiMRInput { TezInputContext inputContext = createTezInputContext(payload); - MultiMRInput input = new MultiMRInput(); - input.setNumPhysicalInputs(1); - input.initialize(inputContext); + MultiMRInput input = new MultiMRInput(inputContext, 1); + input.initialize(); List<Event> eventList = new ArrayList<Event>(); String file1 = "file1"; @@ -147,9 +146,8 @@ public class TestMultiMRInput { TezInputContext inputContext = createTezInputContext(payload); - MultiMRInput input = new MultiMRInput(); - input.setNumPhysicalInputs(2); - input.initialize(inputContext); + MultiMRInput input = new MultiMRInput(inputContext, 2); + input.initialize(); List<Event> eventList = new ArrayList<Event>(); LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>(); @@ -212,9 +210,8 @@ public class TestMultiMRInput { TezInputContext inputContext = createTezInputContext(payload); - MultiMRInput input = new MultiMRInput(); - input.setNumPhysicalInputs(1); - input.initialize(inputContext); + MultiMRInput input = new MultiMRInput(inputContext, 1); + input.initialize(); List<Event> eventList 
= new ArrayList<Event>(); String file1 = "file1";
