This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3db117e  [SPARK-27407][SQL] File source V2: Invalidate cache data on overwrite/append
3db117e is described below

commit 3db117e43e06c9de9f4158d53166829fb5b93f30
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Apr 9 09:25:37 2019 -0700

    [SPARK-27407][SQL] File source V2: Invalidate cache data on overwrite/append
    
    ## What changes were proposed in this pull request?
    
    File source V2 incorrectly continues to use cached data even after the underlying data is overwritten.
    We should follow https://github.com/apache/spark/pull/13566 and fix this by invalidating and refreshing all the cached data (and the associated metadata) for any DataFrame that contains the given data source path.
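    
    The symptom can be illustrated with a sketch like the one below, which mirrors the tests added to FileBasedDataSourceSuite in this patch (it assumes an active `spark` session, e.g. in spark-shell, and uses a hypothetical scratch path):
    
        // Write 1000 rows, read them back through file source V2, and cache the result.
        val path = "/tmp/spark-27407-demo"   // hypothetical scratch directory
        spark.range(1000).write.mode("overwrite").orc(path)
        val df = spark.read.orc(path).cache()
        assert(df.count() == 1000)
    
        // Overwrite the same path with only 10 rows. Before this fix the cached
        // DataFrame kept returning the stale count of 1000; with the fix the cache
        // entry is invalidated and refreshed, so the count reflects the new data.
        spark.range(10).write.mode("overwrite").orc(path)
        assert(df.count() == 10)
        assert(spark.read.orc(path).count() == 10)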
    
    ## How was this patch tested?
    
    Unit tests
    
    Closes #24318 from gengliangwang/invalidCache.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 30 +++++++++++++++-----
 .../spark/sql/FileBasedDataSourceSuite.scala       | 32 ++++++++++++++++++++++
 .../datasources/parquet/ParquetQuerySuite.scala    | 24 ----------------
 3 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d1f096b..b3c253b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Subqu
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
@@ -253,15 +254,30 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
-          val prefixToInvalidate = qualifiedPath.toString
-          val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
-            .exists(_.startsWith(prefixToInvalidate))
-          if (invalidate) hr.location.refresh()
-          invalidate
+          refreshFileIndexIfNecessary(hr.location, fs, qualifiedPath)
         case _ => false
       }
+
+      case DataSourceV2Relation(fileTable: FileTable, _, _) =>
+        refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)
+
       case _ => false
     }
   }
+
+  /**
+   * Refresh the given [[FileIndex]] if any of its root paths starts with `qualifiedPath`.
+   * @return whether the [[FileIndex]] is refreshed.
+   */
+  private def refreshFileIndexIfNecessary(
+      fileIndex: FileIndex,
+      fs: FileSystem,
+      qualifiedPath: Path): Boolean = {
+    val prefixToInvalidate = qualifiedPath.toString
+    val needToRefresh = fileIndex.rootPaths
+      .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+      .exists(_.startsWith(prefixToInvalidate))
+    if (needToRefresh) fileIndex.refresh()
+    needToRefresh
+  }
 }
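
A note on the qualification step in refreshFileIndexIfNecessary above: each root path of the FileIndex is made fully qualified against the file system URI and working directory before the string-prefix comparison, so scheme-less or relative root paths still match the (already qualified) write path. A minimal standalone sketch of that check, using only Hadoop's Path/FileSystem API and hypothetical paths, might look like this:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object PrefixRefreshCheck {
      def main(args: Array[String]): Unit = {
        // Hypothetical write target, already qualified by the caller.
        val qualifiedPath = new Path("file:/tmp/warehouse/t1")
        val fs = qualifiedPath.getFileSystem(new Configuration())

        // Hypothetical root paths tracked by a FileIndex; note the scheme-less one.
        val rootPaths = Seq(new Path("/tmp/warehouse/t1/part=0"), new Path("/tmp/other"))

        // Qualify each root path, then check whether any of them lives under the write path.
        val needToRefresh = rootPaths
          .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
          .exists(_.startsWith(qualifiedPath.toString))

        println(s"needToRefresh = $needToRefresh")  // prints true for these paths
      }
    }
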
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index add8a30..506156d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -494,6 +494,38 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("Do not use cache on overwrite") {
+    Seq("", "orc").foreach { useV1SourceReaderList =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        withTempDir { dir =>
+          val path = dir.toString
+          spark.range(1000).write.mode("overwrite").orc(path)
+          val df = spark.read.orc(path).cache()
+          assert(df.count() == 1000)
+          spark.range(10).write.mode("overwrite").orc(path)
+          assert(df.count() == 10)
+          assert(spark.read.orc(path).count() == 10)
+        }
+      }
+    }
+  }
+
+  test("Do not use cache on append") {
+    Seq("", "orc").foreach { useV1SourceReaderList =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        withTempDir { dir =>
+          val path = dir.toString
+          spark.range(1000).write.mode("append").orc(path)
+          val df = spark.read.orc(path).cache()
+          assert(df.count() == 1000)
+          spark.range(10).write.mode("append").orc(path)
+          assert(df.count() == 1010)
+          assert(spark.read.orc(path).count() == 1010)
+        }
+      }
+    }
+  }
+
   test("Return correct results when data columns overlap with partition 
columns") {
     Seq("parquet", "orc", "json").foreach { format =>
       withTempPath { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index beb89d9..8cc3bee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -70,30 +70,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
   }
 
-  test("SPARK-15678: not use cache on overwrite") {
-    withTempDir { dir =>
-      val path = dir.toString
-      spark.range(1000).write.mode("overwrite").parquet(path)
-      val df = spark.read.parquet(path).cache()
-      assert(df.count() == 1000)
-      spark.range(10).write.mode("overwrite").parquet(path)
-      assert(df.count() == 10)
-      assert(spark.read.parquet(path).count() == 10)
-    }
-  }
-
-  test("SPARK-15678: not use cache on append") {
-    withTempDir { dir =>
-      val path = dir.toString
-      spark.range(1000).write.mode("append").parquet(path)
-      val df = spark.read.parquet(path).cache()
-      assert(df.count() == 1000)
-      spark.range(10).write.mode("append").parquet(path)
-      assert(df.count() == 1010)
-      assert(spark.read.parquet(path).count() == 1010)
-    }
-  }
-
   test("self-join") {
     // 4 rows, cells of column 1 of row 2 and row 4 are null
     val data = (1 to 4).map { i =>

