Hello Tez devs,
I will start with an example of a full cross join of vertex outputs, and then
circle back to the root input case:
V1[2]      V2[3]
  |          |
DME1(2)    DME2(3)
    \      /
     V3[2x3]
Assume we have:
1. Producer vertex V1, with 2 tasks, in which every task generates one physical
output.
2. Once the tasks of V1 complete, they each raise a DataMovementEvent, for a
total of 2 DMEs, called DME1.
3. Producer vertex V2, with 3 tasks, in which every task generates one physical
output.
4. Once the tasks of V2 complete, they each raise a DataMovementEvent, for a
total of 3 DMEs, called DME2.
5. Consumer vertex V3, with 6 tasks (2x3), one for each of the V1xV2 output
combinations.
6. The EdgeManager on the edge V1->V3 is responsible for taking the DME1 events
{0} and {1} and broadcasting them to the tasks {0, *} and {1, *}, respectively,
in V3 (viewing V3 as a 2D array).
7. The EdgeManager on the edge V2->V3 is responsible for taking the DME2 events
{0}, {1}, and {2} and broadcasting them to the tasks {*, 0}, {*, 1}, and
{*, 2}, respectively, in V3 (again viewing V3 as a 2D array).
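The routing in steps 6 and 7 can be sketched as follows. This is a hypothetical helper, not the Tez EdgeManagerPlugin API; it assumes V3's 2D task grid is flattened row-major, i.e. task {row, col} has index row * 3 + col.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Tez API): models how the two EdgeManagers in the
// example would route DMEs into V3, viewed as a rows x cols grid.
final class CrossProductRouting {
    private CrossProductRouting() {}

    // Assumption: row-major flattening of V3 task coordinates (row, col).
    static int taskIndex(int row, int col, int cols) {
        return row * cols + col;
    }

    // Edge V1->V3: the DME from V1 task `row` goes to every task {row, *}.
    static List<Integer> routeFromRowSource(int row, int cols) {
        List<Integer> targets = new ArrayList<>();
        for (int col = 0; col < cols; col++) {
            targets.add(taskIndex(row, col, cols));
        }
        return targets;
    }

    // Edge V2->V3: the DME from V2 task `col` goes to every task {*, col}.
    static List<Integer> routeFromColumnSource(int col, int rows, int cols) {
        List<Integer> targets = new ArrayList<>();
        for (int row = 0; row < rows; row++) {
            targets.add(taskIndex(row, col, cols));
        }
        return targets;
    }

    public static void main(String[] args) {
        int rows = 2; // V1 has 2 tasks
        int cols = 3; // V2 has 3 tasks
        for (int i = 0; i < rows; i++) {
            System.out.println("DME1 {" + i + "} -> " + routeFromRowSource(i, cols));
        }
        for (int j = 0; j < cols; j++) {
            System.out.println("DME2 {" + j + "} -> " + routeFromColumnSource(j, rows, cols));
        }
    }
}
```

With 2x3 tasks, DME1 {1} lands on tasks [3, 4, 5] and DME2 {2} on tasks [2, 5]. Only 5 events are stored; the fan-out happens at routing time.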
Now, consider the example of a full cross join of two root inputs:
R1[2]       R2[3]
  |           |
IDIE1(?)    IDIE2(?)
     \      /
      V3[2x3]
Then we have:
1. Root input R1, with 2 physical partitions.
2. Root input R2, with 3 physical partitions.
3. A full cross join, with each task set up to process one physical partition
from each input, would again result in V3 having 6 tasks.
Problem 1 - configuring the number of tasks in V3, and how many
InputDataInformationEvents to send:
At first glance, each InputInitializer would have to produce 6
InputDataInformationEvents, one for each task of V3.
This is in contrast to the DataMovementEvent model, where the number of
**generated/stored** (as opposed to transmitted downstream) events is
proportional to the number of physical outputs.
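To put numbers on this, here is a back-of-the-envelope count (plain Java, not Tez code). It assumes the per-task model in which every input's InputInitializer emits one event per V3 task, versus the DME model with one stored event per physical output.

```java
// Back-of-the-envelope comparison (not Tez code) of how many events are
// generated/stored under the two models for a full cross join.
final class StoredEventCounts {
    private StoredEventCounts() {}

    // Consumer tasks in a full cross join: product of the partition counts.
    static long crossJoinTasks(int... partitions) {
        long tasks = 1;
        for (int p : partitions) {
            tasks *= p;
        }
        return tasks;
    }

    // DataMovementEvent model: one stored event per physical output.
    static long dmeModel(int... partitions) {
        long total = 0;
        for (int p : partitions) {
            total += p;
        }
        return total;
    }

    // Assumed per-task root input model: every input's initializer emits one
    // event per consumer task.
    static long perTaskRootEventModel(int... partitions) {
        return partitions.length * crossJoinTasks(partitions);
    }

    public static void main(String[] args) {
        System.out.println("V3 tasks:      " + crossJoinTasks(2, 3));        // 6
        System.out.println("DME model:     " + dmeModel(2, 3));              // 5
        System.out.println("Per-task root: " + perTaskRootEventModel(2, 3)); // 12
    }
}
```

The gap widens quickly: the DME model grows with the sum of the partition counts, while the per-task model grows with their product.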
Furthermore, configuring the vertex V3 requires global information (all of the
inputs to the vertex), while an InputInitializer functions on local information
(the respective root input).
As a result, R1's InputInitializer takes a dependency on R2, and R2's takes a
dependency on R1.
The delay the InputInitializers incur in obtaining this global information
grows further if we add another input to V3 from a vertex V4, since
configuration must then wait until V4 is configured.
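A minimal sketch of why the parallelism of V3 needs global information. `CrossJoinParallelism` is a hypothetical coordinator, not a Tez class, and V4's partition count of 4 is an invented value for illustration. The task count only becomes known once every input has reported, so the slowest input gates all of them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical coordinator (not a Tez class): the cross-join parallelism can
// only be computed once every input has reported its partition count.
final class CrossJoinParallelism {
    private final Set<String> expectedInputs;
    private final Map<String, Integer> reported = new LinkedHashMap<>();

    CrossJoinParallelism(Set<String> expectedInputs) {
        this.expectedInputs = expectedInputs;
    }

    // Called when an input (root input or upstream vertex) learns its partition count.
    void report(String input, int partitions) {
        if (!expectedInputs.contains(input)) {
            throw new IllegalArgumentException("unknown input " + input);
        }
        reported.put(input, partitions);
    }

    // Empty until all inputs have reported; one slow input delays everyone.
    OptionalLong parallelism() {
        if (!reported.keySet().containsAll(expectedInputs)) {
            return OptionalLong.empty();
        }
        long tasks = 1;
        for (int p : reported.values()) {
            tasks *= p;
        }
        return OptionalLong.of(tasks);
    }

    public static void main(String[] args) {
        CrossJoinParallelism v3 = new CrossJoinParallelism(Set.of("R1", "R2", "V4"));
        v3.report("R1", 2);
        v3.report("R2", 3);
        System.out.println(v3.parallelism()); // OptionalLong.empty: V4 not configured yet
        v3.report("V4", 4);                   // hypothetical partition count for V4
        System.out.println(v3.parallelism()); // OptionalLong[24]
    }
}
```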
Problem 2 - the VertexManagerPlugin doesn't get a proper chance to mutate the
InputInitializer events:
Although VertexManagerPluginContext#addRootInputEvents:
https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java#L259-L272
allows a VertexManagerPlugin to add root input events, that API can only be
used within VertexManagerPlugin#onRootVertexInitialized:
https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java#L125-L133
This is because the queued events are processed by the
VertexManagerRootInputInitializedCallback:
https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java#L589-L591
which runs independently of the VertexManagerPlugin's calls to
VertexManagerPluginContext#addRootInputEvents.
So the only opportunity to act on and mutate the events is during that single
function call, at which point the VertexManagerPlugin might not yet have all the
global information required to perform the mutation.
Solution 1:
Modify VertexManagerPluginContext#addRootInputEvents so that it is gated by the
VertexManagerPluginContext#vertexReconfigurationPlanned and
VertexManagerPluginContext#doneReconfiguringVertex calls, and have it send a
VertexEventInputDataInformation event to the VertexImpl, similar to how the
VertexManagerPluginContext#reconfigureVertex calls work. Alternatively, add a
#reconfigureVertex overload that also takes root input events.
Benefits:
1. Small contained change.
2. Delegates responsibility of configuration to the user code.
Drawbacks:
1. Routing, which is usually delegated to/implemented in the EdgeManagerPlugin
layer, must now be done in the VertexManagerPlugin, for root inputs only.
2. Duplicate events must be stored (one per consuming task rather than one per
physical partition).
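Both drawbacks show up in a sketch of what the VertexManagerPlugin would have to do under Solution 1. `RootEvent` is a hypothetical stand-in for an InputDataInformationEvent carrying a target task index (not the Tez class), and the row-major task layout is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the routing a VertexManagerPlugin would do itself
// under Solution 1. RootEvent is a stand-in, not Tez's InputDataInformationEvent.
final class VmpSideRouting {
    static final class RootEvent {
        final String input;
        final int partition;
        final int targetTask;

        RootEvent(String input, int partition, int targetTask) {
            this.input = input;
            this.partition = partition;
            this.targetTask = targetTask;
        }

        @Override
        public String toString() {
            return input + "[" + partition + "] -> task " + targetTask;
        }
    }

    // Expands R1 (rows) and R2 (cols) partitions into one event per
    // (input, consumer task) pair, using a row-major task layout.
    static List<RootEvent> expand(int rows, int cols) {
        List<RootEvent> events = new ArrayList<>();
        for (int row = 0; row < rows; row++) {
            for (int col = 0; col < cols; col++) {
                int task = row * cols + col;
                // Task {row, col} needs R1 partition `row` and R2 partition `col`.
                events.add(new RootEvent("R1", row, task));
                events.add(new RootEvent("R2", col, task));
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<RootEvent> events = expand(2, 3);
        // 12 events stored for only 2 + 3 = 5 physical partitions.
        System.out.println(events.size());
        events.forEach(System.out::println);
    }
}
```

The routing logic here duplicates what the cross-product EdgeManagers already do for vertex outputs, and every replicated event must be held in memory.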
Solution 2:
Treat data sources as properties of the graph rather than as properties of a
Vertex. In this respect, a data source resembles a Vertex whose tasks all
complete immediately once its InputInitializer completes.
Benefits:
1. The same event movement model is used across root inputs/vertex inputs. This
makes for more uniform code.
2. The number of stored events is proportional to the number of "physical
partitions" that the data source represents, rather than to the number of times
these partitions/events have to be replicated.
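A minimal sketch of the uniform model in Solution 2, assuming a hypothetical `Producer` abstraction (not Tez's DAG API) shared by vertices and data sources. Because both expose one event per physical output, the same counting and routing code serves the vertex example and the root input example.

```java
import java.util.List;

// Hypothetical graph model (not Tez's DAG API) for Solution 2: a data source
// is a producer node like a vertex, so the same edge logic applies to both.
final class UniformProducers {
    interface Producer {
        String name();
        int physicalOutputs();
    }

    // A vertex whose tasks each produce one physical output.
    static final class VertexNode implements Producer {
        private final String name;
        private final int tasks;
        VertexNode(String name, int tasks) { this.name = name; this.tasks = tasks; }
        public String name() { return name; }
        public int physicalOutputs() { return tasks; }
    }

    // A data source that "completes" once its InputInitializer does,
    // exposing one event per physical partition.
    static final class DataSourceNode implements Producer {
        private final String name;
        private final int partitions;
        DataSourceNode(String name, int partitions) { this.name = name; this.partitions = partitions; }
        public String name() { return name; }
        public int physicalOutputs() { return partitions; }
    }

    // Stored events: one per physical output, regardless of producer kind.
    static long storedEvents(List<? extends Producer> inputs) {
        long total = 0;
        for (Producer p : inputs) total += p.physicalOutputs();
        return total;
    }

    // Full cross-join parallelism of the consumer vertex.
    static long crossJoinTasks(List<? extends Producer> inputs) {
        long tasks = 1;
        for (Producer p : inputs) tasks *= p.physicalOutputs();
        return tasks;
    }

    public static void main(String[] args) {
        List<Producer> vertexInputs = List.of(new VertexNode("V1", 2), new VertexNode("V2", 3));
        List<Producer> rootInputs = List.of(new DataSourceNode("R1", 2), new DataSourceNode("R2", 3));
        System.out.println(crossJoinTasks(vertexInputs) + " tasks, " + storedEvents(vertexInputs) + " stored events");
        System.out.println(crossJoinTasks(rootInputs) + " tasks, " + storedEvents(rootInputs) + " stored events");
    }
}
```

Both lines print 6 tasks and 5 stored events, i.e. the root input case would get the same storage profile as the vertex case.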
Drawbacks:
1. Fundamental change to the design of the Tez graph.
Appreciate any suggestions.
Thanks,
Adrian