[ https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maryann Xue updated SPARK-24613:
--------------------------------
    Description: 
When caching a query, we generate its execution plan from the query's logical
plan. However, the logical plan we get from the Dataset has already been
analyzed, and when we try to get the execution plan, this already-analyzed
logical plan is analyzed again in the new QueryExecution object.
Unfortunately, some rules have side effects if applied multiple times, and in
this case the {{HandleNullInputsForUDF}} rule is one of them. The re-analyzed
plan now has an extra null check and can't be matched against the original
analyzed plan. The following test would fail since {{df2}}'s execution plan
inside the CacheManager does not depend on {{df}}, so it does not reuse
{{df}}'s cached data.
{code:java}
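// Assumes a Spark SQL test suite that provides `spark`, `testImplicits._`
// (for $"a") and ScalaTest's `failAfter` (e.g. via TimeLimits).
import org.apache.spark.sql.functions.{sum, udf}
import org.scalatest.time.SpanSugar._

test("cache UDF result correctly 2") {
  // Sleeps 10 s per call, so any re-evaluation exceeds the 5 s limit below.
  val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))

  df.cache()
  df.count()
  df2.cache()

  // udf has been evaluated during caching, and thus should not be re-evaluated here
  failAfter(5 seconds) {
    df2.collect()
  }
}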
{code}
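A toy Scala model of the mismatch, for illustration only: {{Col}}, {{Udf}}, {{NullCheck}} and {{handleNullInputs}} below are hypothetical stand-ins, not Spark's Catalyst API. The rule mimics the behavior described for {{HandleNullInputsForUDF}} by adding another null check every time it runs, so a plan analyzed twice no longer equals the plan analyzed once, and a lookup keyed by plan equality misses.

```scala
// Toy model, NOT Spark's real classes: a tiny expression tree plus a rule
// that adds another null check each time it is applied.
object NonIdempotentRuleSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Udf(name: String, arg: Expr) extends Expr
  // Hypothetical stand-in for "if (arg is null) null else body".
  final case class NullCheck(arg: Expr, body: Expr) extends Expr

  // Wraps each Udf in a NullCheck without looking for an existing one,
  // so a second application nests a second NullCheck.
  def handleNullInputs(e: Expr): Expr = e match {
    case u @ Udf(_, arg)      => NullCheck(arg, u)
    case NullCheck(arg, body) => NullCheck(arg, handleNullInputs(body))
    case c: Col               => c
  }

  val raw: Expr = Udf("expensiveUDF", Col("a"))
  val analyzedOnce: Expr = handleNullInputs(raw)           // plan held by the Dataset
  val analyzedTwice: Expr = handleNullInputs(analyzedOnce) // plan after re-analysis

  // Cache keyed by structural plan equality: registered with the singly
  // analyzed plan, then probed with the re-analyzed one.
  val cache: Map[Expr, String] = Map(analyzedOnce -> "cached rows")
  val hit: Option[String] = cache.get(analyzedTwice)

  def main(args: Array[String]): Unit = {
    println(analyzedOnce == analyzedTwice) // false
    println(hit)                           // None
  }
}
```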

While it might be worth revisiting such analysis rules, we can also fix the
CacheManager to avoid these potential problems.
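A minimal sketch of the CacheManager-side idea, again on hypothetical toy classes rather than Spark's real {{CacheManager}} or {{QueryExecution}}: an assumed {{Analyzed}} marker records that a plan has already been analyzed, and {{analyze}} returns marked plans unchanged, so caching and looking up a Dataset's analyzed plan never applies the non-idempotent rule a second time. This only illustrates the approach; the actual change in Spark may be implemented differently.

```scala
// Toy model, NOT Spark's CacheManager: analysis is skipped for plans that
// carry a hypothetical "already analyzed" marker.
object AnalyzeOnceSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Udf(name: String, arg: Expr) extends Expr
  final case class NullCheck(arg: Expr, body: Expr) extends Expr
  // Hypothetical marker wrapping a plan that has already been analyzed.
  final case class Analyzed(plan: Expr) extends Expr

  // Same non-idempotent rule as before: nests a new NullCheck on every run.
  private def handleNullInputs(e: Expr): Expr = e match {
    case u @ Udf(_, arg)      => NullCheck(arg, u)
    case NullCheck(arg, body) => NullCheck(arg, handleNullInputs(body))
    case other                => other
  }

  // Analysis entry point: already-analyzed plans are returned unchanged.
  def analyze(e: Expr): Expr = e match {
    case a: Analyzed => a
    case raw         => Analyzed(handleNullInputs(raw))
  }

  // Hypothetical cache manager that analyzes its input before using it as a
  // key, mirroring the re-analysis step; the marker makes that step a no-op.
  final class ToyCacheManager {
    private val entries = scala.collection.mutable.Map.empty[Expr, String]
    def cacheQuery(plan: Expr, rows: String): Unit = entries(analyze(plan)) = rows
    def lookup(plan: Expr): Option[String] = entries.get(analyze(plan))
  }

  val dfPlan: Expr = analyze(Udf("expensiveUDF", Col("a"))) // the Dataset's analyzed plan
  val manager = new ToyCacheManager
  manager.cacheQuery(dfPlan, "cached rows")
  val hit: Option[String] = manager.lookup(dfPlan)

  def main(args: Array[String]): Unit = println(hit) // Some(cached rows)
}
```

Guarding at the analysis entry point covers any rule with this kind of side effect in the toy model, whereas making {{handleNullInputs}} idempotent would only fix that one rule.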


> Cache with UDF could not be matched with subsequent dependent caches
> --------------------------------------------------------------------
>
>                 Key: SPARK-24613
>                 URL: https://issues.apache.org/jira/browse/SPARK-24613
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Maryann Xue
>            Priority: Minor
>             Fix For: 2.4.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
