Repository: spark
Updated Branches:
  refs/heads/master 4e09a0d5e -> 0ce01635c


[SPARK-13774][SQL] - Improve error message for non-existent paths and add tests

SPARK-13774: IllegalArgumentException: Can not create a Path from an empty 
string for incorrect file path

**Overview:**
-       If a non-existent path is passed to this call
```
scala> sqlContext.read.format("csv").load("file-path-is-incorrect.csv")
```
it throws the following error:
`java.lang.IllegalArgumentException: Can not create a Path from an empty string` …..
The exception is raised from the inferSchema call in 
`org.apache.spark.sql.execution.datasources.DataSource.resolveRelation`.

-       The purpose of this JIRA is to throw a better error message.
-       With the fix, you will now get a _Path does not exist_ error message.
```
scala> sqlContext.read.format("csv").load("file-path-is-incorrect.csv")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/ksunitha/trunk/spark/file-path-is-incorrect.csv;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:215)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:204)
  ...
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:204)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:131)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:141)
  ... 49 elided
```

**Details**
_Changes include:_
-       Check whether the path exists in resolveRelation in DataSource, and 
throw an AnalysisException with a message like “Path does not exist: $path” 
if it does not.
-       An AnalysisException is used to match the other exceptions thrown in 
resolveRelation.
-       Glob and non-glob paths are both checked with a minimal number of 
exists calls. If globPath is empty, the input was a glob pattern that matched 
nothing, and an error is thrown. For a non-glob path, it is sufficient to 
check whether the first element of the Seq exists.
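
The check described in these bullets can be sketched outside of Spark as follows. This is only an illustration under stated assumptions, not the code in DataSource.scala: it uses java.nio.file on the local file system in place of Hadoop's FileSystem, it expands globs in the last path segment only, and it throws IllegalArgumentException as a stand-in for AnalysisException. The names `PathCheckSketch`, `globIfNecessary` and `checkedPaths` are hypothetical.

```scala
import java.nio.file.{Files, Path, Paths}

object PathCheckSketch {
  // Hypothetical stand-in for SparkHadoopUtil.globPathIfNecessary: a non-glob
  // path is returned unchanged as a one-element Seq; a glob in the last path
  // segment is expanded to the entries that actually exist.
  def globIfNecessary(path: Path): Seq[Path] = {
    val name = Option(path.getFileName).map(_.toString).getOrElse("")
    val isGlob = name.exists(c => "*?[{".indexOf(c) >= 0)
    if (!isGlob) Seq(path)
    else {
      val parent = path.getParent
      if (parent == null || !Files.isDirectory(parent)) Seq.empty
      else {
        val stream = Files.newDirectoryStream(parent, name)
        try {
          val matches = Seq.newBuilder[Path]
          val it = stream.iterator()
          while (it.hasNext) matches += it.next()
          matches.result()
        } finally stream.close()
      }
    }
  }

  // Mirrors the two checks from the change: an empty result means a glob that
  // matched nothing; otherwise only the head needs an explicit exists call,
  // because glob matches are existing entries by construction.
  def checkedPaths(raw: String): Seq[Path] = {
    val qualified = Paths.get(raw).toAbsolutePath
    val globPath = globIfNecessary(qualified)
    if (globPath.isEmpty) {
      throw new IllegalArgumentException(s"Path does not exist: $qualified")
    }
    if (!Files.exists(globPath.head)) {
      throw new IllegalArgumentException(s"Path does not exist: ${globPath.head}")
    }
    globPath
  }
}
```

With this sketch, both `PathCheckSketch.checkedPaths("/xyz/file2")` and `PathCheckSketch.checkedPaths("/xyz/*")` fail with a message starting with "Path does not exist" when `/xyz` is absent, while an existing directory is returned as a one-element Seq.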

_Test modifications:_
-       Three existing tests were changed to account for this error checking.
-       SQLQuerySuite:test("run sql directly on files") – The expected error 
message needed to be updated.
-       Two tests in MetastoreDataSourcesSuite failed because they used a dummy 
path. They now use a temp dir so that they get past the new check and 
continue to exercise the code path they were meant to test.
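
For readers unfamiliar with the temp-dir approach, the idea is a loan pattern: create a directory, hand it to the test body, and clean it up afterwards. The sketch below is a hypothetical, self-contained helper written for illustration; it is not the withTempDir used by the Spark test suites, and `TempDirSketch` is an assumed name.

```scala
import java.io.File
import java.nio.file.Files

object TempDirSketch {
  // Create a fresh directory, lend it to `f`, and delete it afterwards,
  // even if `f` throws.
  def withTempDir[T](f: File => T): T = {
    val dir = Files.createTempDirectory("spark-sketch").toFile
    try f(dir) finally deleteRecursively(dir)
  }

  private def deleteRecursively(file: File): Unit = {
    // listFiles returns null for non-directories, hence the Option wrapper.
    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
    file.delete()
  }
}
```

A test body can then build its options from a path that exists for the duration of the test, for example `TempDirSketch.withTempDir { dir => Map("path" -> dir.getCanonicalPath) }`.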

_New Tests:_
Two new tests are added to DataFrameSuite to validate that both glob and 
non-glob paths throw the new error message.

_Testing:_
Unit tests were run with the fix.

**Notes/Questions to reviewers:**
-       There is some code duplication in DataSource.scala between the 
resolveRelation and createSource methods with respect to getting the paths.  I 
have not made any changes to the createSource code path.  Should we make the 
change there as well?

-       From other JIRAs, I know there is restructuring and other work going on 
in this area, and I am not sure how that will affect these changes. Since this 
seemed like a starter issue, I looked into it.  If we prefer not to add the 
overhead of the checks, or if there is a better place to do them, let me know.

I would appreciate your review. Thanks for your time and comments.

Author: Sunitha Kambhampati <[email protected]>

Closes #11775 from skambha/improve_errmsg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ce01635
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ce01635
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ce01635

Branch: refs/heads/master
Commit: 0ce01635cc66ca5f9d8962235054335b16f7507e
Parents: 4e09a0d
Author: Sunitha Kambhampati <[email protected]>
Authored: Tue Mar 22 20:47:57 2016 +0800
Committer: Cheng Lian <[email protected]>
Committed: Tue Mar 22 20:47:57 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 11 ++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 16 ++++
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 97 ++++++++++----------
 4 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ce01635/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e2d5f42..e2a14ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -205,7 +205,16 @@ case class DataSource(
           val hdfsPath = new Path(path)
           val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          SparkHadoopUtil.get.globPathIfNecessary(qualified)
+          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
+
+          if (globPath.isEmpty) {
+            throw new AnalysisException(s"Path does not exist: $qualified")
+          }
+          // Sufficient to check head of the globPath seq for non-glob scenario
+          if (!fs.exists(globPath.head)) {
+            throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+          }
+          globPath
         }.toArray
 
         // If they gave a schema, then we try and figure out the types of the partition columns

http://git-wip-us.apache.org/repos/asf/spark/blob/0ce01635/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d03597e..f60c5ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1397,4 +1397,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     assert(e.getStackTrace.head.getClassName != classOf[QueryExecution].getName)
   }
+
+  test("SPARK-13774: Check error message for non existent path without globbed paths") {
+    val e = intercept[AnalysisException] (sqlContext.read.format("csv").
+      load("/xyz/file2", "/xyz/file21", "/abc/files555", "a")).getMessage()
+    assert(e.startsWith("Path does not exist"))
+   }
+
+  test("SPARK-13774: Check error message for not existent globbed paths") {
+    val e = intercept[AnalysisException] (sqlContext.read.format("text").
+      load( "/xyz/*")).getMessage()
+    assert(e.startsWith("Path does not exist"))
+
+    val e1 = intercept[AnalysisException] (sqlContext.read.json("/mnt/*/*-xyz.json").rdd).
+      getMessage()
+    assert(e1.startsWith("Path does not exist"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ce01635/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9f2233d..2733ae7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1744,7 +1744,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val e3 = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("Unable to infer schema"))
+    assert(e3.message.contains("Path does not exist"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {

http://git-wip-us.apache.org/repos/asf/spark/blob/0ce01635/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index a80c35c..3f3d069 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -693,23 +693,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("SPARK-6024 wide schema support") {
     withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
       withTable("wide_schema") {
-        // We will need 80 splits for this schema if the threshold is 4000.
-        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
-
-        // Manually create a metastore data source table.
-        sessionState.catalog.createDataSourceTable(
-          tableIdent = TableIdentifier("wide_schema"),
-          userSpecifiedSchema = Some(schema),
-          partitionColumns = Array.empty[String],
-          bucketSpec = None,
-          provider = "json",
-          options = Map("path" -> "just a dummy path"),
-          isExternal = false)
-
-        invalidateTable("wide_schema")
-
-        val actualSchema = table("wide_schema").schema
-        assert(schema === actualSchema)
+        withTempDir( tempDir => {
+          // We will need 80 splits for this schema if the threshold is 4000.
+          val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
+
+          // Manually create a metastore data source table.
+          sessionState.catalog.createDataSourceTable(
+            tableIdent = TableIdentifier("wide_schema"),
+            userSpecifiedSchema = Some(schema),
+            partitionColumns = Array.empty[String],
+            bucketSpec = None,
+            provider = "json",
+            options = Map("path" -> tempDir.getCanonicalPath),
+            isExternal = false)
+
+          invalidateTable("wide_schema")
+
+          val actualSchema = table("wide_schema").schema
+          assert(schema === actualSchema)
+        })
       }
     }
   }
@@ -899,35 +901,38 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     sqlContext.sql("""drop database if exists testdb8156 CASCADE""")
   }
 
+
   test("skip hive metadata on table creation") {
-    val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent = TableIdentifier("not_skip_hive_metadata"),
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
-      provider = "parquet",
-      options = Map("path" -> "just a dummy path", "skipHiveMetadata" -> "false"),
-      isExternal = false)
-
-    // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
-    // each column of the table is of native type StringType.
-    assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent = TableIdentifier("skip_hive_metadata"),
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
-      provider = "parquet",
-      options = Map("path" -> "just a dummy path", "skipHiveMetadata" -> "true"),
-      isExternal = false)
-
-    // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
-    // the table has a column type as array of StringType.
-    assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
+    withTempDir(tempPath => {
+      val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
+
+      sessionState.catalog.createDataSourceTable(
+        tableIdent = TableIdentifier("not_skip_hive_metadata"),
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = "parquet",
+        options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false"),
+        isExternal = false)
+
+      // As a proxy for verifying that the table was stored in Hive compatible format,
+      // we verify that each column of the table is of native type StringType.
+      assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
+        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
+
+      sessionState.catalog.createDataSourceTable(
+        tableIdent = TableIdentifier("skip_hive_metadata"),
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = "parquet",
+        options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"),
+        isExternal = false)
+
+      // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
+      // the table has a column type as array of StringType.
+      assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
+        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
+    })
   }
 }

