Repository: spark
Updated Branches:
  refs/heads/branch-2.0 03008e049 -> 4dc7d377f


[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message to suggest users running refresh table or 
reloading data frames when Spark sees a FileNotFoundException due to stale, 
cached metadata.

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee <petermax...@gmail.com>

Closes #14003 from petermaxlee/SPARK-16336.

(cherry picked from commit fb41670c9263a89ec233861cc91a19cf1bb19073)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dc7d377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dc7d377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dc7d377

Branch: refs/heads/branch-2.0
Commit: 4dc7d377fba39147d8820a5a2866a2fbcb73db98
Parents: 03008e0
Author: petermaxlee <petermax...@gmail.com>
Authored: Thu Jun 30 16:49:59 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Jun 30 16:50:06 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 15 +++-
 .../apache/spark/sql/MetadataCacheSuite.scala   | 88 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f7f68b1..1314c94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -111,7 +111,20 @@ class FileScanRDD(
           currentFile = files.next()
           logInfo(s"Reading File $currentFile")
           InputFileNameHolder.setInputFileName(currentFile.filePath)
-          currentIterator = readFunction(currentFile)
+
+          try {
+            currentIterator = readFunction(currentFile)
+          } catch {
+            case e: java.io.FileNotFoundException =>
+              throw new java.io.FileNotFoundException(
+                e.getMessage + "\n" +
+                  "It is possible the underlying files have been updated. " +
+                  "You can explicitly invalidate the cache in Spark by " +
+                  "running 'REFRESH TABLE tableName' command in SQL or " +
+                  "by recreating the Dataset/DataFrame involved."
+              )
+          }
+
           hasNext
         } else {
           currentFile = null

http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 0000000..d872f4b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+    assert(dir.isDirectory)
+    val oneFile = dir.listFiles().find { file =>
+      !file.getName.startsWith("_") && !file.getName.startsWith(".")
+    }
+    assert(oneFile.isDefined)
+    oneFile.foreach(_.delete())
+  }
+
+  test("SPARK-16336 Suggest doing table refresh when encountering 
FileNotFoundException") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+        .write.parquet(location.getAbsolutePath)
+
+      // Read the directory in
+      val df = spark.read.parquet(location.getAbsolutePath)
+      assert(df.count() == 100)
+
+      // Delete a file
+      deleteOneFileInDirectory(location)
+
+      // Read it again and now we should see a FileNotFoundException
+      val e = intercept[SparkException] {
+        df.count()
+      }
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("REFRESH"))
+    }
+  }
+
+  ignore("SPARK-16337 temporary view refresh") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+        .write.parquet(location.getAbsolutePath)
+
+      // Read the directory in
+      
spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+      assert(sql("select count(*) from view_refresh").first().getLong(0) == 
100)
+
+      // Delete a file
+      deleteOneFileInDirectory(location)
+
+      // Read it again and now we should see a FileNotFoundException
+      val e = intercept[SparkException] {
+        sql("select count(*) from view_refresh").first()
+      }
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("refresh()"))
+
+      // Refresh and we should be able to read it again.
+      spark.catalog.refreshTable("view_refresh")
+      val newCount = sql("select count(*) from 
view_refresh").first().getLong(0)
+      assert(newCount > 0 && newCount < 100)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to