Over the last couple years, I’ve noticed a trend toward specialized logical plans and increasing use of RunnableCommand nodes. DataSourceV2 is currently on the same path, and I’d like to make the case that we should avoid these practices.
I think it’s helpful to consider an example I’ve been watching over time, InsertIntoTable. I might not get all the details exactly right here, but the overall story should be familiar. When I first started looking at Spark’s SQL engine, here’s roughly how an insert was supposed to go (the plans have been changed to be simpler): - SQL is parsed and produces the parsed plan: InsertIntoTable(table=UnresolvedRelation(name), data=Project(a,b,d, from=...)) - Analyzer rules: unresolved tables are looked up and replaced: InsertIntoTable(table=HiveRelation(name, schema=a,b,c), data=Project(a,b,d, from=...)) - Analyzer rules: columns are checked against the table and type inconsistencies fixed: InsertIntoTable(table=HiveRelation(name, schema=a,b,c), data=Project(a,b,cast(d as string), from=Project(...))) - The analyzer reaches a fixed point - The planner asserts that the plan is resolved, which verifies that all references are satisfied, including a check in InsertIntoTable that the table’s schema is compatible with the logical plan that produces data. - The optimizer runs: among other changes, the double-project that was inserted by analysis is rewritten - The planner converts InsertIntoTable(HiveRelation, _) to what we’d now call InsertIntoHiveTableExec Over time, this basic structure changed in small but important ways: - Data sources used a similar, but different pre-processing rule to line up the incoming data’s logical plan with a table - Because there were multiple definitions of “resolved” implemented in InsertIntoTable#resolved (data sources and Hive), it was hard to maintain the function - InsertIntoTable was replaced on the data source side with InsertIntoDataSourceCommand and the resolved problem went away - InsertIntoTable was changed so that resolved is never true. It must be replaced with a command node before the optimizer runs for all write paths. - InsertIntoDataSourceCommand never added a similar resolved check, so there is no validation that the pre-processing rules ran, or produced a correct plan What we ended up doing, through a series of fairly reasonable changes, introduced little inconsistencies between different write paths in SQL, even though they start with the same logical plan. Although the plan is identical to begin with, specialized nodes are inserted early, and then analysis and optimizer rules are built around those specialized nodes. Basically, physical plan details leaked into the logical plan. This has a negative unintended consequence for DataSourceV2. The new API uses specialized plan nodes from the start, when DataFrameWriter creates the plan. But there are no rules that are ready to match DSv2 plans; we need to think of all the rules that should match and then go add expressions so they are run. There is no validation before the optimizer runs that ensures the data frame that will be written has the same schema or even the same number of columns as the target table. Another change was to put implementation in the RunnableCommand logical plan nodes, which are added to the physical plan using a generic ExecutedCommandExec. This also makes some sense: why have InsertIntoDataSourceCommand and InsertIntoDataSourceCommandExec when one is just a place-holder? The unintended consequence of using RunnableCommand is that we can now accidentally add logical plans into physical plans. This is what’s happening when the Spark UI only shows a single node and no physical plan details. This broke SQL shown in the UI, and metrics because they weren’t passed up the physical plan tree (because it was split with a logical plan). I can see why we’ve gotten to this place, but I think now is the time to fix this technical debt. To get an idea why, I’ll present our alternate history: Back when the data source write path started using a different pre-process rule, we started maintaining our own set of rules that used the same resolved method and the same InsertIntoTable plan. When we pick up new versions, we change the analyzer rules so that we use the same rules for all InsertIntoTable regardless of the target table’s write path. As a result, when we have consistent behavior to match table columns to data frame columns across all write paths. And when we backported DataSourceV2, all I needed was to change the plan to InsertIntoTable(table=DataSourceV2Relation(), data=...). After that, column resolution worked just like Hive or DataSource tables because all of our rules that match InsertIntoTable started matching the new tables, too. I know that some of the fragmentation has gotten a lot better lately, but we still have to add rules by hand. Case in point: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L386-L395 So here are my recommendations for moving forward, with DataSourceV2 as a starting point: 1. Use well-defined logical plan nodes for all high-level operations: insert, create, CTAS, overwrite table, etc. 2. Use rules that match on these high-level plan nodes, so that it isn’t necessary to create rules to match each eventual code path individually 3. Define Spark’s behavior for these logical plan nodes. Physical nodes should implement that behavior, but all CREATE TABLE OVERWRITE should (eventually) make the same guarantees. 4. Specialize implementation when creating a physical plan, not logical plans. I realize this is really long, but I’d like to hear thoughts about this. I’m sure I’ve left out some additional context, but I think the main idea here is solid: lets standardize logical plans for more consistent behavior and easier maintenance. rb -- Ryan Blue Software Engineer Netflix