This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6468f96ea42 [SPARK-45386][SQL][3.5] Fix correctness issue with persist 
using StorageLevel.NONE on Dataset
6468f96ea42 is described below

commit 6468f96ea42f6efe42033507c4e26600b751bfcc
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Thu Oct 5 09:41:08 2023 +0900

    [SPARK-45386][SQL][3.5] Fix correctness issue with persist using 
StorageLevel.NONE on Dataset
    
    ### What changes were proposed in this pull request?
    Support for InMemoryTableScanExec in AQE was added in #39624, but this 
patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. 
Before that patch a query like:
    ```
    import org.apache.spark.storage.StorageLevel
    spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
    ```
    would correctly return 2. But after that patch it incorrectly returns 0. 
This is because AQE incorrectly determines based on the runtime statistics that 
are collected here:
    
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
    that the input is empty. The problem is that the action that should make 
sure the statistics are collected here
    
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
    never uses the iterator and when we have `StorageLevel.NONE` the persisting 
will also not use the iterator and we will not gather the correct statistics.
    
    The proposed fix in the patch just makes calling persist with 
StorageLevel.NONE a no-op. Changing the action so that it always "emptied" the 
iterator would also work, but that seems like it would be unnecessary work in a 
lot of normal circumstances.
    
    ### Why are the changes needed?
    The current code has a correctness issue.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, fixes the correctness issue.
    
    ### How was this patch tested?
    New and existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.
    
    Authored-by: Emil Ejbyfeldt <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/scala/org/apache/spark/sql/execution/CacheManager.scala    | 4 +++-
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala     | 6 ++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e0..e906c74f8a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
       planToCache: LogicalPlan,
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
-    if (lookupCachedData(planToCache).nonEmpty) {
+    if (storageLevel == StorageLevel.NONE) {
+      // Do nothing for StorageLevel.NONE since it will not actually cache any 
data.
+    } else if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c967540541a..6d9c43f866a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
@@ -2535,6 +2536,11 @@ class DatasetSuite extends QueryTest
 
     checkDataset(ds.filter(f(col("_1"))), Tuple1(ValueClass(2)))
   }
+
+  test("SPARK-45386: persist with StorageLevel.NONE should give correct 
count") {
+    val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+    assert(ds.count() == 2)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to