So the obvious thing I was missing is that the analyzer has already resolved attributes by the time the optimizer runs, so the references in the filter / projection need to be fixed up to match the children.
Created a PR, let me know if there's a better way to do it. I'll see about testing performance against some actual data sets. On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote: > Ok, so looking at the optimizer code for the first time and trying the > simplest rule that could possibly work, > > object UnionPushdown extends Rule[LogicalPlan] { > def apply(plan: LogicalPlan): LogicalPlan = plan transform { > // Push down filter into > union > case f @ Filter(condition, u @ Union(left, right)) => > > u.copy(left = f.copy(child = left), right = f.copy(child = > right)) > > > // Push down projection into > union > case p @ Project(projectList, u @ Union(left, right)) => > u.copy(left = p.copy(child = left), right = p.copy(child = > right)) > > } > > } > > > If I try manually applying that rule to a logical plan in the repl, it > produces the query shape I'd expect, and executing that plan results in > parquet pushdowns as I'd expect. > > But adding those cases to ColumnPruning results in a runtime exception > (below) > > I can keep digging, but it seems like I'm missing some obvious initial > context around naming of attributes. If you can provide any pointers to > speed me on my way I'd appreciate it. > > > java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer() > != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11) > at scala.Predef$.assert(Predef.scala:179) > at > org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75) > at > org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234) > at > org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234) > at > org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367) > at > org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) > at > org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282) > at > org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at > org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) > at > org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402) > at > org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400) > at > org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406) > at > org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406) > at > org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431) > > > > > On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >> What Patrick said is correct. Two other points: >> - In the 1.2 release we are hoping to beef up the support for working >> with partitioned parquet independent of the metastore. >> - You can actually do operations like INSERT INTO for parquet tables to >> add data. This creates new parquet files for each insertion. This will >> break if there are multiple concurrent writers to the same table. >> >> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell <pwend...@gmail.com> >> wrote: >> >>> I think what Michael means is people often use this to read existing >>> partitioned Parquet tables that are defined in a Hive metastore rather >>> than data generated directly from within Spark and then reading it >>> back as a table. I'd expect the latter case to become more common, but >>> for now most users connect to an existing metastore. >>> >>> I think you could go this route by creating a partitioned external >>> table based on the on-disk layout you create. The downside is that >>> you'd have to go through a hive metastore whereas what you are doing >>> now doesn't need hive at all. >>> >>> We should also just fix the case you are mentioning where a union is >>> used directly from within spark. But that's the context. >>> >>> - Patrick >>> >>> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger <c...@koeninger.org> >>> wrote: >>> > Maybe I'm missing something, I thought parquet was generally a >>> write-once >>> > format and the sqlContext interface to it seems that way as well. >>> > >>> > d1.saveAsParquetFile("/foo/d1") >>> > >>> > // another day, another table, with same schema >>> > d2.saveAsParquetFile("/foo/d2") >>> > >>> > Will give a directory structure like >>> > >>> > /foo/d1/_metadata >>> > /foo/d1/part-r-1.parquet >>> > /foo/d1/part-r-2.parquet >>> > /foo/d1/_SUCCESS >>> > >>> > /foo/d2/_metadata >>> > /foo/d2/part-r-1.parquet >>> > /foo/d2/part-r-2.parquet >>> > /foo/d2/_SUCCESS >>> > >>> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a >>> > parquet partition >>> > sqlContext.parquetFile("/foo") >>> > >>> > // works, but has the noted lack of pushdown >>> > >>> sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2")) >>> > >>> > >>> > Is there another alternative? >>> > >>> > >>> > >>> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust < >>> mich...@databricks.com> >>> > wrote: >>> > >>> >> I think usually people add these directories as multiple partitions >>> of the >>> >> same table instead of union. This actually allows us to efficiently >>> prune >>> >> directories when reading in addition to standard column pruning. >>> >> >>> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf <malouf.g...@gmail.com> >>> >> wrote: >>> >> >>> >>> I'm kind of surprised this was not run into before. Do people not >>> >>> segregate their data by day/week in the HDFS directory structure? >>> >>> >>> >>> >>> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust < >>> mich...@databricks.com> >>> >>> wrote: >>> >>> >>> >>>> Thanks! >>> >>>> >>> >>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger <c...@koeninger.org >>> > >>> >>>> wrote: >>> >>>> >>> >>>> > Opened >>> >>>> > >>> >>>> > https://issues.apache.org/jira/browse/SPARK-3462 >>> >>>> > >>> >>>> > I'll take a look at ColumnPruning and see what I can do >>> >>>> > >>> >>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust < >>> >>>> mich...@databricks.com> >>> >>>> > wrote: >>> >>>> > >>> >>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger < >>> c...@koeninger.org> >>> >>>> >> wrote: >>> >>>> >>> >>> >>>> >>> Is there a reason in general not to push projections and >>> predicates >>> >>>> down >>> >>>> >>> into the individual ParquetTableScans in a union? >>> >>>> >>> >>> >>>> >> >>> >>>> >> This would be a great case to add to ColumnPruning. Would be >>> awesome >>> >>>> if >>> >>>> >> you could open a JIRA or even a PR :) >>> >>>> >> >>> >>>> > >>> >>>> > >>> >>>> >>> >>> >>> >>> >>> >> >>> >> >> >