Hello Tez devs,

I will start with an example of a full cross join of vertex outputs, and then 
circle back to the root input case:

V1[2]       V2[3]
 |           |
DME1(2)     DME2(3)
    \      /
     V3[2x3]

Assume we have:
1. Producer vertex V1, with 2 tasks, in which every task generates one physical 
output.
2. Once the tasks of V1 complete, they each raise a DataMovementEvent, for a 
total of 2 DMEs, called DME1.
3. Producer vertex V2, with 3 tasks, in which every task generates one physical 
output.
4. Once the tasks of V2 complete, they each raise a DataMovementEvent, for a 
total of 3 DMEs, called DME2.
5. Consumer vertex V3, with 6 tasks (2x3), one for each of the V1xV2 output 
combinations.
6. It is the responsibility of the EdgeManager on the edge V1->V3 to take the 
DME1 events {0} and {1} and broadcast them to the tasks {0, *} and {1, *}, 
respectively, in V3 (viewing V3 as a 2D array).
7. It is the responsibility of the EdgeManager on the edge V2->V3 to take the 
DME2 events {0}, {1}, and {2} and broadcast them to the tasks {*, 0}, {*, 1}, 
and {*, 2}, respectively, in V3 (viewing V3 as a 2D array).
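As an illustration, the routing in points 6 and 7 reduces to index arithmetic 
once V3's 2D array is laid out row-major, so that task (i, j) has index 
i * 3 + j. The sketch below assumes that layout; CrossJoinRouting and its 
methods are hypothetical helpers, not the Tez EdgeManagerPlugin API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Tez API): routes DMEs of a two-way cross join.
// Assumes V3 is a rows x cols grid stored row-major: task (i, j) = i * cols + j.
final class CrossJoinRouting {
    private CrossJoinRouting() {}

    // A DME from V1 task `row` goes to every task in that row: {row, *}.
    static List<Integer> routeFromLeft(int row, int rows, int cols) {
        if (row < 0 || row >= rows) throw new IllegalArgumentException("row out of range");
        List<Integer> targets = new ArrayList<>();
        for (int j = 0; j < cols; j++) {
            targets.add(row * cols + j);
        }
        return targets;
    }

    // A DME from V2 task `col` goes to every task in that column: {*, col}.
    static List<Integer> routeFromRight(int col, int rows, int cols) {
        if (col < 0 || col >= cols) throw new IllegalArgumentException("col out of range");
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            targets.add(i * cols + col);
        }
        return targets;
    }

    public static void main(String[] args) {
        // V1 has 2 tasks and V2 has 3 tasks, so V3 is a 2x3 grid of 6 tasks.
        System.out.println(routeFromLeft(1, 2, 3));  // [3, 4, 5]
        System.out.println(routeFromRight(2, 2, 3)); // [2, 5]
    }
}
```

Note that each DME is stored once and only fanned out at routing time.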

Now, consider the example of a full cross join of two root inputs:

R1[2]       R2[3]
 |           |    
IDIE1(?)    IDIE2(?)
     \     /
      V3[2x3]

Then we have:
1. Root input R1, with 2 physical partitions.
2. Root input R2, with 3 physical partitions.
3. A full cross join, with each task set up to process 1 physical partition 
from each input, would again result in V3 having 6 tasks.
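Viewing V3 as a 2D array, as in the vertex example, and assuming a row-major 
(mixed-radix) layout with the last input varying fastest, the task count is the 
product of the partition counts and each task index decodes into one partition 
per input. A minimal sketch; CrossJoinLayout is a hypothetical name, and the 
same arithmetic extends to more than two inputs.

```java
import java.util.Arrays;

// Hypothetical sketch: for a full cross join over k inputs with partition
// counts n[0..k-1], the consumer needs n[0] * ... * n[k-1] tasks, and each task
// index decodes (mixed radix, last input varying fastest) into one partition per input.
final class CrossJoinLayout {
    private CrossJoinLayout() {}

    static int taskCount(int[] partitions) {
        int total = 1;
        for (int n : partitions) {
            total = Math.multiplyExact(total, n); // fail loudly on overflow
        }
        return total;
    }

    static int[] partitionsForTask(int taskIndex, int[] partitions) {
        if (taskIndex < 0) throw new IllegalArgumentException("negative task index");
        int[] coords = new int[partitions.length];
        int rest = taskIndex;
        for (int k = partitions.length - 1; k >= 0; k--) {
            coords[k] = rest % partitions[k];
            rest /= partitions[k];
        }
        if (rest != 0) throw new IllegalArgumentException("task index out of range");
        return coords;
    }

    public static void main(String[] args) {
        int[] partitions = {2, 3}; // R1 has 2 partitions, R2 has 3
        System.out.println(taskCount(partitions)); // 6
        for (int t = 0; t < taskCount(partitions); t++) {
            System.out.println(t + " -> " + Arrays.toString(partitionsForTask(t, partitions)));
        }
    }
}
```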

Problem 1 - configuring the number of tasks in V3, and how many 
InputDataInformationEvents to send:

At first glance, each InputInitializer would have to produce 6 
InputDataInformationEvents, one for each task.
This is in contrast with the DataMovementEvent model, where the number of 
**generated/stored** (as opposed to transmitted downstream) events is 
proportional to the number of physical outputs.
Furthermore, configuring the vertex V3 requires global information (all of the 
inputs to the vertex), while an InputInitializer operates on local information 
(the respective root input).
The InputInitializer of R1 therefore depends on R2, and that of R2 depends on R1.
The delay the InputInitializers experience in obtaining the global information 
can grow further: adding another input from a vertex V4 delays configuration 
until V4 is configured.
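To make the global dependency concrete, the sketch below records a partition 
count per input and only yields V3's parallelism once every input has reported. 
It also compares the events stored when each input emits one event per consumer 
task against one event per physical partition. CrossJoinParallelism is a 
hypothetical helper, not part of Tez.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical helper, not a Tez API. V3's parallelism for a full cross join is
// the product of every input's partition count, so it stays unknown until all
// inputs have been configured; no single input can decide it locally.
final class CrossJoinParallelism {
    private final Set<String> expectedInputs;
    private final Map<String, Integer> partitionCounts = new HashMap<>();

    CrossJoinParallelism(Set<String> expectedInputs) {
        this.expectedInputs = Set.copyOf(expectedInputs);
    }

    // Called whenever one input (root input or upstream vertex) learns its partition count.
    void onInputConfigured(String input, int partitions) {
        if (!expectedInputs.contains(input)) {
            throw new IllegalArgumentException("unknown input: " + input);
        }
        partitionCounts.put(input, partitions);
    }

    // Empty until every expected input has reported; then the product of the counts.
    OptionalInt parallelism() {
        if (!partitionCounts.keySet().containsAll(expectedInputs)) {
            return OptionalInt.empty();
        }
        int total = 1;
        for (int n : partitionCounts.values()) {
            total = Math.multiplyExact(total, n);
        }
        return OptionalInt.of(total);
    }

    // Events stored if every input emits one event per consumer task.
    OptionalInt perTaskEventCount() {
        OptionalInt tasks = parallelism();
        return tasks.isPresent()
                ? OptionalInt.of(tasks.getAsInt() * expectedInputs.size())
                : OptionalInt.empty();
    }

    // Events stored if every input emits one event per physical partition
    // (DataMovementEvent-style), summed over the inputs reported so far.
    int perPartitionEventCount() {
        int sum = 0;
        for (int n : partitionCounts.values()) {
            sum += n;
        }
        return sum;
    }

    public static void main(String[] args) {
        CrossJoinParallelism v3 = new CrossJoinParallelism(Set.of("R1", "R2"));
        v3.onInputConfigured("R1", 2);
        System.out.println(v3.parallelism());            // OptionalInt.empty (R2 still unknown)
        v3.onInputConfigured("R2", 3);
        System.out.println(v3.parallelism());            // OptionalInt[6]
        System.out.println(v3.perTaskEventCount());      // OptionalInt[12]
        System.out.println(v3.perPartitionEventCount()); // 5
    }
}
```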

Problem 2 - the VertexManagerPlugin has no proper opportunity to mutate the 
InputInitializer events:

While VertexManagerPluginContext#addRootInputEvents:
https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java#L259-L272
allows a VertexManagerPlugin to add root input events, that API can only be 
used within VertexManagerPlugin#onRootVertexInitialized:
https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java#L125-L133
This is because the queued events are processed by the 
VertexManagerRootInputInitializedCallback:
https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java#L589-L591
which runs independently of the calls the VertexManagerPlugin makes to 
VertexManagerPluginContext#addRootInputEvents.

So the only opportunity to act on and mutate the events is during that single 
function call, at which point the VertexManagerPlugin might not yet have all the 
global information required to perform the mutation.
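The timing problem can be seen in a simplified model. The sketch assumes, as 
described above, that queued root input events are drained only by the callback 
that follows onRootVertexInitialized. RootInputEventWindow and its methods are 
hypothetical and do not reproduce Tez's internals.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Tez code): events queued through addRootInputEvents are only
// drained by the callback that runs right after the plugin hook returns.
// Nothing else drains the queue, so events added later are never delivered.
final class RootInputEventWindow {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    // Plugin-facing call: only queues events.
    void addRootInputEvents(List<String> events) {
        pending.addAll(events);
    }

    // Framework side: run the plugin hook (modelling onRootVertexInitialized),
    // then the "initialized" callback drains whatever was queued so far.
    void onRootInputInitialized(Runnable pluginHook) {
        pluginHook.run();
        delivered.addAll(pending);
        pending.clear();
    }

    List<String> delivered() { return List.copyOf(delivered); }

    List<String> pending() { return List.copyOf(pending); }

    public static void main(String[] args) {
        RootInputEventWindow window = new RootInputEventWindow();
        // Events added inside the hook are delivered.
        window.onRootInputInitialized(() -> window.addRootInputEvents(List.of("R1-p0", "R1-p1")));
        // Events added later (e.g. once V4 is configured) stay queued forever in this model.
        window.addRootInputEvents(List.of("R2-p0"));
        System.out.println(window.delivered()); // [R1-p0, R1-p1]
        System.out.println(window.pending());   // [R2-p0]
    }
}
```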

Solution 1:
Modify VertexManagerPluginContext#addRootInputEvents to depend on the 
VertexManagerPluginContext#vertexReconfigurationPlanned and 
VertexManagerPluginContext#doneReconfiguringVertex calls, and allow it to send a 
VertexEventInputDataInformation event to the VertexImpl, similar to how the 
VertexManagerPluginContext#reconfigureVertex calls work. Alternatively, overload 
#reconfigureVertex to also accept root input events.

Benefits:
1. Small contained change.
2. Delegates responsibility of configuration to the user code.

Drawbacks:
1. Routing, which is usually delegated to/implemented in the EdgeManagerPlugin 
layer, must now be done in the VertexManagerPlugin, for root inputs only.
2. Duplicate events must be stored (one copy per consuming task rather than one 
per physical partition).
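Under Solution 1, the replication an EdgeManagerPlugin would normally perform 
moves into the VertexManagerPlugin. The sketch below assumes a row-major V3 
layout (task (i, j) = i * cols + j) and uses hypothetical names; it enumerates 
the event copies the plugin would have to create and store for R1[2] x R2[3]. 
It uses a record, so it needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the routing a VertexManagerPlugin would have to do itself
// under Solution 1 for a cross join of R1 and R2.
// V3 is assumed row-major: task (i, j) has index i * r2Partitions + j.
final class PluginSideRouting {
    // One stored copy of a root input event, addressed to a single V3 task.
    record RoutedEvent(String input, int partition, int targetTask) {}

    static List<RoutedEvent> route(int r1Partitions, int r2Partitions) {
        List<RoutedEvent> copies = new ArrayList<>();
        for (int i = 0; i < r1Partitions; i++) {
            for (int j = 0; j < r2Partitions; j++) {
                int task = i * r2Partitions + j;
                copies.add(new RoutedEvent("R1", i, task)); // R1 partition i, replicated across row i
                copies.add(new RoutedEvent("R2", j, task)); // R2 partition j, replicated down column j
            }
        }
        return copies;
    }

    public static void main(String[] args) {
        List<RoutedEvent> copies = route(2, 3);
        // 12 stored events for only 2 + 3 = 5 physical partitions.
        System.out.println(copies.size()); // 12
    }
}
```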

Solution 2:
Treat data sources as properties of the graph rather than as properties of a 
Vertex. In that sense, a data source would behave like a Vertex whose tasks all 
complete as soon as its InputInitializer completes.

Benefits:
1. The same event movement model is used across root inputs/vertex inputs. This 
makes for more uniform code.
2. The number of stored events is proportional to the number of "physical 
partitions" that the data source represents, rather than to the number of times 
these partitions/events have to be replicated.

Drawbacks:
1. Fundamental change to the design of the Tez graph.
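A minimal sketch of the Solution 2 idea, assuming a data source stores exactly 
one event per physical partition and that routing to V3's tasks is computed on 
demand (row-major layout assumed). DataSourceAsVertex and the string events are 
hypothetical stand-ins for a graph node and its InputDataInformationEvents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Solution 2: a data source acts like a vertex whose
// "tasks" (one per physical partition) complete immediately, each storing a
// single event. Routing is computed per consumer task on demand, so storage
// stays proportional to the number of partitions.
final class DataSourceAsVertex {
    private final List<String> storedEvents = new ArrayList<>(); // one per partition

    DataSourceAsVertex(String name, int partitions) {
        for (int p = 0; p < partitions; p++) {
            storedEvents.add(name + "-p" + p); // stands in for an InputDataInformationEvent
        }
    }

    int storedEventCount() {
        return storedEvents.size();
    }

    // Edge-side routing for a two-way cross join: `dimension` 0 means this source
    // indexes V3's rows, 1 means its columns. Task (i, j) = i * cols + j.
    String eventForTask(int task, int dimension, int cols) {
        int partition = dimension == 0 ? task / cols : task % cols;
        return storedEvents.get(partition);
    }

    public static void main(String[] args) {
        DataSourceAsVertex r1 = new DataSourceAsVertex("R1", 2);
        DataSourceAsVertex r2 = new DataSourceAsVertex("R2", 3);
        System.out.println(r1.storedEventCount() + r2.storedEventCount()); // 5
        // V3 task 4 is (1, 1): it reads R1 partition 1 and R2 partition 1.
        System.out.println(r1.eventForTask(4, 0, 3) + " " + r2.eventForTask(4, 1, 3)); // R1-p1 R2-p1
    }
}
```

This mirrors the DataMovementEvent model from the vertex example: events are 
stored once per partition and fanned out only when routed.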

Appreciate any suggestions.

Thanks,
Adrian
