Hi Michael,

Thanks a lot for your prompt answer. I greatly appreciate it.

Having said that, I think we might be...cough...cough...wrong :)

I think the "issue" is in QueryPlan.sameResult [1] as its scaladoc says:

   * Since its likely undecidable to generally determine if two given
plans will produce the same
   * results, it is okay for this function to return false, even if
the results are actually
   * the same.  Such behavior will not affect correctness, only the
application of performance
   * enhancements like caching.  However, it is not acceptable to
return true if the results could
   * possibly be different.

   * By default this function performs a modified version of equality
that is tolerant of cosmetic
   * differences like attribute naming and or expression id
differences. Operators that
   * can do better should override this function.

[1] 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L370

I don't think there is anything in Analyzer or SparkOptimizer to
prevent the cached optimization.

What do you think about:

1. Adding few TRACE messages in sameResult? (I'm doing it anyway to
hunt down the "issue")?
2. Defining an override for sameResult in Range (as LocalRelation and
other logical operators)?

Somehow I feel Spark could do better. Please guide (and help me get
better at this low-level infra of Spark SQL). Thanks!

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Nov 20, 2016 at 3:52 AM, Michael Armbrust
<mich...@databricks.com> wrote:
> You are hitting a weird optimization in withColumn.  Specifically, to avoid
> building up huge trees with chained calls to this method, we collapse
> projections eagerly (instead of waiting for the optimizer).
>
> Typically we look for cached data in between analysis and optimization, so
> that optimizations won't change out ability to recognized cached query
> plans.  However, in this case the eager optimization is thwarting that.
>
> On Sat, Nov 19, 2016 at 12:19 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> There might be a bug in how analyzing Datasets or looking up cached
>> Datasets works. I'm on master.
>>
>> ➜  spark git:(master) ✗ ./bin/spark-submit --version
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
>>       /_/
>>
>> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
>> Branch master
>> Compiled by user jacek on 2016-11-19T08:39:43Z
>> Revision 2a40de408b5eb47edba92f9fe92a42ed1e78bf98
>> Url https://github.com/apache/spark.git
>> Type --help for more information.
>>
>> After reviewing CacheManager and how caching works for Datasets I
>> thought the following query would use the cached Dataset but it does
>> not.
>>
>> // Cache Dataset -- it is lazy
>> scala> val df = spark.range(1).cache
>> df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>>
>> // Trigger caching
>> scala> df.show
>> +---+
>> | id|
>> +---+
>> |  0|
>> +---+
>>
>> // Visit http://localhost:4040/storage to see the Dataset cached. And it
>> is.
>>
>> // Use the cached Dataset in another query
>> // Notice InMemoryRelation in use for cached queries
>> // It works as expected.
>> scala> df.withColumn("newId", 'id).explain(extended = true)
>> == Parsed Logical Plan ==
>> 'Project [*, 'id AS newId#16]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Analyzed Logical Plan ==
>> id: bigint, newId: bigint
>> Project [id#0L, id#0L AS newId#16L]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Optimized Logical Plan ==
>> Project [id#0L, id#0L AS newId#16L]
>> +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory,
>> deserialized, 1 replicas)
>>       +- *Range (0, 1, step=1, splits=Some(8))
>>
>> == Physical Plan ==
>> *Project [id#0L, id#0L AS newId#16L]
>> +- InMemoryTableScan [id#0L]
>>       +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk,
>> memory, deserialized, 1 replicas)
>>             +- *Range (0, 1, step=1, splits=Some(8))
>>
>> I hoped that the following query would use the cached one but it does
>> not. Should it? I thought that QueryExecution.withCachedData [1] would
>> do the trick.
>>
>> [1]
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L70
>>
>> // The following snippet uses spark.range(1) which is the same as the
>> one cached above
>> // Why does the physical plan not use InMemoryTableScan and
>> InMemoryRelation?
>> scala> spark.range(1).withColumn("new", 'id).explain(extended = true)
>> == Parsed Logical Plan ==
>> 'Project [*, 'id AS new#29]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Analyzed Logical Plan ==
>> id: bigint, new: bigint
>> Project [id#26L, id#26L AS new#29L]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Optimized Logical Plan ==
>> Project [id#26L, id#26L AS new#29L]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Physical Plan ==
>> *Project [id#26L, id#26L AS new#29L]
>> +- *Range (0, 1, step=1, splits=Some(8))
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to