Repository: spark
Updated Branches:
  refs/heads/master 94761485b -> fdf5bba35


http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5c7152e..dfe73c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 
@@ -175,17 +175,17 @@ class SQLQuerySuite extends QueryTest {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
       val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
       relation match {
-        case LogicalRelation(r: FSBasedParquetRelation) =>
+        case LogicalRelation(r: ParquetRelation2) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, 
but found " +
-              s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
+              s"${ParquetRelation2.getClass.getCanonicalName}.")
           }
 
         case r: MetastoreRelation =>
           if (isDataSourceParquet) {
             fail(
-              s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
+              s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
           }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 41bcbe8..b6be09e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
+import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
@@ -291,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     )
 
     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(_: FSBasedParquetRelation) => // OK
+      case LogicalRelation(_: ParquetRelation2) => // OK
       case _ => fail(
         "test_parquet_ctas should be converted to " +
-          s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+          s"${classOf[ParquetRelation2].getCanonicalName}")
     }
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -315,9 +315,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
         s"However, found a ${o.toString} ")
     }
@@ -345,9 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
         s"However, found a ${o.toString} ")
     }
@@ -378,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     assertResult(2) {
       analyzed.collect {
-        case r @ LogicalRelation(_: FSBasedParquetRelation) => r
+        case r @ LogicalRelation(_: ParquetRelation2) => r
       }.size
     }
 
@@ -390,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
-        case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
         case other =>
           fail(
             "The cached test_parquet should be a Parquet Relation. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 8801aba..29b2158 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -24,7 +24,7 @@ import com.google.common.base.Objects
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -32,17 +32,16 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
- * A simple example [[FSBasedRelationProvider]].
+ * A simple example [[HadoopFsRelationProvider]].
  */
-class SimpleTextSource extends FSBasedRelationProvider {
+class SimpleTextSource extends HadoopFsRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
-      parameters: Map[String, String]): FSBasedRelation = {
-    val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
-    new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
+      parameters: Map[String, String]): HadoopFsRelation = {
+    new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext)
   }
 }
 
@@ -59,38 +58,30 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
   }
 }
 
-class SimpleTextOutputWriter extends OutputWriter {
-  private var recordWriter: RecordWriter[NullWritable, Text] = _
-  private var taskAttemptContext: TaskAttemptContext = _
-
-  override def init(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): Unit = {
-    recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
-    taskAttemptContext = context
-  }
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+  private val recordWriter: RecordWriter[NullWritable, Text] =
+    new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
 
   override def write(row: Row): Unit = {
     val serialized = row.toSeq.map(_.toString).mkString(",")
     recordWriter.write(null, new Text(serialized))
   }
 
-  override def close(): Unit = recordWriter.close(taskAttemptContext)
+  override def close(): Unit = recordWriter.close(context)
 }
 
 /**
- * A simple example [[FSBasedRelation]], used for testing purposes.  Data are stored as comma
+ * A simple example [[HadoopFsRelation]], used for testing purposes.  Data are stored as comma
  * separated string lines.  When scanning data, schema must be explicitly provided via data source
  * option `"dataSchema"`.
  */
 class SimpleTextRelation(
-    paths: Array[String],
+    override val paths: Array[String],
     val maybeDataSchema: Option[StructType],
-    partitionsSchema: StructType,
+    override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
-  extends FSBasedRelation(paths, partitionsSchema) {
+  extends HadoopFsRelation {
 
   import sqlContext.sparkContext
 
@@ -110,9 +101,6 @@ class SimpleTextRelation(
   override def hashCode(): Int =
     Objects.hashCode(paths, maybeDataSchema, dataSchema)
 
-  override def outputWriterClass: Class[_ <: OutputWriter] =
-    classOf[SimpleTextOutputWriter]
-
   override def buildScan(inputPaths: Array[String]): RDD[Row] = {
     val fields = dataSchema.map(_.dataType)
 
@@ -122,4 +110,13 @@ class SimpleTextRelation(
       }: _*)
     }
   }
+
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    override def newInstance(
+        path: String,
+        dataSchema: StructType,
+        context: TaskAttemptContext): OutputWriter = {
+      new SimpleTextOutputWriter(path, context)
+    }
+  }
 }
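
A minimal usage sketch of the HadoopFsRelationProvider/OutputWriterFactory path shown above, assuming the Spark 1.4-era DataFrame save()/load() API that the test suites below also exercise. The output directory is a hypothetical placeholder, and the "dataSchema" option is passed explicitly on read because SimpleTextRelation cannot infer its schema:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.test.TestHive
    import org.apache.spark.sql.hive.test.TestHive.implicits._
    import org.apache.spark.sql.sources.SimpleTextSource

    // Write a small DataFrame through SimpleTextSource (comma-separated text files).
    val df = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
    df.save(
      path = "/tmp/simple_text_demo",                        // hypothetical output path
      source = classOf[SimpleTextSource].getCanonicalName,
      mode = SaveMode.Overwrite)

    // Read it back; the data schema is supplied via the required "dataSchema" option.
    val reloaded = TestHive.load(
      source = classOf[SimpleTextSource].getCanonicalName,
      options = Map(
        "path" -> "/tmp/simple_text_demo",
        "dataSchema" -> df.schema.json))

The same round trip, with partition columns and the various SaveModes, is what the HadoopFsRelationTest suite below drives against both SimpleTextSource and the Parquet data source.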

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
deleted file mode 100644
index 394833f..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
+++ /dev/null
@@ -1,564 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.types._
-
-// TODO Don't extend ParquetTest
-// This test suite extends ParquetTest for some convenient utility methods. These methods should be
-// moved to some more general places, maybe QueryTest.
-class FSBasedRelationTest extends QueryTest with ParquetTest {
-  override val sqlContext: SQLContext = TestHive
-
-  import sqlContext._
-  import sqlContext.implicits._
-
-  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
-
-  val dataSchema =
-    StructType(
-      Seq(
-        StructField("a", IntegerType, nullable = false),
-        StructField("b", StringType, nullable = false)))
-
-  val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
-
-  val partitionedTestDF1 = (for {
-    i <- 1 to 3
-    p2 <- Seq("foo", "bar")
-  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
-
-  val partitionedTestDF2 = (for {
-    i <- 1 to 3
-    p2 <- Seq("foo", "bar")
-  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
-
-  val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
-
-  def checkQueries(df: DataFrame): Unit = {
-    // Selects everything
-    checkAnswer(
-      df,
-      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
-
-    // Simple filtering and partition pruning
-    checkAnswer(
-      df.filter('a > 1 && 'p1 === 2),
-      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
-
-    // Simple projection and filtering
-    checkAnswer(
-      df.filter('a > 1).select('b, 'a + 1),
-      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
-
-    // Simple projection and partition pruning
-    checkAnswer(
-      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
-      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
-
-    // Self-join
-    df.registerTempTable("t")
-    withTempTable("t") {
-      checkAnswer(
-        sql(
-          """SELECT l.a, r.b, l.p1, r.p2
-            |FROM t l JOIN t r
-            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
-          """.stripMargin),
-        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Overwrite") {
-    withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        testDF.collect())
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Append") {
-    withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Append)
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)).orderBy("a"),
-        testDF.unionAll(testDF).orderBy("a").collect())
-    }
-  }
-
-  test("save()/load() - non-partitioned table - ErrorIfExists") {
-    withTempDir { file =>
-      intercept[RuntimeException] {
-        testDF.save(
-          path = file.getCanonicalPath,
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists)
-      }
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Ignore") {
-    withTempDir { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
-      val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      assert(fs.listStatus(path).isEmpty)
-    }
-  }
-
-  test("save()/load() - partitioned table - simple queries") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.ErrorIfExists,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)))
-    }
-  }
-
-  test("save()/load() - partitioned table - Overwrite") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - Append") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.unionAll(partitionedTestDF).collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - Append - new partition values") {
-    withTempPath { file =>
-      partitionedTestDF1.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF2.save(
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - ErrorIfExists") {
-    withTempDir { file =>
-      intercept[RuntimeException] {
-        partitionedTestDF.save(
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("path" -> file.getCanonicalPath),
-          partitionColumns = Seq("p1", "p2"))
-      }
-    }
-  }
-
-  test("save()/load() - partitioned table - Ignore") {
-    withTempDir { file =>
-      partitionedTestDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
-      val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
-      assert(fs.listStatus(path).isEmpty)
-    }
-  }
-
-  def withTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sql(s"DROP TABLE $tableName")
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
-
-    withTable("t") {
-      checkAnswer(table("t"), testDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Append") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite)
-
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append)
-
-    withTable("t") {
-      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      intercept[AnalysisException] {
-        testDF.saveAsTable(
-          tableName = "t",
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists)
-      }
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Ignore") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      testDF.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
-      assert(table("t").collect().isEmpty)
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - simple queries") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
-
-    withTable("t") {
-      checkQueries(table("t"))
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Overwrite") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append - new partition values") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF2.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    // Using only a subset of all partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1"))
-    }
-
-    // Using different order of partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p2", "p1"))
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      intercept[AnalysisException] {
-        partitionedTestDF.saveAsTable(
-          tableName = "t",
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("dataSchema" -> dataSchema.json),
-          partitionColumns = Seq("p1", "p2"))
-      }
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Ignore") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      partitionedTestDF.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Ignore,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1", "p2"))
-
-      assert(table("t").collect().isEmpty)
-    }
-  }
-
-  test("Hadoop style globbing") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      val df = load(
-        source = dataSourceName,
-        options = Map(
-          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
-          "dataSchema" -> dataSchema.json))
-
-      val expectedPaths = Set(
-        s"${file.getCanonicalFile}/p1=1/p2=foo",
-        s"${file.getCanonicalFile}/p1=2/p2=foo",
-        s"${file.getCanonicalFile}/p1=1/p2=bar",
-        s"${file.getCanonicalFile}/p1=2/p2=bar"
-      ).map { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
-      }
-
-      val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: FSBasedRelation) =>
-          relation.paths.toSet
-      }.getOrElse {
-        fail("Expect an FSBasedRelation, but none could be found")
-      }
-
-      assert(actualPaths === expectedPaths)
-      checkAnswer(df, partitionedTestDF.collect())
-    }
-  }
-}
-
-class SimpleTextRelationSuite extends FSBasedRelationTest {
-  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
-
-  import sqlContext._
-
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
-          .saveAsTextFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
-    }
-  }
-}
-
-class FSBasedParquetRelationSuite extends FSBasedRelationTest {
-  override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
-
-  import sqlContext._
-  import sqlContext.implicits._
-
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
-          .toDF("a", "b", "p1")
-          .saveAsParquetFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
new file mode 100644
index 0000000..cf6afd2
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.types._
+
+// TODO Don't extend ParquetTest
+// This test suite extends ParquetTest for some convenient utility methods. These methods should be
+// moved to some more general places, maybe QueryTest.
+class HadoopFsRelationTest extends QueryTest with ParquetTest {
+  override val sqlContext: SQLContext = TestHive
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+
+  val dataSchema =
+    StructType(
+      Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", StringType, nullable = false)))
+
+  val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
+
+  val partitionedTestDF1 = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
+
+  val partitionedTestDF2 = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
+
+  val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
+
+  def checkQueries(df: DataFrame): Unit = {
+    // Selects everything
+    checkAnswer(
+      df,
+      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+
+    // Simple filtering and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 === 2),
+      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
+
+    // Simple projection and filtering
+    checkAnswer(
+      df.filter('a > 1).select('b, 'a + 1),
+      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
+
+    // Simple projection and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
+
+    // Self-join
+    df.registerTempTable("t")
+    withTempTable("t") {
+      checkAnswer(
+        sql(
+          """SELECT l.a, r.b, l.p1, r.p2
+            |FROM t l JOIN t r
+            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
+          """.stripMargin),
+        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Overwrite") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        testDF.collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Append") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Append)
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)).orderBy("a"),
+        testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - ErrorIfExists") {
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        testDF.save(
+          path = file.getCanonicalPath,
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Ignore") {
+    withTempDir { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  test("save()/load() - partitioned table - simple queries") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.ErrorIfExists,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)))
+    }
+  }
+
+  test("save()/load() - partitioned table - Overwrite") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append - new partition values") {
+    withTempPath { file =>
+      partitionedTestDF1.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF2.save(
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - ErrorIfExists") {
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        partitionedTestDF.save(
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("path" -> file.getCanonicalPath),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("save()/load() - partitioned table - Ignore") {
+    withTempDir { file =>
+      partitionedTestDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sql(s"DROP TABLE $tableName")
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
+    testDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      Map("dataSchema" -> dataSchema.json))
+
+    withTable("t") {
+      checkAnswer(table("t"), testDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Append") {
+    testDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite)
+
+    testDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Append)
+
+    withTable("t") {
+      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        testDF.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Ignore") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      testDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - simple queries") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      Map("dataSchema" -> dataSchema.json))
+
+    withTable("t") {
+      checkQueries(table("t"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Overwrite") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Append,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - new partition values") {
+    partitionedTestDF1.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF2.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Append,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
+    partitionedTestDF1.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    // Using only a subset of all partition columns
+    intercept[Throwable] {
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1"))
+    }
+
+    // Using different order of partition columns
+    intercept[Throwable] {
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p2", "p1"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        partitionedTestDF.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("dataSchema" -> dataSchema.json),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Ignore") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Ignore,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("Hadoop style globbing") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      val df = load(
+        source = dataSourceName,
+        options = Map(
+          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
+          "dataSchema" -> dataSchema.json))
+
+      val expectedPaths = Set(
+        s"${file.getCanonicalFile}/p1=1/p2=foo",
+        s"${file.getCanonicalFile}/p1=2/p2=foo",
+        s"${file.getCanonicalFile}/p1=1/p2=bar",
+        s"${file.getCanonicalFile}/p1=2/p2=bar"
+      ).map { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      }
+
+      val actualPaths = df.queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: HadoopFsRelation) =>
+          relation.paths.toSet
+      }.getOrElse {
+        fail("Expect an FSBasedRelation, but none could be found")
+      }
+
+      assert(actualPaths === expectedPaths)
+      checkAnswer(df, partitionedTestDF.collect())
+    }
+  }
+}
+
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+  import sqlContext._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+}
+
+class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
+          .toDF("a", "b", "p1")
+          .saveAsParquetFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+}

