This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3db117e [SPARK-27407][SQL] File source V2: Invalidate cache data on overwrite/append
3db117e is described below
commit 3db117e43e06c9de9f4158d53166829fb5b93f30
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Apr 9 09:25:37 2019 -0700
[SPARK-27407][SQL] File source V2: Invalidate cache data on overwrite/append
## What changes were proposed in this pull request?
File source V2 currently keeps using cached data even after the underlying data is overwritten.
We should follow https://github.com/apache/spark/pull/13566 and fix it by invalidating and refreshing all the cached data (and the associated metadata) for any DataFrame that contains the given data source path.
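For illustration, here is a minimal sketch of the behavior this patch targets, adapted from the new unit test; the SparkSession setup, the ORC format, and the `/tmp` path are illustrative assumptions and not part of the patch itself:
```scala
// Sketch of the stale-cache scenario addressed by this change.
// Assumes a local SparkSession; path and row counts are arbitrary.
import org.apache.spark.sql.SparkSession

object CacheInvalidationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CacheInvalidationSketch")
      .master("local[*]")
      .getOrCreate()

    val path = "/tmp/cache-invalidation-demo"

    // Write 1000 rows, read them back, and cache the DataFrame.
    spark.range(1000).write.mode("overwrite").orc(path)
    val df = spark.read.orc(path).cache()
    assert(df.count() == 1000)

    // Overwrite the same path with 10 rows. With this fix, the cached data
    // is invalidated and refreshed, so the DataFrame reflects the new files;
    // before the fix, the V2 file source kept returning the stale 1000 rows.
    spark.range(10).write.mode("overwrite").orc(path)
    assert(df.count() == 10)

    spark.stop()
  }
}
```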
## How was this patch tested?
Unit test
Closes #24318 from gengliangwang/invalidCache.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/execution/CacheManager.scala | 30 +++++++++++++++-----
.../spark/sql/FileBasedDataSourceSuite.scala | 32 ++++++++++++++++++++++
.../datasources/parquet/ParquetQuerySuite.scala | 24 ----------------
3 files changed, 55 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d1f096b..b3c253b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Subqu
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -253,15 +254,30 @@ class CacheManager extends Logging {
plan match {
case lr: LogicalRelation => lr.relation match {
case hr: HadoopFsRelation =>
- val prefixToInvalidate = qualifiedPath.toString
- val invalidate = hr.location.rootPaths
- .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
- .exists(_.startsWith(prefixToInvalidate))
- if (invalidate) hr.location.refresh()
- invalidate
+ refreshFileIndexIfNecessary(hr.location, fs, qualifiedPath)
case _ => false
}
+
+ case DataSourceV2Relation(fileTable: FileTable, _, _) =>
+ refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)
+
case _ => false
}
}
+
+ /**
+ * Refresh the given [[FileIndex]] if any of its root paths starts with `qualifiedPath`.
+ * @return whether the [[FileIndex]] is refreshed.
+ */
+ private def refreshFileIndexIfNecessary(
+ fileIndex: FileIndex,
+ fs: FileSystem,
+ qualifiedPath: Path): Boolean = {
+ val prefixToInvalidate = qualifiedPath.toString
+ val needToRefresh = fileIndex.rootPaths
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+ .exists(_.startsWith(prefixToInvalidate))
+ if (needToRefresh) fileIndex.refresh()
+ needToRefresh
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index add8a30..506156d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -494,6 +494,38 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
+ test("Do not use cache on overwrite") {
+ Seq("", "orc").foreach { useV1SourceReaderList =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+ withTempDir { dir =>
+ val path = dir.toString
+ spark.range(1000).write.mode("overwrite").orc(path)
+ val df = spark.read.orc(path).cache()
+ assert(df.count() == 1000)
+ spark.range(10).write.mode("overwrite").orc(path)
+ assert(df.count() == 10)
+ assert(spark.read.orc(path).count() == 10)
+ }
+ }
+ }
+ }
+
+ test("Do not use cache on append") {
+ Seq("", "orc").foreach { useV1SourceReaderList =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+ withTempDir { dir =>
+ val path = dir.toString
+ spark.range(1000).write.mode("append").orc(path)
+ val df = spark.read.orc(path).cache()
+ assert(df.count() == 1000)
+ spark.range(10).write.mode("append").orc(path)
+ assert(df.count() == 1010)
+ assert(spark.read.orc(path).count() == 1010)
+ }
+ }
+ }
+ }
+
test("Return correct results when data columns overlap with partition
columns") {
Seq("parquet", "orc", "json").foreach { format =>
withTempPath { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index beb89d9..8cc3bee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -70,30 +70,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
- test("SPARK-15678: not use cache on overwrite") {
- withTempDir { dir =>
- val path = dir.toString
- spark.range(1000).write.mode("overwrite").parquet(path)
- val df = spark.read.parquet(path).cache()
- assert(df.count() == 1000)
- spark.range(10).write.mode("overwrite").parquet(path)
- assert(df.count() == 10)
- assert(spark.read.parquet(path).count() == 10)
- }
- }
-
- test("SPARK-15678: not use cache on append") {
- withTempDir { dir =>
- val path = dir.toString
- spark.range(1000).write.mode("append").parquet(path)
- val df = spark.read.parquet(path).cache()
- assert(df.count() == 1000)
- spark.range(10).write.mode("append").parquet(path)
- assert(df.count() == 1010)
- assert(spark.read.parquet(path).count() == 1010)
- }
- }
-
test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]