This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 31488e1 [SPARK-27504][SQL] File source V2: support refreshing metadata cache
31488e1 is described below
commit 31488e1ca506efd34459e6bc9a08b6d0956c8d44
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Apr 19 18:26:03 2019 +0800
[SPARK-27504][SQL] File source V2: support refreshing metadata cache
## What changes were proposed in this pull request?
In file source V1, if a file is deleted manually, reading the
DataFrame/Table will throw an exception with the following suggestion message:
```
It is possible the underlying files have been updated. You can explicitly
invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in
SQL or by recreating the Dataset/DataFrame involved.
```
After refreshing the table/DataFrame, reads should return correct results.
File source V2 should follow the same behavior.
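For illustration, a minimal sketch of the user-facing workflow this message suggests (the path, view name, and comments below are hypothetical; `spark` is an active SparkSession):
```
// Hypothetical sketch of the refresh workflow; path and view name are made up.
val path = "/tmp/refresh_demo"
spark.range(0, 100, 1, 3).write.orc(path)

spark.read.orc(path).createOrReplaceTempView("demo_view")
spark.sql("SELECT count(*) FROM demo_view").show()  // 100

// If a data file under `path` is deleted out of band, the next read can fail
// with a FileNotFoundException carrying the suggestion quoted above.
// Refreshing the cached metadata (or recreating the DataFrame) fixes it:
spark.sql("REFRESH TABLE demo_view")
spark.sql("SELECT count(*) FROM demo_view").show()  // rows from the remaining files
```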
## How was this patch tested?
Unit test
Closes #24401 from gengliangwang/refreshFileTable.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../datasources/v2/DataSourceV2Relation.scala | 5 +++
.../datasources/v2/FilePartitionReader.scala | 38 ++++++++++++----------
.../org/apache/spark/sql/MetadataCacheSuite.scala | 35 ++++++++++++++------
3 files changed, 50 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4119957..e7e0be0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -66,6 +66,11 @@ case class DataSourceV2Relation(
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
+
+ override def refresh(): Unit = table match {
+ case table: FileTable => table.fileIndex.refresh()
+ case _ => // Do nothing.
+ }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 7c7b468..d4bad29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -34,25 +34,27 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
override def next(): Boolean = {
if (currentReader == null) {
if (readers.hasNext) {
- if (ignoreMissingFiles || ignoreCorruptFiles) {
- try {
- currentReader = getNextReader()
- } catch {
- case e: FileNotFoundException if ignoreMissingFiles =>
- logWarning(s"Skipped missing file: $currentReader", e)
- currentReader = null
- return false
- // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
- case e: FileNotFoundException if !ignoreMissingFiles => throw e
- case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
- logWarning(
- s"Skipped the rest of the content in the corrupted file:
$currentReader", e)
- currentReader = null
- InputFileBlockHolder.unset()
- return false
- }
- } else {
+ try {
currentReader = getNextReader()
+ } catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: $currentReader", e)
+ currentReader = null
+ return false
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !ignoreMissingFiles =>
+ throw new FileNotFoundException(
+ e.getMessage + "\n" +
+ "It is possible the underlying files have been updated. " +
+ "You can explicitly invalidate the cache in Spark by " +
+ "running 'REFRESH TABLE tableName' command in SQL or " +
+ "by recreating the Dataset/DataFrame involved.")
+ case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+ logWarning(
+ s"Skipped the rest of the content in the corrupted file:
$currentReader", e)
+ currentReader = null
+ InputFileBlockHolder.unset()
+ return false
}
} else {
return false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index 98aa447..664d59c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql
import java.io.File
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
/**
* Test suite to handle metadata cache related.
*/
-class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {
/** Removes one data file in the given directory. */
private def deleteOneFileInDirectory(dir: File): Unit = {
@@ -38,14 +38,15 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
oneFile.foreach(_.delete())
}
- test("SPARK-16336 Suggest doing table refresh when encountering
FileNotFoundException") {
+ test("SPARK-16336,SPARK-27504 Suggest doing table refresh " +
+ "when encountering FileNotFoundException") {
withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
- .write.parquet(location.getAbsolutePath)
+ .write.orc(location.getAbsolutePath)
// Read the directory in
- val df = spark.read.parquet(location.getAbsolutePath)
+ val df = spark.read.orc(location.getAbsolutePath)
assert(df.count() == 100)
// Delete a file
@@ -60,14 +61,14 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}
}
- test("SPARK-16337 temporary view refresh") {
+ test("SPARK-16337,SPARK-27504 temporary view refresh") {
withTempView("view_refresh") { withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
- .write.parquet(location.getAbsolutePath)
+ .write.orc(location.getAbsolutePath)
// Read the directory in
- spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+ spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
// Delete a file
@@ -93,10 +94,10 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
- .write.parquet(location.getAbsolutePath)
+ .write.orc(location.getAbsolutePath)
// Read the directory in
- spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+ spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
// Delete a file
deleteOneFileInDirectory(location)
@@ -111,3 +112,17 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}
}
}
+
+class MetadataCacheV1Suite extends MetadataCacheSuite {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+}
+
+class MetadataCacheV2Suite extends MetadataCacheSuite {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]