Repository: spark
Updated Branches:
  refs/heads/branch-1.2 0f6a2eeaf -> 64b30be7e


[SPARK-4413][SQL] Parquet support through datasource API

Goals:
 - Support for accessing parquet using SQL but not requiring Hive (thus 
allowing support of parquet tables with decimal columns)
 - Support for folder based partitioning with automatic discovery of available 
partitions
 - Caching of file metadata

See scaladoc of `ParquetRelation2` for more details.

Author: Michael Armbrust <mich...@databricks.com>

Closes #3269 from marmbrus/newParquet and squashes the following commits:

1dd75f1 [Michael Armbrust] Pass all paths for FileInputFormat at once.
645768b [Michael Armbrust] Review comments.
abd8e2f [Michael Armbrust] Alternative implementation of parquet based on the 
datasources API.
938019e [Michael Armbrust] Add an experimental interface to data sources that 
exposes catalyst expressions.
e9d2641 [Michael Armbrust] logging / formatting improvements.

(cherry picked from commit 02ec058efe24348cdd3691b55942e6f0ef138732)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64b30be7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64b30be7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64b30be7

Branch: refs/heads/branch-1.2
Commit: 64b30be7e4cb86059bbfeb3e2f8f47f41d015862
Parents: 0f6a2ee
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Nov 20 18:31:02 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Nov 20 18:31:31 2014 -0800

----------------------------------------------------------------------
 .../sql/parquet/ParquetTableOperations.scala    |   4 +-
 .../apache/spark/sql/parquet/newParquet.scala   | 290 +++++++++++++++++++
 .../spark/sql/sources/DataSourceStrategy.scala  |  43 ++-
 .../apache/spark/sql/sources/interfaces.scala   |  22 +-
 .../sql/parquet/ParquetMetastoreSuite.scala     | 207 -------------
 .../spark/sql/parquet/parquetSuites.scala       | 253 ++++++++++++++++
 6 files changed, 599 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64b30be7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5d0643a..0e36852 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -361,7 +361,7 @@ private[parquet] class FilteringParquetRowInputFormat
 
   private var footers: JList[Footer] = _
 
-  private var fileStatuses= Map.empty[Path, FileStatus]
+  private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
       inputSplit: InputSplit,
@@ -405,7 +405,9 @@ private[parquet] class FilteringParquetRowInputFormat
         }
         val newFooters = new mutable.HashMap[FileStatus, Footer]
         if (toFetch.size > 0) {
+          val startFetch = System.currentTimeMillis
           val fetched = getFooters(conf, toFetch)
+          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - 
startFetch} ms")
           for ((status, i) <- toFetch.zipWithIndex) {
             newFooters(status) = fetched.get(i)
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/64b30be7/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
new file mode 100644
index 0000000..bea12e6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+
+import parquet.hadoop.ParquetInputFormat
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.{Partition => SparkPartition, Logging}
+import org.apache.spark.rdd.{NewHadoopPartition, RDD}
+
+import org.apache.spark.sql.{SQLConf, Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, And, 
Expression, Attribute}
+import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, 
StructType}
+import org.apache.spark.sql.sources._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Allows creation of parquet based tables using the syntax
+ * `CREATE TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only 
option required
+ * is `path`, which should be the location of a collection of, optionally 
partitioned,
+ * parquet files.
+ */
+class DefaultSource extends RelationProvider {
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    val path =
+      parameters.getOrElse("path", sys.error("'path' must be specifed for 
parquet tables."))
+
+    ParquetRelation2(path)(sqlContext)
+  }
+}
+
+private[parquet] case class Partition(partitionValues: Map[String, Any], 
files: Seq[FileStatus])
+
+/**
+ * An alternative to [[ParquetRelation]] that plugs in using the data sources 
API.  This class is
+ * currently not intended as a full replacement of the parquet support in 
Spark SQL though it is
+ * likely that it will eventually subsume the existing physical plan 
implementation.
+ *
+ * Compared with the current implementation, this class has the following 
notable differences:
+ *
+ * Partitioning: Partitions are auto discovered and must be in the form of 
directories `key=value/`
+ * located at `path`.  Currently only a single partitioning column is 
supported and it must
+ * be an integer.  This class supports both fully self-describing data, which 
contains the partition
+ * key, and data where the partition key is only present in the folder 
structure.  The presence
+ * of the partitioning key in the data is also auto-detected.  The `null` 
partition is not yet
+ * supported.
+ *
+ * Metadata: The metadata is automatically discovered by reading the first 
parquet file present.
+ * There is currently no support for working with files that have different 
schema.  Additionally,
+ * when parquet metadata caching is turned on, the FileStatus objects for all 
data will be cached
+ * to improve the speed of interactive querying.  When data is added to a 
table it must be dropped
+ * and recreated to pick up any changes.
+ *
+ * Statistics: Statistics for the size of the table are automatically 
populated during metadata
+ * discovery.
+ */
+@DeveloperApi
+case class ParquetRelation2(path: String)(@transient val sqlContext: 
SQLContext)
+  extends CatalystScan with Logging {
+
+  def sparkContext = sqlContext.sparkContext
+
+  // Minor Hack: scala doesn't seem to respect @transient for vals declared via 
extraction
+  @transient
+  private var partitionKeys: Seq[String] = _
+  @transient
+  private var partitions: Seq[Partition] = _
+  discoverPartitions()
+
+  // TODO: Only finds the first partition, assumes the key is of type 
Integer...
+  private def discoverPartitions() = {
+    val fs = FileSystem.get(new java.net.URI(path), 
sparkContext.hadoopConfiguration)
+    val partValue = "([^=]+)=([^=]+)".r
+
+    val childrenOfPath = fs.listStatus(new 
Path(path)).filterNot(_.getPath.getName.startsWith("_"))
+    val childDirs = childrenOfPath.filter(s => s.isDir)
+
+    if (childDirs.size > 0) {
+      val partitionPairs = childDirs.map(_.getPath.getName).map {
+        case partValue(key, value) => (key, value)
+      }
+
+      val foundKeys = partitionPairs.map(_._1).distinct
+      if (foundKeys.size > 1) {
+        sys.error(s"Too many distinct partition keys: $foundKeys")
+      }
+
+      // Do a parallel lookup of partition metadata.
+      val partitionFiles =
+        childDirs.par.map { d =>
+          fs.listStatus(d.getPath)
+            // TODO: Is there a standard hadoop function for this?
+            .filterNot(_.getPath.getName.startsWith("_"))
+            .filterNot(_.getPath.getName.startsWith("."))
+        }.seq
+
+      partitionKeys = foundKeys.toSeq
+      partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, 
value)) =>
+        Partition(Map(key -> value.toInt), files)
+      }.toSeq
+    } else {
+      partitionKeys = Nil
+      partitions = Partition(Map.empty, childrenOfPath) :: Nil
+    }
+  }
+
+  override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
+
+  val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not 
deal with attributes.
+    ParquetTypesConverter.readSchemaFromFile(
+      partitions.head.files.head.getPath,
+      Some(sparkContext.hadoopConfiguration),
+      sqlContext.isParquetBinaryAsString))
+
+  val dataIncludesKey =
+    
partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
+
+  override val schema =
+    if (dataIncludesKey) {
+      dataSchema
+    } else {
+      StructType(dataSchema.fields :+ StructField(partitionKeys.head, 
IntegerType))
+    }
+
+  override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): 
RDD[Row] = {
+    // This is mostly a hack so that we can use the existing parquet filter 
code.
+    val requiredColumns = output.map(_.name)
+    // TODO: Parquet filters should be based on data sources API, not catalyst 
expressions.
+    val filters = DataSourceStrategy.selectFilters(predicates)
+
+    val job = new Job(sparkContext.hadoopConfiguration)
+    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+    val jobConf: Configuration = ContextUtil.getConfiguration(job)
+
+    val requestedSchema = StructType(requiredColumns.map(schema(_)))
+
+    // TODO: Make folder based partitioning a first class citizen of the Data 
Sources API.
+    val partitionFilters = filters.collect {
+      case e @ EqualTo(attr, value) if partitionKeys.contains(attr) =>
+        logInfo(s"Parquet scan partition filter: $attr=$value")
+        (p: Partition) => p.partitionValues(attr) == value
+
+      case e @ In(attr, values) if partitionKeys.contains(attr) =>
+        logInfo(s"Parquet scan partition filter: $attr IN 
${values.mkString("{", ",", "}")}")
+        val set = values.toSet
+        (p: Partition) => set.contains(p.partitionValues(attr))
+
+      case e @ GreaterThan(attr, value) if partitionKeys.contains(attr) =>
+        logInfo(s"Parquet scan partition filter: $attr > $value")
+        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] > 
value.asInstanceOf[Int]
+
+      case e @ GreaterThanOrEqual(attr, value) if partitionKeys.contains(attr) 
=>
+        logInfo(s"Parquet scan partition filter: $attr >= $value")
+        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] >= 
value.asInstanceOf[Int]
+
+      case e @ LessThan(attr, value) if partitionKeys.contains(attr) =>
+        logInfo(s"Parquet scan partition filter: $attr < $value")
+        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] < 
value.asInstanceOf[Int]
+
+      case e @ LessThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
+        logInfo(s"Parquet scan partition filter: $attr <= $value")
+        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] <= 
value.asInstanceOf[Int]
+    }
+
+    val selectedPartitions = partitions.filter(p => 
partitionFilters.forall(_(p)))
+    val fs = FileSystem.get(new java.net.URI(path), 
sparkContext.hadoopConfiguration)
+    val selectedFiles = selectedPartitions.flatMap(_.files).map(f => 
fs.makeQualified(f.getPath))
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, 
selectedFiles:_*)
+
+    // Push down filters when possible
+    predicates
+      .reduceOption(And)
+      .flatMap(ParquetFilters.createFilter)
+      .filter(_ => sqlContext.parquetFilterPushDown)
+      .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+
+    def percentRead = selectedPartitions.size.toDouble / 
partitions.size.toDouble * 100
+    logInfo(s"Reading $percentRead% of $path partitions")
+
+    // Store both requested and original schema in `Configuration`
+    jobConf.set(
+      RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes))
+    jobConf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(schema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
+    val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, 
"true").toBoolean
+    jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString)
+
+    val baseRDD =
+      new org.apache.spark.rdd.NewHadoopRDD(
+          sparkContext,
+          classOf[FilteringParquetRowInputFormat],
+          classOf[Void],
+          classOf[Row],
+          jobConf) {
+        val cacheMetadata = useCache
+
+        @transient
+        val cachedStatus = selectedPartitions.flatMap(_.files)
+
+        // Overridden so we can inject our own cached files statuses.
+        override def getPartitions: Array[SparkPartition] = {
+          val inputFormat =
+            if (cacheMetadata) {
+              new FilteringParquetRowInputFormat {
+                override def listStatus(jobContext: JobContext): 
JList[FileStatus] = cachedStatus
+              }
+            } else {
+              new FilteringParquetRowInputFormat
+            }
+
+          inputFormat match {
+            case configurable: Configurable =>
+              configurable.setConf(getConf)
+            case _ =>
+          }
+          val jobContext = newJobContext(getConf, jobId)
+          val rawSplits = inputFormat.getSplits(jobContext).toArray
+          val result = new Array[SparkPartition](rawSplits.size)
+          for (i <- 0 until rawSplits.size) {
+            result(i) =
+              new NewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
+          }
+          result
+        }
+      }
+
+    // The ordinal for the partition key in the result row, if requested.
+    val partitionKeyLocation =
+      partitionKeys
+        .headOption
+        .map(requiredColumns.indexOf(_))
+        .getOrElse(-1)
+
+    // When the data does not include the key and the key is requested then we 
must fill it in
+    // based on information from the input split.
+    if (!dataIncludesKey && partitionKeyLocation != -1) {
+      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+        val partValue = "([^=]+)=([^=]+)".r
+        val partValues =
+          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+            .getPath
+            .toString
+            .split("/")
+            .flatMap {
+            case partValue(key, value) => Some(key -> value)
+            case _ => None
+          }.toMap
+
+        val currentValue = partValues.values.head.toInt
+        iter.map { pair =>
+          val res = pair._2.asInstanceOf[SpecificMutableRow]
+          res.setInt(partitionKeyLocation, currentValue)
+          res
+        }
+      }
+    } else {
+      baseRDD.map(_._2)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/64b30be7/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 954e868..37853d4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -31,6 +31,13 @@ import org.apache.spark.sql.execution.SparkPlan
  */
 private[sql] object DataSourceStrategy extends Strategy {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: 
CatalystScan)) =>
+      pruneFilterProjectRaw(
+        l,
+        projectList,
+        filters,
+        (a, f) => t.buildScan(a, f)) :: Nil
+
     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: 
PrunedFilteredScan)) =>
       pruneFilterProject(
         l,
@@ -51,19 +58,35 @@ private[sql] object DataSourceStrategy extends Strategy {
     case _ => Nil
   }
 
+  // Based on Public API.
   protected def pruneFilterProject(
-    relation: LogicalRelation,
-    projectList: Seq[NamedExpression],
-    filterPredicates: Seq[Expression],
-    scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
+      relation: LogicalRelation,
+      projectList: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
+    pruneFilterProjectRaw(
+      relation,
+      projectList,
+      filterPredicates,
+      (requestedColumns, pushedFilters) => {
+        scanBuilder(requestedColumns.map(_.name).toArray, 
selectFilters(pushedFilters).toArray)
+      })
+  }
+
+  // Based on Catalyst expressions.
+  protected def pruneFilterProjectRaw(
+      relation: LogicalRelation,
+      projectList: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {
 
     val projectSet = AttributeSet(projectList.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
     val filterCondition = filterPredicates.reduceLeftOption(And)
 
-    val pushedFilters = selectFilters(filterPredicates.map { _ transform {
+    val pushedFilters = filterPredicates.map { _ transform {
       case a: AttributeReference => relation.attributeMap(a) // Match original 
case of attributes.
-    }}).toArray
+    }}
 
     if (projectList.map(_.toAttribute) == projectList &&
         projectSet.size == projectList.size &&
@@ -74,8 +97,6 @@ private[sql] object DataSourceStrategy extends Strategy {
       val requestedColumns =
         projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
           .map(relation.attributeMap)            // Match original case of 
attributes.
-          .map(_.name)
-          .toArray
 
       val scan =
         execution.PhysicalRDD(
@@ -84,14 +105,14 @@ private[sql] object DataSourceStrategy extends Strategy {
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       val requestedColumns = (projectSet ++ 
filterSet).map(relation.attributeMap).toSeq
-      val columnNames = requestedColumns.map(_.name).toArray
 
-      val scan = execution.PhysicalRDD(requestedColumns, 
scanBuilder(columnNames, pushedFilters))
+      val scan =
+        execution.PhysicalRDD(requestedColumns, scanBuilder(requestedColumns, 
pushedFilters))
       execution.Project(projectList, filterCondition.map(execution.Filter(_, 
scan)).getOrElse(scan))
     }
   }
 
-  protected def selectFilters(filters: Seq[Expression]): Seq[Filter] = 
filters.collect {
+  protected[sql] def selectFilters(filters: Seq[Expression]): Seq[Filter] = 
filters.collect {
     case expressions.EqualTo(a: Attribute, Literal(v, _)) => EqualTo(a.name, v)
     case expressions.EqualTo(Literal(v, _), a: Attribute) => EqualTo(a.name, v)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64b30be7/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 861638b..2b8fc05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -16,12 +16,13 @@
 */
 package org.apache.spark.sql.sources
 
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
 
 /**
+ * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data 
source.  When
  * Spark SQL is given a DDL operation with a USING clause specified, this 
interface is used to
  * pass in the parameters specified by a user.
@@ -40,6 +41,7 @@ trait RelationProvider {
 }
 
 /**
+ * ::DeveloperApi::
  * Represents a collection of tuples with a known schema.  Classes that extend 
BaseRelation must
  * be able to produce the schema of their data in the form of a [[StructType]] 
 Concrete
  * implementation should inherit from one of the descendant `Scan` classes, 
which define various
@@ -65,6 +67,7 @@ abstract class BaseRelation {
 }
 
 /**
+ * ::DeveloperApi::
  * A BaseRelation that can produce all of its tuples as an RDD of Row objects.
  */
 @DeveloperApi
@@ -73,6 +76,7 @@ abstract class TableScan extends BaseRelation {
 }
 
 /**
+ * ::DeveloperApi::
  * A BaseRelation that can eliminate unneeded columns before producing an RDD
  * containing all of its tuples as Row objects.
  */
@@ -82,6 +86,7 @@ abstract class PrunedScan extends BaseRelation {
 }
 
 /**
+ * ::DeveloperApi::
  * A BaseRelation that can eliminate unneeded columns and filter using selected
  * predicates before producing an RDD containing all matching tuples as Row 
objects.
  *
@@ -93,3 +98,18 @@ abstract class PrunedScan extends BaseRelation {
 abstract class PrunedFilteredScan extends BaseRelation {
   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): 
RDD[Row]
 }
+
+/**
+ * ::Experimental::
+ * An interface for experimenting with a more direct connection to the query 
planner.  Compared to
+ * [[PrunedFilteredScan]], this operator receives the raw expressions from the
+ * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].  Unlike the 
other APIs this
+ * interface is not designed to be binary compatible across releases and thus 
should only be used
+ * for experimentation.
+ */
+@Experimental
+abstract class CatalystScan extends BaseRelation {
+  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): 
RDD[Row]
+}
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/64b30be7/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
deleted file mode 100644
index cc65242..0000000
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.File
-
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.execution.HiveTableScan
-import org.apache.spark.sql.hive.test.TestHive._
-
-// The data where the partitioning key exists only in the directory structure.
-case class ParquetData(intField: Int, stringField: String)
-// The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
-
-
-/**
- * Tests for our SerDe -> Native parquet scan conversion.
- */
-class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
-  override def beforeAll(): Unit = {
-    val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
-    partitionedTableDir.delete()
-    partitionedTableDir.mkdir()
-
-    (1 to 10).foreach { p =>
-      val partDir = new File(partitionedTableDir, s"p=$p")
-      sparkContext.makeRDD(1 to 10)
-        .map(i => ParquetData(i, s"part-$p"))
-        .saveAsParquetFile(partDir.getCanonicalPath)
-    }
-
-    val partitionedTableDirWithKey = File.createTempFile("parquettests", 
"sparksql")
-    partitionedTableDirWithKey.delete()
-    partitionedTableDirWithKey.mkdir()
-
-    (1 to 10).foreach { p =>
-      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
-      sparkContext.makeRDD(1 to 10)
-        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
-        .saveAsParquetFile(partDir.getCanonicalPath)
-    }
-
-    sql(s"""
-    create external table partitioned_parquet
-    (
-      intField INT,
-      stringField STRING
-    )
-    PARTITIONED BY (p int)
-    ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-     STORED AS
-     INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-     OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-    location '${partitionedTableDir.getCanonicalPath}'
-    """)
-
-    sql(s"""
-    create external table partitioned_parquet_with_key
-    (
-      intField INT,
-      stringField STRING
-    )
-    PARTITIONED BY (p int)
-    ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-     STORED AS
-     INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-     OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-    location '${partitionedTableDirWithKey.getCanonicalPath}'
-    """)
-
-    sql(s"""
-    create external table normal_parquet
-    (
-      intField INT,
-      stringField STRING
-    )
-    ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-     STORED AS
-     INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-     OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-    location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
-    """)
-
-    (1 to 10).foreach { p =>
-      sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
-    }
-
-    (1 to 10).foreach { p =>
-      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
-    }
-
-    setConf("spark.sql.hive.convertMetastoreParquet", "true")
-  }
-
-  override def afterAll(): Unit = {
-    setConf("spark.sql.hive.convertMetastoreParquet", "false")
-  }
-
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
-    test(s"project the partitioning column $table") {
-      checkAnswer(
-        sql(s"SELECT p, count(*) FROM $table group by p"),
-        (1, 10) ::
-        (2, 10) ::
-        (3, 10) ::
-        (4, 10) ::
-        (5, 10) ::
-        (6, 10) ::
-        (7, 10) ::
-        (8, 10) ::
-        (9, 10) ::
-        (10, 10) :: Nil
-      )
-    }
-
-    test(s"project partitioning and non-partitioning columns $table") {
-      checkAnswer(
-        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, 
stringField"),
-        ("part-1", 1, 10) ::
-        ("part-2", 2, 10) ::
-        ("part-3", 3, 10) ::
-        ("part-4", 4, 10) ::
-        ("part-5", 5, 10) ::
-        ("part-6", 6, 10) ::
-        ("part-7", 7, 10) ::
-        ("part-8", 8, 10) ::
-        ("part-9", 9, 10) ::
-        ("part-10", 10, 10) :: Nil
-      )
-    }
-
-    test(s"simple count $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table"),
-        100)
-    }
-
-    test(s"pruned count $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
-        10)
-    }
-
-    test(s"multi-partition pruned count $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
-        30)
-    }
-
-    test(s"non-partition predicates $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
-        30)
-    }
-
-    test(s"sum $table") {
-      checkAnswer(
-        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p 
= 1"),
-        1 + 2 + 3)
-    }
-
-    test(s"hive udfs $table") {
-      checkAnswer(
-        sql(s"SELECT concat(stringField, stringField) FROM $table"),
-        sql(s"SELECT stringField FROM $table").map {
-          case Row(s: String) => Row(s + s)
-        }.collect().toSeq)
-    }
-  }
-
-  test("non-part select(*)") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM normal_parquet"),
-      10)
-  }
-
-  test("conversion is working") {
-    assert(
-      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
-        case _: HiveTableScan => true
-      }.isEmpty)
-    assert(
-      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
-        case _: ParquetTableScan => true
-      }.nonEmpty)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/64b30be7/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
new file mode 100644
index 0000000..7159ebd
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -0,0 +1,253 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive._
+
+// The data where the partitioning key exists only in the directory structure.
+case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
+
+/**
+ * A suite to test the automatic conversion of metastore tables with parquet data to use the
+ * built in parquet support.
+ */
+class ParquetMetastoreSuite extends ParquetTest {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(s"""
+      create external table partitioned_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDir.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table partitioned_parquet_with_key
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table normal_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+    """)
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+    }
+
+    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+  }
+
+  override def afterAll(): Unit = {
+    setConf("spark.sql.hive.convertMetastoreParquet", "false")
+  }
+
+  test("conversion is working") {
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: HiveTableScan => true
+      }.isEmpty)
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: ParquetTableScan => true
+      }.nonEmpty)
+  }
+}
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class ParquetSourceSuite extends ParquetTest {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql( s"""
+      create temporary table partitioned_parquet
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDir.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      create temporary table partitioned_parquet_with_key
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKey.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      create temporary table normal_parquet
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+      )
+    """)
+  }
+}
+
+/**
+ * A collection of tests for parquet data with various forms of partitioning.
+ */
+abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+  var partitionedTableDir: File = null
+  var partitionedTableDirWithKey: File = null
+
+  override def beforeAll(): Unit = {
+    partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDir.delete()
+    partitionedTableDir.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDir, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetData(i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        (1, 10) ::
+        (2, 10) ::
+        (3, 10) ::
+        (4, 10) ::
+        (5, 10) ::
+        (6, 10) ::
+        (7, 10) ::
+        (8, 10) ::
+        (9, 10) ::
+        (10, 10) :: Nil
+      )
+    }
+
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+        ("part-1", 1, 10) ::
+        ("part-2", 2, 10) ::
+        ("part-3", 3, 10) ::
+        ("part-4", 4, 10) ::
+        ("part-5", 5, 10) ::
+        ("part-6", 6, 10) ::
+        ("part-7", 7, 10) ::
+        ("part-8", 8, 10) ::
+        ("part-9", 9, 10) ::
+        ("part-10", 10, 10) :: Nil
+      )
+    }
+
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        100)
+    }
+
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        10)
+    }
+
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        30)
+    }
+
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        30)
+    }
+
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+        1 + 2 + 3)
+    }
+
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
+  }
+
+  test("non-part select(*)") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM normal_parquet"),
+      10)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to