Adrian,

Sorry for the late reply. I'm out until the second week of January. Since
I'm not completely familiar with how the full cross product edge was
implemented as part of TEZ-2104, I would want to compare the solutions
you present and weigh the pros and cons. I can definitely see how your two
solutions differ, especially in messaging and in the amount of effort.

https://issues.apache.org/jira/browse/TEZ-2104

On Thu, Dec 6, 2018 at 7:58 PM Adrian Nicoara
<[email protected]> wrote:
>
> Hello Tez devs,
>
> I will start with an example of a full cross join of vertex outputs, and then
> circle back to the root input case:
>
> V1[2]       V2[3]
>  |           |
> DME1(2)     DME2(3)
>     \      /
>      V3[2x3]
>
> Assume we have:
> 1. Producer vertex V1, with 2 tasks, in which every task generates one 
> physical output.
> 2. Once the tasks of V1 complete, they each raise a DataMovementEvent, for a 
> total of 2 DMEs, called DME1.
> 3. Producer vertex V2, with 3 tasks, in which every task generates one 
> physical output.
> 4. Once the tasks of V2 complete, they each raise a DataMovementEvent, for a 
> total of 3 DMEs, called DME2.
> 5. Consumer vertex V3, with 6 tasks (2x3), one for each of the V1xV2 output 
> combinations.
> 6. The EdgeManager on the edge V1->V3 is responsible for taking the DME1
> events {0} and {1} and broadcasting them to the tasks {0, *} and {1, *},
> respectively, in V3 (viewing V3 as a 2D array).
> 7. The EdgeManager on the edge V2->V3 is responsible for taking the DME2
> events {0}, {1}, and {2} and broadcasting them to the tasks {*, 0}, {*, 1},
> and {*, 2}, respectively, in V3 (viewing V3 as a 2D array).
>
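
To make the routing in steps 6 and 7 concrete, here is a minimal sketch in plain Java. It is not Tez's EdgeManagerPlugin API; the class and method names are hypothetical, and it assumes V3's tasks are laid out in row-major order, so task {r, c} has index r * 3 + c.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not a Tez API): routing for a 2-way full cross product.
 * Consumer V3 is viewed as a rows x cols grid in row-major order,
 * so consumer task (r, c) has index r * cols + c.
 */
public final class CrossProductRouting {
    private CrossProductRouting() {}

    /** Row-major index of consumer task (row, col). */
    public static int taskIndex(int row, int col, int cols) {
        return row * cols + col;
    }

    /** V1 -> V3: the event from V1 task `row` goes to every task {row, *}. */
    public static List<Integer> routeLeft(int row, int rows, int cols) {
        if (row < 0 || row >= rows) {
            throw new IllegalArgumentException("row out of range: " + row);
        }
        List<Integer> targets = new ArrayList<>();
        for (int c = 0; c < cols; c++) {
            targets.add(taskIndex(row, c, cols));
        }
        return targets;
    }

    /** V2 -> V3: the event from V2 task `col` goes to every task {*, col}. */
    public static List<Integer> routeRight(int col, int rows, int cols) {
        if (col < 0 || col >= cols) {
            throw new IllegalArgumentException("col out of range: " + col);
        }
        List<Integer> targets = new ArrayList<>();
        for (int r = 0; r < rows; r++) {
            targets.add(taskIndex(r, col, cols));
        }
        return targets;
    }

    public static void main(String[] args) {
        int rows = 2; // V1 has 2 tasks
        int cols = 3; // V2 has 3 tasks, so V3 has 2 x 3 = 6 tasks
        for (int i = 0; i < rows; i++) {
            System.out.println("DME1{" + i + "} -> " + routeLeft(i, rows, cols));
        }
        for (int j = 0; j < cols; j++) {
            System.out.println("DME2{" + j + "} -> " + routeRight(j, rows, cols));
        }
    }
}
```

Running main prints DME1{0} -> [0, 1, 2], DME1{1} -> [3, 4, 5], DME2{0} -> [0, 3], and so on: each V1 event reaches one row of V3 and each V2 event reaches one column, while only 5 events are ever generated.
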
> Now, consider the example of a full cross join of two root inputs:
>
> R1[2]       R2[3]
>  |           |
> IDIE1(?)    IDIE2(?)
>      \     /
>       V3[2x3]
>
> Then we have:
> 1. Root input R1, with 2 physical partitions.
> 2. Root input R2, with 3 physical partitions.
> 3. A full cross join, with each task set up to process 1 physical partition
> from each input, would again result in V3 having 6 tasks.
>
> Problem 1 - configuring the number of tasks in V3, and how many 
> InputDataInformationEvents to send:
>
> At first glance, each InputInitializer would have to produce 6
> InputDataInformationEvents, one for each task.
> This is in contrast with the DataMovementEvent model, where the number of
> **generated/stored** (as opposed to transmitted downstream) events is
> proportional to the number of physical outputs.
> Furthermore, configuring the vertex V3 requires global information (all of
> the inputs to the vertex), while an InputInitializer operates on local
> information (its own root input).
> As a result, R1 takes a dependency on R2, and R2 takes a dependency on R1.
> The delay the InputInitializers experience in obtaining this global
> information grows further if we add another input from a vertex V4, since
> configuration must then be delayed until V4 is configured.
>
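
A rough count for the root input example, assuming every InputInitializer emits one event per V3 task as described in Problem 1. The helper names below are hypothetical and not part of Tez; the arithmetic is the point: 2 + 3 = 5 stored events under the DataMovementEvent model versus 2 x 6 = 12 with per-task InputDataInformationEvents. Note that crossProductTasks needs every input's partition count, which is exactly the global information a single InputInitializer lacks.

```java
/**
 * Hypothetical helpers (not a Tez API) comparing how many events get stored
 * for a full cross product of root inputs under two models.
 */
public final class EventCounts {
    private EventCounts() {}

    /** Consumer tasks in a full cross product: the product of all partition counts. */
    public static int crossProductTasks(int... partitions) {
        int tasks = 1;
        for (int p : partitions) {
            tasks = Math.multiplyExact(tasks, p);
        }
        return tasks;
    }

    /** DataMovementEvent-style: one stored event per physical partition of each input. */
    public static int storedPerPartition(int... partitions) {
        int total = 0;
        for (int p : partitions) {
            total = Math.addExact(total, p);
        }
        return total;
    }

    /** Per-task model: each input's initializer emits one event for every consumer task. */
    public static int storedPerTask(int... partitions) {
        return Math.multiplyExact(partitions.length, crossProductTasks(partitions));
    }

    public static void main(String[] args) {
        // R1 has 2 partitions, R2 has 3.
        System.out.println("V3 tasks:             " + crossProductTasks(2, 3));  // 6
        System.out.println("per-partition events: " + storedPerPartition(2, 3)); // 5
        System.out.println("per-task events:      " + storedPerTask(2, 3));      // 12
    }
}
```
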
> Problem 2 - the VertexManagerPlugin doesn't get a proper chance to mutate
> the InputInitializer events:
>
> While there is VertexManagerPluginContext#addRootInputEvents:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java#L259-L272
> which allows a VertexManagerPlugin to add root input events, that API can only
> be used within VertexManagerPlugin#onRootVertexInitialized:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java#L125-L133
> This is because the queued events get processed by the
> VertexManagerRootInputInitializedCallback:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java#L589-L591
> which runs independently of any calls the VertexManagerPlugin makes to
> VertexManagerPluginContext#addRootInputEvents.
>
> So the only chance to act on and mutate the events is during that single
> function call, at which point the VertexManagerPlugin might not yet have all
> the global information required to actually perform the mutation.
>
> Solution 1:
> Modify VertexManagerPluginContext#addRootInputEvents to depend on the
> VertexManagerPluginContext#vertexReconfigurationPlanned and
> VertexManagerPluginContext#doneReconfiguringVertex calls, and to be able to send
> a VertexEventInputDataInformation event to the VertexImpl, similar to how the
> VertexManagerPluginContext#reconfigureVertex calls work; or overload
> #reconfigureVertex to also take root input events.
>
> Benefits:
> 1. Small contained change.
> 2. Delegates responsibility of configuration to the user code.
>
> Drawbacks:
> 1. Routing, which is usually delegated to/implemented in the EdgeManagerPlugin
> layer, must now be done in the VertexManagerPlugin, for root inputs only.
> 2. Duplicate events must be stored.
>
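
A minimal sketch of what Solution 1 implies for the user code, assuming the VertexManagerPlugin itself replicates root input events per V3 task before handing them to the vertex. Events are plain strings and the class is hypothetical (this is not Tez's InputDataInformationEvent API); it only illustrates both drawbacks: the routing lives in the plugin, and 12 events are stored where 5 distinct ones exist.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of Solution 1: the VertexManagerPlugin fans root input
 * events out to every consumer task itself. Not a Tez API.
 */
public final class RootInputFanOut {
    private RootInputFanOut() {}

    /** Per V3 task index (row-major), the R1 and R2 events that task should receive. */
    public static Map<Integer, List<String>> fanOut(List<String> r1Events, List<String> r2Events) {
        int rows = r1Events.size();
        int cols = r2Events.size();
        Map<Integer, List<String>> perTask = new LinkedHashMap<>();
        for (int r = 0; r < rows; r++) {
            for (int c = 0; c < cols; c++) {
                // Each task gets a copy of one R1 event and one R2 event: this is the
                // routing that an EdgeManagerPlugin would otherwise perform.
                perTask.put(r * cols + c, List.of(r1Events.get(r), r2Events.get(c)));
            }
        }
        return perTask;
    }

    /** Total events held after fan-out, counting the duplicates. */
    public static int storedEvents(Map<Integer, List<String>> perTask) {
        int n = 0;
        for (List<String> events : perTask.values()) {
            n += events.size();
        }
        return n;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> perTask =
            fanOut(List.of("R1-p0", "R1-p1"), List.of("R2-p0", "R2-p1", "R2-p2"));
        perTask.forEach((task, events) -> System.out.println("V3 task " + task + " <- " + events));
        System.out.println("stored events: " + storedEvents(perTask)); // 12
    }
}
```
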
> Solution 2:
> Treat data sources as properties of the graph rather than as properties of a
> Vertex. In that respect, they would resemble a Vertex that completes all of its
> work immediately once its InputInitializer completes.
>
> Benefits:
> 1. The same event movement model is used for both root inputs and vertex inputs.
> This makes for more uniform code.
> 2. The number of stored events is proportional to the number of "physical
> partitions" that the data source represents, rather than to the number of times
> these partitions/events have to be replicated.
>
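
For contrast, a minimal sketch of Solution 2 under the same toy model: each data source stores one event per physical partition, and the consumer task's events are computed on demand, as an edge-level router would do. The class and method names are hypothetical and not a Tez API; the sketch assumes the same row-major layout of V3 tasks.

```java
import java.util.List;

/**
 * Hypothetical sketch of Solution 2: a root data source behaves like a vertex
 * that stores one event per physical partition; routing to consumer tasks is
 * computed on demand instead of being materialized. Not a Tez API.
 */
public final class DataSourceAsVertex {
    private DataSourceAsVertex() {}

    /** Events stored by the sources themselves: one per partition, no copies. */
    public static int storedEvents(List<String> left, List<String> right) {
        return left.size() + right.size();
    }

    /** Edge-side routing for consumer task `task` (row-major: task = row * right.size() + col). */
    public static List<String> eventsForTask(int task, List<String> left, List<String> right) {
        int cols = right.size();
        int tasks = left.size() * cols;
        if (task < 0 || task >= tasks) {
            throw new IllegalArgumentException("task out of range: " + task);
        }
        return List.of(left.get(task / cols), right.get(task % cols));
    }

    public static void main(String[] args) {
        List<String> r1 = List.of("R1-p0", "R1-p1");
        List<String> r2 = List.of("R2-p0", "R2-p1", "R2-p2");
        for (int t = 0; t < r1.size() * r2.size(); t++) {
            System.out.println("V3 task " + t + " <- " + eventsForTask(t, r1, r2));
        }
        System.out.println("stored events: " + storedEvents(r1, r2)); // 5
    }
}
```

Each V3 task receives the same pair of events as in the fan-out approach, but only the 5 per-partition events are kept.
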
> Drawbacks:
> 1. Fundamental change to the design of the Tez graph.
>
> Appreciate any suggestions.
>
> Thanks,
> Adrian
