Hi Michael,

Thanks a lot for your prompt answer. I greatly appreciate it.
Having said that, I think we might be... cough... cough... wrong :) I think
the "issue" is in QueryPlan.sameResult [1], as its scaladoc says:

  * Since its likely undecidable to generally determine if two given plans will produce the same
  * results, it is okay for this function to return false, even if the results are actually
  * the same. Such behavior will not affect correctness, only the application of performance
  * enhancements like caching. However, it is not acceptable to return true if the results could
  * possibly be different.
  *
  * By default this function performs a modified version of equality that is tolerant of cosmetic
  * differences like attribute naming and or expression id differences. Operators that
  * can do better should override this function.

[1] https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L370

I don't think there is anything in the Analyzer or SparkOptimizer that
prevents the cache substitution here. What do you think about:

1. Adding a few TRACE messages in sameResult (I'm doing it anyway to hunt
down the "issue")? The second sketch below shows the kind of logging I mean.

2. Defining an override for sameResult in Range, as LocalRelation and
other logical operators do? See the last sketch below.
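For context, my understanding of where sameResult gets consulted:
QueryExecution.withCachedData calls into CacheManager, which tries to
replace every plan fragment that matches a cached plan. Paraphrased from
memory and simplified, so please don't hold me to the exact shape:

    // Roughly what CacheManager does between analysis and optimization:
    // walk the plan top-down and swap in the cached InMemoryRelation
    // wherever sameResult says a fragment matches a cached plan.
    def useCachedData(plan: LogicalPlan): LogicalPlan = plan transformDown {
      case fragment =>
        lookupCachedData(fragment)
          .map(_.cachedRepresentation.withOutput(fragment.output))
          .getOrElse(fragment)
    }

    def lookupCachedData(plan: LogicalPlan): Option[CachedData] =
      cachedData.find(cd => plan.sameResult(cd.plan))

If that reading is right, then whenever sameResult answers false for the
second Range plan against the cached one, no InMemoryRelation can ever show
up in the optimized plan, which matches what I observe.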
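For (1), this is the kind of instrumentation I have in mind -- a small
helper around the comparison rather than a patch to QueryPlan itself. It
would have to live in Spark's own source tree (Logging is private[spark]),
and the object and method names are mine:

    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object SameResultDebug extends Logging {
      // Logs both plans and the verdict at TRACE level before answering.
      def sameResultTraced(left: LogicalPlan, right: LogicalPlan): Boolean = {
        val matched = left.sameResult(right)
        logTrace(s"sameResult => $matched\nleft : $left\nright: $right")
        matched
      }
    }

Calling that from the cache lookup instead of plan.sameResult(cd.plan)
directly should show exactly which comparison rejects the cached plan.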
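For (2), a minimal sketch of the override I mean, written against the
current shape of the logical Range operator (start, end, step, numSlices,
output) -- an illustration of the idea, not a tested patch:

    // Inside org.apache.spark.sql.catalyst.plans.logical.Range:
    // two Ranges produce the same rows iff their parameters are equal,
    // so compare those and ignore the output attributes, whose
    // expression ids legitimately differ between plans.
    override def sameResult(plan: LogicalPlan): Boolean = plan match {
      case Range(otherStart, otherEnd, otherStep, otherNumSlices, _) =>
        start == otherStart && end == otherEnd &&
          step == otherStep && numSlices == otherNumSlices
      case _ => false
    }

That would be in the spirit of the scaladoc's "Operators that can do
better should override this function."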
Somehow I feel Spark could do better. Please guide (and help me get better
at this low-level infra of Spark SQL). Thanks!

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Nov 20, 2016 at 3:52 AM, Michael Armbrust <mich...@databricks.com> wrote:
> You are hitting a weird optimization in withColumn. Specifically, to avoid
> building up huge trees with chained calls to this method, we collapse
> projections eagerly (instead of waiting for the optimizer).
>
> Typically we look for cached data in between analysis and optimization, so
> that optimizations won't change our ability to recognize cached query
> plans. However, in this case the eager optimization is thwarting that.
>
> On Sat, Nov 19, 2016 at 12:19 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> There might be a bug in how analyzing Datasets or looking up cached
>> Datasets works. I'm on master.
>>
>> ➜  spark git:(master) ✗ ./bin/spark-submit --version
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/ '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
>>       /_/
>>
>> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
>> Branch master
>> Compiled by user jacek on 2016-11-19T08:39:43Z
>> Revision 2a40de408b5eb47edba92f9fe92a42ed1e78bf98
>> Url https://github.com/apache/spark.git
>> Type --help for more information.
>>
>> After reviewing CacheManager and how caching works for Datasets, I
>> thought the following query would use the cached Dataset, but it does
>> not.
>>
>> // Cache the Dataset -- it is lazy
>> scala> val df = spark.range(1).cache
>> df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>>
>> // Trigger caching
>> scala> df.show
>> +---+
>> | id|
>> +---+
>> |  0|
>> +---+
>>
>> // Visit http://localhost:4040/storage to see the Dataset cached. And it is.
>>
>> // Use the cached Dataset in another query
>> // Notice InMemoryRelation in use for cached queries
>> // It works as expected.
>> scala> df.withColumn("newId", 'id).explain(extended = true)
>> == Parsed Logical Plan ==
>> 'Project [*, 'id AS newId#16]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Analyzed Logical Plan ==
>> id: bigint, newId: bigint
>> Project [id#0L, id#0L AS newId#16L]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Optimized Logical Plan ==
>> Project [id#0L, id#0L AS newId#16L]
>> +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>       +- *Range (0, 1, step=1, splits=Some(8))
>>
>> == Physical Plan ==
>> *Project [id#0L, id#0L AS newId#16L]
>> +- InMemoryTableScan [id#0L]
>>       +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>             +- *Range (0, 1, step=1, splits=Some(8))
>>
>> I hoped that the following query would use the cached one, but it does
>> not. Should it? I thought that QueryExecution.withCachedData [1] would
>> do the trick.
>>
>> [1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L70
>>
>> // The following snippet uses spark.range(1), which is the same as the
>> // one cached above.
>> // Why does the physical plan not use InMemoryTableScan and InMemoryRelation?
>> scala> spark.range(1).withColumn("new", 'id).explain(extended = true)
>> == Parsed Logical Plan ==
>> 'Project [*, 'id AS new#29]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Analyzed Logical Plan ==
>> id: bigint, new: bigint
>> Project [id#26L, id#26L AS new#29L]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Optimized Logical Plan ==
>> Project [id#26L, id#26L AS new#29L]
>> +- Range (0, 1, step=1, splits=Some(8))
>>
>> == Physical Plan ==
>> *Project [id#26L, id#26L AS new#29L]
>> +- *Range (0, 1, step=1, splits=Some(8))
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org