Adrian,

Sorry for the late reply. I'm out until the second week of January. Since I'm not completely familiar with how the full cross-product edge was implemented as part of TEZ-2104, I would want to compare the solutions you present and weigh the pros and cons. I can definitely see how your two solutions differ, especially in messaging and amount of effort.
https://issues.apache.org/jira/browse/TEZ-2104

On Thu, Dec 6, 2018 at 7:58 PM Adrian Nicoara <[email protected]> wrote:
>
> Hello Tez devs,
>
> I will start with an example of vertex output full cross join, and then
> circle back to the root case:
>
>    V1[2]     V2[3]
>      |         |
>   DME1(2)   DME2(3)
>       \       /
>        V3[2x3]
>
> Assume we have:
> 1. Producer vertex V1, with 2 tasks, in which every task generates one
> physical output.
> 2. Once the tasks of V1 complete, they each raise a DataMovementEvent, for a
> total of 2 DMEs, called DME1.
> 3. Producer vertex V2, with 3 tasks, in which every task generates one
> physical output.
> 4. Once the tasks of V2 complete, they each raise a DataMovementEvent, for a
> total of 3 DMEs, called DME2.
> 5. Consumer vertex V3, with 6 tasks (2x3), one for each of the V1xV2 output
> combinations.
> 6. It is the responsibility of the EdgeManager on the edge V1->V3, to take
> the DME1 events {0} and {1}, and broadcast them to the tasks {0, *} and {1,
> *} respectively in V3 (assuming we view V3 as a 2D array).
> 7. It is the responsibility of the EdgeManager on the edge V2->V3, to take
> the DME2 events {0}, {1}, and {2} and broadcast them to the tasks {*, 0}, {*,
> 1} and {*, 2} respectively in V3 (assuming we view V3 as a 2D array).
>
> Now, consider the example of a full cross join of two root inputs:
>
>    R1[2]       R2[3]
>      |           |
>  IDIE1(?)    IDIE2(?)
>       \         /
>        V3[2x3]
>
> Then we have:
> 1. Root input R1, with 2 physical partitions.
> 2. Root input R2, with 3 physical partitions.
> 3. A full cross join, with a task setup to process 1 physical partition from
> each input, would result in V3 having again 6 tasks.
>
> Problem 1 - configuring the number of tasks in V3, and how many
> InputDataInformationEvents to send:
>
> At a first glance, each InputInitializer would have to result in 6
> InputDataInformationEvents, one for each task.
> This is in contrast with the DataMovementEvent model, where the number of
> **generated/stored** (as opposed to transmitted downstream) events is
> relative to the number of physical outputs.
> Furthermore, configuring the vertex V3 requires global information (all of
> the inputs to the vertex), while an InputInitializer functions on local
> information (the respective root input).
> R1 then takes a dependency on R2, and R2 takes a dependency on R1.
> The delay that the InputInitializers experience in obtaining the global
> information can be made larger, by adding another input from a vertex V4, and
> delaying configuration up until V4 is configured.
>
> Problem 2 - the VertexManagerPlugin doesn't have a proper chance of mutating
> the InputInitializer events
>
> While there is VertexManagerPluginContext#addRootInputEvents:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java#L259-L272
> that allows a VertexManagerPlugin to add root input events, that API can only
> be used within VertexManagerPlugin#onRootVertexInitialized:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java#L125-L133
> Because the queued events get processed by the
> VertexManagerRootInputInitializedCallback:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java#L589-L591
> which runs independently of the calls the VertexManagerPlugin does to
> VertexManagerPluginContext#addRootInputEvents.
>
> So the only chance of acting and mutating the events is during that one
> function call, at which point the VertexManagerPlugin might not have all the
> required global information to actually do the mutation.
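
To check my reading of points 6 and 7 in the vertex example, here is a small Python sketch of the routing as I understand it. It is not Tez code: the function names are made up for illustration, and the row-major flattening of V3's 2D task grid is my assumption, not something I have verified against the TEZ-2104 implementation.

```python
from typing import List


def task_index(i: int, j: int, n2: int) -> int:
    """Flatten the 2D position (i, j) of a V3 task into a row-major index.

    Assumption: V3 tasks are numbered row by row, with n2 tasks per row.
    """
    return i * n2 + j


def v1_destinations(i: int, n2: int) -> List[int]:
    """V3 tasks {i, *} that receive the event raised by V1 task i."""
    return [task_index(i, j, n2) for j in range(n2)]


def v2_destinations(j: int, n1: int, n2: int) -> List[int]:
    """V3 tasks {*, j} that receive the event raised by V2 task j."""
    return [task_index(i, j, n2) for i in range(n1)]


if __name__ == "__main__":
    n1, n2 = 2, 3  # V1 has 2 tasks, V2 has 3 tasks, so V3 has 2x3 = 6 tasks
    for i in range(n1):
        print("DME1", i, "->", v1_destinations(i, n2))
    for j in range(n2):
        print("DME2", j, "->", v2_destinations(j, n1, n2))
```

With this layout every V3 task receives exactly one event from each side, while only 2 + 3 events are generated in total. If I read Problem 1 correctly, that is the property the root input case currently lacks.
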
>
> Solution 1:
> Modify VertexManagerPluginContext#addRootInputEvents to be dependent on the
> VertexManagerPluginContext#vertexReconfigurationPlanned and
> VertexManagerPluginContext#doneReconfiguringVertex calls, and be able to send
> a VertexEventInputDataInformation event to the VertexImpl, similar to how the
> VertexManagerPluginContext#reconfigureVertex calls work; or overload a
> #reconfigureVertex function to take root input events also.
>
> Benefits:
> 1. Small contained change.
> 2. Delegates responsibility of configuration to the user code.
>
> Drawbacks:
> 1. Routing, which is usually delegated/implemented in the EdgeManagerPlugin
> layer must now be done in the VertexManagerPlugin for root inputs only.
> 2. Duplicate events must be stored.
>
> Solution 2:
> Treat data sources as properties of the graph rather than as properties of a
> Vertex. To that end, they are similar to a Vertex that directly completes
> everything, once InputInitializer completion happens.
>
> Benefits:
> 1. The same event movement model is used across root inputs/vertex inputs.
> This makes for more uniform code.
> 2. The amount of stored events is proportional to the number of "physical
> partitions" that the data source represents, rather than the number of times
> these partitions/events have to be replicated.
>
> Drawbacks:
> 1. Fundamental change to the design of the Tez graph.
>
> Appreciate any suggestions.
>
> Thanks,
> Adrian
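
As a rough way to weigh the "duplicate events" drawback of Solution 1 against the second benefit of Solution 2, the stored-event counts can be compared with a few lines of Python. This is only a sketch under my reading of the proposal: I assume Solution 1 stores one InputDataInformationEvent per (root input, consumer task) pair, as Problem 1 describes, and that Solution 2 stores one event per physical partition. The function names are hypothetical.

```python
from math import prod
from typing import Sequence


def stored_events_per_task(partitions: Sequence[int]) -> int:
    """Stored events when each root input emits one event per consumer task.

    The consumer has prod(partitions) tasks, and each of the
    len(partitions) root inputs contributes one event to every task.
    """
    return len(partitions) * prod(partitions)


def stored_events_per_partition(partitions: Sequence[int]) -> int:
    """Stored events when each root input emits one event per partition."""
    return sum(partitions)


if __name__ == "__main__":
    example = [2, 3]  # R1 has 2 physical partitions, R2 has 3
    print("one event per task:     ", stored_events_per_task(example))
    print("one event per partition:", stored_events_per_partition(example))
```

For the R1[2] x R2[3] example this gives 12 stored events against 5. The gap widens quickly with more inputs or more partitions, which may matter when comparing the effort of the two solutions.
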
