Hey Cody,

Thanks for doing this!  Will look at your PR later today.

Michael

On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Tested the patch against a cluster with some real data.  Initial results
> seem like going from one table to a union of 2 tables is now closer to a
> doubling of query time as expected, instead of 5 to 10x.
>
> Let me know if you see any issues with that PR.
>
> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> So the obvious thing I was missing is that the analyzer has already
>> resolved attributes by the time the optimizer runs, so the references in
>> the filter / projection need to be fixed up to match the children.
>>
>> Created a PR, let me know if there's a better way to do it.  I'll see
>> about testing performance against some actual data sets.
>>
>> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Ok, so looking at the optimizer code for the first time and trying the
>>> simplest rule that could possibly work,
>>>
>>> object UnionPushdown extends Rule[LogicalPlan] {
>>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>>     // Push down filter into
>>> union
>>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>>
>>>       u.copy(left = f.copy(child = left), right = f.copy(child =
>>> right))
>>>
>>>
>>>     // Push down projection into
>>> union
>>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>>       u.copy(left = p.copy(child = left), right = p.copy(child =
>>> right))
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> If I try manually applying that rule to a logical plan in the repl, it
>>> produces the query shape I'd expect, and executing that plan results in
>>> parquet pushdowns as I'd expect.
>>>
>>> But adding those cases to ColumnPruning results in a runtime exception
>>> (below)
>>>
>>> I can keep digging, but it seems like I'm missing some obvious initial
>>> context around naming of attributes.  If you can provide any pointers to
>>> speed me on my way I'd appreciate it.
>>>
>>>
>>> java.lang.AssertionError: assertion failed: ArrayBuffer() +
>>> ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10,
>>> phones#11)
>>>         at scala.Predef$.assert(Predef.scala:179)
>>>         at
>>> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>>>         at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>>         at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>>         at
>>> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>>>         at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>>>         at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>>         at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>>         at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>         at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>         at scala.collection.immutable.List.foreach(List.scala:318)
>>>         at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>         at
>>> scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>         at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>         at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>>         at
>>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>>>         at
>>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>>>         at
>>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>>>         at
>>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>>>         at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>>>
>>>
>>>
>>>
>>> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust <mich...@databricks.com
>>> > wrote:
>>>
>>>> What Patrick said is correct.  Two other points:
>>>>  - In the 1.2 release we are hoping to beef up the support for working
>>>> with partitioned parquet independent of the metastore.
>>>>  - You can actually do operations like INSERT INTO for parquet tables
>>>> to add data.  This creates new parquet files for each insertion.  This will
>>>> break if there are multiple concurrent writers to the same table.
>>>>
>>>> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell <pwend...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think what Michael means is people often use this to read existing
>>>>> partitioned Parquet tables that are defined in a Hive metastore rather
>>>>> than data generated directly from within Spark and then reading it
>>>>> back as a table. I'd expect the latter case to become more common, but
>>>>> for now most users connect to an existing metastore.
>>>>>
>>>>> I think you could go this route by creating a partitioned external
>>>>> table based on the on-disk layout you create. The downside is that
>>>>> you'd have to go through a hive metastore whereas what you are doing
>>>>> now doesn't need hive at all.
>>>>>
>>>>> We should also just fix the case you are mentioning where a union is
>>>>> used directly from within spark. But that's the context.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>> > Maybe I'm missing something, I thought parquet was generally a
>>>>> write-once
>>>>> > format and the sqlContext interface to it seems that way as well.
>>>>> >
>>>>> > d1.saveAsParquetFile("/foo/d1")
>>>>> >
>>>>> > // another day, another table, with same schema
>>>>> > d2.saveAsParquetFile("/foo/d2")
>>>>> >
>>>>> > Will give a directory structure like
>>>>> >
>>>>> > /foo/d1/_metadata
>>>>> > /foo/d1/part-r-1.parquet
>>>>> > /foo/d1/part-r-2.parquet
>>>>> > /foo/d1/_SUCCESS
>>>>> >
>>>>> > /foo/d2/_metadata
>>>>> > /foo/d2/part-r-1.parquet
>>>>> > /foo/d2/part-r-2.parquet
>>>>> > /foo/d2/_SUCCESS
>>>>> >
>>>>> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a
>>>>> > parquet partition
>>>>> > sqlContext.parquetFile("/foo")
>>>>> >
>>>>> > // works, but has the noted lack of pushdown
>>>>> >
>>>>> sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
>>>>> >
>>>>> >
>>>>> > Is there another alternative?
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust <
>>>>> mich...@databricks.com>
>>>>> > wrote:
>>>>> >
>>>>> >> I think usually people add these directories as multiple partitions
>>>>> of the
>>>>> >> same table instead of union.  This actually allows us to
>>>>> efficiently prune
>>>>> >> directories when reading in addition to standard column pruning.
>>>>> >>
>>>>> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf <malouf.g...@gmail.com
>>>>> >
>>>>> >> wrote:
>>>>> >>
>>>>> >>> I'm kind of surprised this was not run into before.  Do people not
>>>>> >>> segregate their data by day/week in the HDFS directory structure?
>>>>> >>>
>>>>> >>>
>>>>> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust <
>>>>> mich...@databricks.com>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>>> Thanks!
>>>>> >>>>
>>>>> >>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger <
>>>>> c...@koeninger.org>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>> > Opened
>>>>> >>>> >
>>>>> >>>> > https://issues.apache.org/jira/browse/SPARK-3462
>>>>> >>>> >
>>>>> >>>> > I'll take a look at ColumnPruning and see what I can do
>>>>> >>>> >
>>>>> >>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <
>>>>> >>>> mich...@databricks.com>
>>>>> >>>> > wrote:
>>>>> >>>> >
>>>>> >>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger <
>>>>> c...@koeninger.org>
>>>>> >>>> >> wrote:
>>>>> >>>> >>>
>>>>> >>>> >>> Is there a reason in general not to push projections and
>>>>> predicates
>>>>> >>>> down
>>>>> >>>> >>> into the individual ParquetTableScans in a union?
>>>>> >>>> >>>
>>>>> >>>> >>
>>>>> >>>> >> This would be a great case to add to ColumnPruning.  Would be
>>>>> awesome
>>>>> >>>> if
>>>>> >>>> >> you could open a JIRA or even a PR :)
>>>>> >>>> >>
>>>>> >>>> >
>>>>> >>>> >
>>>>> >>>>
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to