So the obvious thing I was missing is that the analyzer has already
resolved attributes by the time the optimizer runs, so the attribute
references in the filter / projection have to be rewritten to match each
child of the union before they are pushed down.
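
To make that fix-up concrete, here is a minimal, self-contained sketch in
plain Scala. It deliberately does not use the Catalyst classes: Attr, Ref,
Gt, Relation, buildRewrites and the other names are hypothetical stand-ins.
It assumes that a union exposes its left child's attributes, so a condition
resolved against the union must be rewritten position by position before it
can be applied to the right child. Treat it as an illustration of the idea,
not necessarily what the PR does.

```scala
// Toy model only: these types imitate the shape of a resolved plan but are
// NOT the Spark API. Every name below is hypothetical.
final case class Attr(name: String, id: Int) // Attr("age", 7) stands in for age#7

sealed trait Expr
final case class Ref(attr: Attr) extends Expr
final case class Lit(value: Int) extends Expr
final case class Gt(left: Expr, right: Expr) extends Expr

sealed trait Plan { def output: Seq[Attr] }
final case class Relation(output: Seq[Attr]) extends Plan
final case class Filter(condition: Expr, child: Plan) extends Plan {
  def output: Seq[Attr] = child.output
}
final case class Union(left: Plan, right: Plan) extends Plan {
  // Assumption of this model: the union exposes the left child's attributes,
  // so expressions resolved against it carry the left side's ids.
  def output: Seq[Attr] = left.output
}

object UnionPushdownSketch {
  // Pair each left-side attribute with the right-side attribute in the
  // same position.
  def buildRewrites(u: Union): Map[Attr, Attr] = {
    require(u.left.output.size == u.right.output.size,
      "union children must have the same number of columns")
    u.left.output.zip(u.right.output).toMap
  }

  // Replace every attribute reference that has an entry in `rewrites`.
  def rewrite(e: Expr, rewrites: Map[Attr, Attr]): Expr = e match {
    case Ref(a)   => Ref(rewrites.getOrElse(a, a))
    case Gt(l, r) => Gt(rewrite(l, rewrites), rewrite(r, rewrites))
    case lit: Lit => lit
  }

  // Push a filter that sits directly on top of a union into both children.
  // The left copy keeps the condition as is; the right copy gets rewritten ids.
  def pushFilter(plan: Plan): Plan = plan match {
    case Filter(cond, u @ Union(left, right)) =>
      Union(Filter(cond, left), Filter(rewrite(cond, buildRewrites(u)), right))
    case other => other
  }
}
```

With a left relation of name#6, age#7 and a right relation of name#9,
age#10, a filter on age#7 above the union ends up as a filter on age#7 over
the left child and a filter on age#10 over the right child.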

Created a PR, let me know if there's a better way to do it.  I'll see about
testing performance against some actual data sets.

On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Ok, so looking at the optimizer code for the first time and trying the
> simplest rule that could possibly work,
>
> object UnionPushdown extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     // Push down filter into union
>     case f @ Filter(condition, u @ Union(left, right)) =>
>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>
>     // Push down projection into union
>     case p @ Project(projectList, u @ Union(left, right)) =>
>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>   }
> }
>
>
> If I try manually applying that rule to a logical plan in the repl, it
> produces the query shape I'd expect, and executing that plan results in
> parquet pushdowns as I'd expect.
>
> But adding those cases to ColumnPruning results in a runtime exception
> (below)
>
> I can keep digging, but it seems like I'm missing some obvious initial
> context around naming of attributes.  If you can provide any pointers to
> speed me on my way I'd appreciate it.
>
>
> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
>         at scala.Predef$.assert(Predef.scala:179)
>         at org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>         at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>         at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>         at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>         at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>         at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>         at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>         at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>
>
>
>
> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> What Patrick said is correct.  Two other points:
>>  - In the 1.2 release we are hoping to beef up the support for working
>> with partitioned parquet independent of the metastore.
>>  - You can actually do operations like INSERT INTO for parquet tables to
>> add data.  This creates new parquet files for each insertion.  This will
>> break if there are multiple concurrent writers to the same table.
>>
>> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell <pwend...@gmail.com>
>> wrote:
>>
>>> I think what Michael means is people often use this to read existing
>>> partitioned Parquet tables that are defined in a Hive metastore rather
>>> than data generated directly from within Spark and then reading it
>>> back as a table. I'd expect the latter case to become more common, but
>>> for now most users connect to an existing metastore.
>>>
>>> I think you could go this route by creating a partitioned external
>>> table based on the on-disk layout you create. The downside is that
>>> you'd have to go through a hive metastore whereas what you are doing
>>> now doesn't need hive at all.
>>>
>>> We should also just fix the case you are mentioning where a union is
>>> used directly from within spark. But that's the context.
>>>
>>> - Patrick
>>>
>>> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> > Maybe I'm missing something, I thought parquet was generally a write-once
>>> > format and the sqlContext interface to it seems that way as well.
>>> >
>>> > d1.saveAsParquetFile("/foo/d1")
>>> >
>>> > // another day, another table, with same schema
>>> > d2.saveAsParquetFile("/foo/d2")
>>> >
>>> > Will give a directory structure like
>>> >
>>> > /foo/d1/_metadata
>>> > /foo/d1/part-r-1.parquet
>>> > /foo/d1/part-r-2.parquet
>>> > /foo/d1/_SUCCESS
>>> >
>>> > /foo/d2/_metadata
>>> > /foo/d2/part-r-1.parquet
>>> > /foo/d2/part-r-2.parquet
>>> > /foo/d2/_SUCCESS
>>> >
>>> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a
>>> > // parquet partition
>>> > sqlContext.parquetFile("/foo")
>>> >
>>> > // works, but has the noted lack of pushdown
>>> > sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
>>> >
>>> >
>>> > Is there another alternative?
>>> >
>>> >
>>> >
>>> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust <mich...@databricks.com>
>>> > wrote:
>>> >
>>> >> I think usually people add these directories as multiple partitions of the
>>> >> same table instead of union.  This actually allows us to efficiently prune
>>> >> directories when reading in addition to standard column pruning.
>>> >>
>>> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf <malouf.g...@gmail.com>
>>> >> wrote:
>>> >>
>>> >>> I'm kind of surprised this was not run into before.  Do people not
>>> >>> segregate their data by day/week in the HDFS directory structure?
>>> >>>
>>> >>>
>>> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust <mich...@databricks.com>
>>> >>> wrote:
>>> >>>
>>> >>>> Thanks!
>>> >>>>
>>> >>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger <c...@koeninger.org>
>>> >>>> wrote:
>>> >>>>
>>> >>>> > Opened
>>> >>>> >
>>> >>>> > https://issues.apache.org/jira/browse/SPARK-3462
>>> >>>> >
>>> >>>> > I'll take a look at ColumnPruning and see what I can do
>>> >>>> >
>>> >>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com>
>>> >>>> > wrote:
>>> >>>> >
>>> >>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger <c...@koeninger.org>
>>> >>>> >> wrote:
>>> >>>> >>>
>>> >>>> >>> Is there a reason in general not to push projections and predicates down
>>> >>>> >>> into the individual ParquetTableScans in a union?
>>> >>>> >>>
>>> >>>> >>
>>> >>>> >> This would be a great case to add to ColumnPruning.  Would be awesome if
>>> >>>> >> you could open a JIRA or even a PR :)
>>> >>>> >>
>>> >>>> >
>>> >>>> >
>>> >>>>
>>> >>>
>>> >>>
>>> >>
>>>
>>
>>
>
