Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0a593db36 -> c2cb84165


[SPARK-17599][SPARK-17569] Backport #15153 and #15122 to Spark 2.0 branch

## What changes were proposed in this pull request?

This backports PR #15153 and PR #15122 to the Spark 2.0 branch for Structured 
Streaming.
It is structured a bit differently because similar code paths already existed 
in the 2.0 branch. The unit test makes sure that both behaviors don't break.

Author: Burak Yavuz <brk...@gmail.com>

Closes #15202 from brkyvz/backports-to-streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2cb8416
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2cb8416
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2cb8416

Branch: refs/heads/branch-2.0
Commit: c2cb84165960998821c53d6a45507df639aa1425
Parents: 0a593db
Author: Burak Yavuz <brk...@gmail.com>
Authored: Thu Sep 22 17:22:04 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Sep 22 17:22:04 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSource.scala  |  3 +-
 .../datasources/FileCatalogSuite.scala          | 12 +++++
 .../streaming/FileStreamSourceSuite.scala       | 51 +++++++++++++++++++-
 3 files changed, 64 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2cb8416/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0dc08b1..4515f9a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -133,7 +133,8 @@ class FileStreamSource(
         userSpecifiedSchema = Some(schema),
         className = fileFormatClassName,
         options = sourceOptions.optionMapWithoutPath)
-    Dataset.ofRows(sparkSession, 
LogicalRelation(newDataSource.resolveRelation()))
+    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
+      checkPathExist = false)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c2cb8416/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 563f340..4f12df9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -70,6 +70,18 @@ class FileCatalogSuite extends SharedSQLContext {
     }
   }
 
+  test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+    withTempDir { dir =>
+      val deletedFolder = new File(dir, "deleted")
+      assert(!deletedFolder.exists())
+      val catalog1 = new ListingFileCatalog(
+        spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None,
+        ignoreFileNotFound = true)
+      // doesn't throw an exception
+      assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
+    }
+  }
+
   test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") 
{
     class MockCatalog(
       override val paths: Seq[Path]) extends 
PartitioningAwareFileCatalog(spark, Map.empty, None) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c2cb8416/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index c6db2fd..dfe4bb8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -17,9 +17,19 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{File, FileNotFoundException}
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
 import org.apache.spark.SparkFunSuite
+import 
org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 
-class FileStreamSourceSuite extends SparkFunSuite {
+class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
 
   import FileStreamSource._
 
@@ -73,4 +83,43 @@ class FileStreamSourceSuite extends SparkFunSuite {
     assert(map.isNewFile(FileEntry("b", 10)))
   }
 
+  testWithUninterruptibleThread("do not recheck that files exist during 
getBatch") {
+    withTempDir { temp =>
+      spark.conf.set(
+        s"fs.$scheme.impl",
+        classOf[ExistsThrowsExceptionFileSystem].getName)
+      // add the metadata entries as a pre-req
+      val dir = new File(temp, "dir") // use non-existent directory to test 
whether log make the dir
+      val metadataLog =
+        new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, 
dir.getAbsolutePath)
+      assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 
0))))
+
+      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", 
StructType(Nil),
+        dir.getAbsolutePath, Map.empty)
+      // this method should throw an exception if `fs.exists` is called during 
resolveRelation
+      newSource.getBatch(None, LongOffset(1))
+    }
+  }
+}
+
+/** Fake FileSystem to test whether the method `fs.exists` is called during
+ * `DataSource.resolveRelation`.
+ */
+class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def exists(f: Path): Boolean = {
+    throw new IllegalArgumentException("Exists shouldn't have been called!")
+  }
+
+  /** Simply return an empty file for now. */
+  override def listStatus(file: Path): Array[FileStatus] = {
+    throw new FileNotFoundException("Folder was suddenly deleted but this 
should not make it fail!")
+  }
+}
+
+object ExistsThrowsExceptionFileSystem {
+  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to