Over the last couple years, I’ve noticed a trend toward specialized logical
plans and increasing use of RunnableCommand nodes. DataSourceV2 is
currently on the same path, and I’d like to make the case that we should
avoid these practices.

I think it’s helpful to consider an example I’ve been watching over time,
InsertIntoTable. I might not get all the details exactly right here, but
the overall story should be familiar.

When I first started looking at Spark’s SQL engine, here’s roughly how an
insert was supposed to go (the plans have been simplified; a code sketch
of the key check follows the list):

   - SQL is parsed and produces the parsed plan:
   InsertIntoTable(table=UnresolvedRelation(name), data=Project(a,b,d,
   from=...))
   - Analyzer rules: unresolved tables are looked up and replaced:
   InsertIntoTable(table=HiveRelation(name, schema=a,b,c),
   data=Project(a,b,d, from=...))
   - Analyzer rules: columns are checked against the table and type
   inconsistencies fixed:
   InsertIntoTable(table=HiveRelation(name, schema=a,b,c),
   data=Project(a,b,cast(d as string), from=Project(...)))
   - The analyzer reaches a fixed point
   - The planner asserts that the plan is resolved, which verifies that all
   references are satisfied, including a check in InsertIntoTable that the
   table’s schema is compatible with the logical plan that produces data.
   - The optimizer runs: among other changes, the double-project that was
   inserted by analysis is rewritten
   - The planner converts InsertIntoTable(HiveRelation, _) to what we’d now
   call InsertIntoHiveTableExec
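
To make the resolved check above concrete, here is a minimal,
self-contained sketch. The names mirror Catalyst’s but are simplified
stand-ins, not Spark’s actual classes; the point is that a shared
InsertIntoTable node can verify, through resolved, that the incoming data
lines up with the table’s schema before planning continues.

    object InsertResolutionSketch {
      case class Attribute(name: String, dataType: String)

      sealed trait LogicalPlan {
        def output: Seq[Attribute]
        def resolved: Boolean
      }

      // A table reference that analysis has not looked up yet.
      case class UnresolvedRelation(name: String) extends LogicalPlan {
        def output: Seq[Attribute] = Nil
        def resolved: Boolean = false
      }

      // A resolved table with a known schema.
      case class HiveRelation(name: String, output: Seq[Attribute]) extends LogicalPlan {
        def resolved: Boolean = true
      }

      // The incoming data, already analyzed for the purposes of this sketch.
      case class Query(output: Seq[Attribute]) extends LogicalPlan {
        def resolved: Boolean = true
      }

      // The insert node is resolved only when its children are resolved AND
      // the data's columns line up with the table's schema. This is the kind
      // of check the planner relies on when it asserts the plan is resolved.
      case class InsertIntoTable(table: LogicalPlan, data: LogicalPlan) extends LogicalPlan {
        def output: Seq[Attribute] = Nil
        def resolved: Boolean =
          table.resolved && data.resolved &&
            table.output.size == data.output.size &&
            table.output.zip(data.output).forall { case (t, d) => t.dataType == d.dataType }
      }
    }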

Over time, this basic structure changed in small but important ways:

   - Data sources used a similar, but different pre-processing rule to line
   up the incoming data’s logical plan with a table
   - Because there were multiple definitions of “resolved” implemented in
   InsertIntoTable#resolved (data sources and Hive), it was hard to
   maintain the function
   - InsertIntoTable was replaced on the data source side with
   InsertIntoDataSourceCommand and the resolved problem went away
   - InsertIntoTable was changed so that resolved is never true. For all
   write paths, it must be replaced with a command node before the
   optimizer runs (see the sketch after this list).
   - InsertIntoDataSourceCommand never added a similar resolved check, so
   there is no validation that the pre-processing rules ran, or produced a
   correct plan
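
To be concrete about those last two points, here is what the change looks
like in miniature (a simplified stand-in, not Spark’s actual class):

    object ResolvedNeverTrueSketch {
      trait LogicalPlan { def resolved: Boolean }

      // The shared insert node can no longer certify its own plan: resolved
      // is hard-coded to false, so a pre-optimizer rule has to replace it
      // with a command node. The schema-compatibility check that used to
      // live here is gone, and a write path whose command node skips the
      // check gets no validation at all.
      case class InsertIntoTable(table: LogicalPlan, data: LogicalPlan)
          extends LogicalPlan {
        override def resolved: Boolean = false
      }
    }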

What we ended up doing, through a series of fairly reasonable changes,
was to introduce small inconsistencies between the different write paths
in SQL, even though they all start from the same logical plan.
Specialized nodes are inserted early, and then analysis and optimizer
rules are built around those specialized nodes. Basically, physical plan
details leaked into the logical plan.

This has a negative unintended consequence for DataSourceV2. The new API
uses specialized plan nodes from the start, when DataFrameWriter creates
the plan. But there are no rules ready to match DSv2 plans; we need to
think of all the rules that should apply and then go add match cases so
they run for DSv2 nodes, too. There is no validation before the optimizer
runs that ensures the data frame being written has the same schema, or
even the same number of columns, as the target table.
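
As a sketch of what matching on a shared node buys us, here is the kind
of column-matching and validation logic that applies to any target
relation, DSv2 included, when it matches the generic insert plan. The
types are simplified stand-ins, not Catalyst’s:

    object ResolveOutputColumnsSketch {
      case class Column(name: String, dataType: String)

      sealed trait Relation { def schema: Seq[Column] }
      case class HiveRelation(schema: Seq[Column]) extends Relation
      case class DataSourceV2Relation(schema: Seq[Column]) extends Relation

      sealed trait Expr { def dataType: String }
      case class Attr(name: String, dataType: String) extends Expr
      case class Cast(child: Expr, to: String) extends Expr { def dataType: String = to }

      // Line the query's output up with the table's schema by position,
      // failing fast on a column-count mismatch and inserting casts where
      // the types differ. Because this never inspects the relation's
      // concrete type, a DSv2 table gets the same treatment as a Hive table.
      def resolveOutput(table: Relation, query: Seq[Expr]): Seq[Expr] = {
        require(table.schema.size == query.size,
          s"Expected ${table.schema.size} columns but the query produces ${query.size}")
        table.schema.zip(query).map {
          case (col, e) if col.dataType == e.dataType => e
          case (col, e) => Cast(e, col.dataType)
        }
      }
    }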

Another change was to put the implementation in RunnableCommand logical
plan nodes, which are added to the physical plan using a generic
ExecutedCommandExec. This also makes some sense: why have both
InsertIntoDataSourceCommand and InsertIntoDataSourceCommandExec when one
is just a placeholder?
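
Here is the rough shape of that pattern, heavily simplified; the real
RunnableCommand and ExecutedCommandExec carry much more than this:

    object RunnableCommandSketch {
      trait LogicalPlan
      trait SparkPlan { def execute(): Unit }

      // A logical node that carries its own execution logic (in Spark, run
      // takes a session and returns rows).
      trait RunnableCommand extends LogicalPlan {
        def run(): Unit
      }

      // The command both describes the write and performs it. If it holds a
      // logical plan for the data, that plan is executed inside run() and
      // never shows up in the surrounding physical plan.
      case class InsertIntoDataSourceCommand(table: String, query: LogicalPlan)
          extends RunnableCommand {
        def run(): Unit = { /* plan and execute `query`, then write the result */ }
      }

      // One generic physical node wraps any logical command, so the physical
      // plan for a write is a single box.
      case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
        def execute(): Unit = cmd.run()
      }
    }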

The unintended consequence of using RunnableCommand is that we can now
accidentally embed logical plans inside physical plans. This is what’s
happening when the Spark UI shows only a single node and no physical plan
details. It broke the SQL shown in the UI, and it broke metrics because
they weren’t passed up the physical plan tree (the tree was split by a
logical plan).

I can see why we’ve gotten to this place, but I think now is the time to
fix this technical debt. To see why, I’ll present our alternate history:

Back when the data source write path started using a different
pre-processing rule, we started maintaining our own set of rules that
used the same resolved method and the same InsertIntoTable plan. When we
pick up new versions, we change the analyzer rules so that the same rules
run for every InsertIntoTable, regardless of the target table’s write
path. As a result, we have consistent behavior for matching table columns
to data frame columns across all write paths. And when we backported
DataSourceV2, all I needed to do was change the plan to
InsertIntoTable(table=DataSourceV2Relation(), data=...). After that,
column resolution worked just like it does for Hive or DataSource tables,
because all of our rules that match InsertIntoTable started matching the
new tables, too.

I know that some of the fragmentation has gotten a lot better lately, but
we still have to add rules by hand. Case in point:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L386-L395

So here are my recommendations for moving forward, with DataSourceV2 as a
starting point:

   1. Use well-defined logical plan nodes for all high-level operations:
   insert, create, CTAS, overwrite table, etc.
   2. Use rules that match on these high-level plan nodes, so that it isn’t
   necessary to create rules to match each eventual code path individually
   3. Define Spark’s behavior for these logical plan nodes. Physical nodes
   should implement that behavior, but all CREATE TABLE OVERWRITE should
   (eventually) make the same guarantees.
   4. Specialize implementation when creating a physical plan, not logical
   plans.
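
To make recommendations 1 and 2 concrete, here is one possible shape,
with hypothetical names rather than existing Spark classes:

    object StandardWritePlansSketch {
      trait LogicalPlan
      trait Table { def name: String; def schema: Seq[(String, String)] }

      // One generic logical node per high-level operation. Each node records
      // what the user asked for; how it runs is decided later, when the
      // planner picks a physical node for the concrete table implementation.
      case class AppendData(table: Table, query: LogicalPlan) extends LogicalPlan
      case class OverwriteTable(table: Table, query: LogicalPlan) extends LogicalPlan
      case class CreateTableAsSelect(name: String, query: LogicalPlan) extends LogicalPlan

      // Analysis and validation rules match the generic nodes, so every
      // write path (Hive, file-based sources, DSv2) is covered by the same
      // rules without adding new match cases per implementation.
      def validate(plan: LogicalPlan): Unit = plan match {
        case AppendData(_, _)          => () // check the query's schema against the table's
        case OverwriteTable(_, _)      => () // same check, plus the overwrite guarantees
        case CreateTableAsSelect(_, _) => () // check that the new table's schema is valid
        case _                         => ()
      }
    }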

I realize this is really long, but I’d like to hear thoughts about this.
I’m sure I’ve left out some additional context, but I think the main idea
here is solid: let’s standardize logical plans for more consistent behavior
and easier maintenance.

rb
-- 
Ryan Blue
Software Engineer
Netflix
