Repository: spark
Updated Branches:
  refs/heads/branch-1.1 4c7f082c6 -> 90f8f3eed


[SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file 
as parameter

```if (!fs.getFileStatus(path).isDir) throw Exception``` makes no sense after
commit #1370

Be careful: if someone is working on SPARK-2551, make sure the new change passes the
test case ```test("Read a parquet file instead of a directory")```

Author: chutium <[email protected]>

Closes #2044 from chutium/parquet-singlefile and squashes the following commits:

4ae477f [chutium] [SPARK-3138][SQL] sqlContext.parquetFile should be able to 
take a single file as parameter

(cherry picked from commit 48f42781dedecd38ddcb2dcf67dead92bb4318f5)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90f8f3ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90f8f3ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90f8f3ee

Branch: refs/heads/branch-1.1
Commit: 90f8f3eed026e9c4f1a4b1952e284558c0e3fd23
Parents: 4c7f082
Author: chutium <[email protected]>
Authored: Wed Aug 27 13:13:04 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Wed Aug 27 13:13:12 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/ParquetTypes.scala |  7 ++---
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 27 +++++++++++++++++---
 2 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90f8f3ee/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 1a52377..2941b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -394,17 +394,14 @@ private[parquet] object ParquetTypesConverter extends 
Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet 
metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       (name(0) == '.' || name(0) == '_') && name != 
ParquetFileWriter.PARQUET_METADATA_FILE
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file 
consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we 
only need to touch a
     // single row group to read schema related metadata. Notice that we are 
making assumptions that

http://git-wip-us.apache.org/repos/asf/spark/blob/90f8f3ee/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 4219cc0..42923b6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
@@ -420,8 +419,30 @@ class ParquetQuerySuite extends QueryTest with 
FunSuiteLike with BeforeAndAfterA
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key,  s"key error in line 
$i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key,   s"key error in line 
$i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line 
$i")
+    }
+    Utils.deleteRecursively(file)
+  }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = 
fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = 
fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key,   s"key error in line 
$i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line 
$i")
     }
     Utils.deleteRecursively(file)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to