[SPARK-9095] [SQL] Removes the old Parquet support

This PR removes the old Parquet support:

- Removes the old `ParquetRelation` together with related SQL configuration, 
plan nodes, strategies, utility classes, and test suites.

- Renames `ParquetRelation2` to `ParquetRelation`

- Renames `RowReadSupport` and `RowRecordMaterializer` to `CatalystReadSupport` 
and `CatalystRecordMaterializer` respectively, and moves them to separate files.

  This follows the naming convention used by other Parquet data models implemented 
in parquet-mr, and should make the code easier to follow for developers who are 
familiar with Parquet.

There's still other code that can be cleaned up, especially 
`RowWriteSupport`, but I'd like to leave that part to SPARK-8848.

Author: Cheng Lian <[email protected]>

Closes #7441 from liancheng/spark-9095 and squashes the following commits:

c7b6e38 [Cheng Lian] Removes WriteToFile
2d688d6 [Cheng Lian] Renames ParquetRelation2 to ParquetRelation
ca9e1b7 [Cheng Lian] Removes old Parquet support


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c025c3d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c025c3d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c025c3d0

Branch: refs/heads/master
Commit: c025c3d0a1fdfbc45b64db9c871176b40b4a7b9b
Parents: 6b2baec
Author: Cheng Lian <[email protected]>
Authored: Sun Jul 26 16:49:19 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Jul 26 16:49:19 2015 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala |   6 -
 .../scala/org/apache/spark/sql/DataFrame.scala  |   9 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   8 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |   6 -
 .../scala/org/apache/spark/sql/SQLContext.scala |   6 +-
 .../spark/sql/execution/SparkStrategies.scala   |  58 +-
 .../spark/sql/parquet/CatalystReadSupport.scala | 153 ++++
 .../parquet/CatalystRecordMaterializer.scala    |  41 +
 .../sql/parquet/CatalystSchemaConverter.scala   |   5 +
 .../spark/sql/parquet/ParquetConverter.scala    |   1 +
 .../spark/sql/parquet/ParquetRelation.scala     | 843 +++++++++++++++----
 .../sql/parquet/ParquetTableOperations.scala    | 492 -----------
 .../spark/sql/parquet/ParquetTableSupport.scala | 151 +---
 .../apache/spark/sql/parquet/ParquetTypes.scala |  42 +-
 .../apache/spark/sql/parquet/newParquet.scala   | 732 ----------------
 .../spark/sql/parquet/ParquetFilterSuite.scala  |  65 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |  37 +-
 .../ParquetPartitionDiscoverySuite.scala        |   2 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  27 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  12 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   2 -
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  22 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 141 +---
 .../spark/sql/hive/HiveParquetSuite.scala       |  86 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  14 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  54 +-
 .../apache/spark/sql/hive/parquetSuites.scala   | 174 +---
 27 files changed, 1037 insertions(+), 2152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8e1a236..af68358 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -186,12 +186,6 @@ case class WithWindowDefinition(
   override def output: Seq[Attribute] = child.output
 }
 
-case class WriteToFile(
-    path: String,
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
 /**
  * @param order  The ordering expressions
  * @param global True means global sorting apply for entire data set,

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index fa942a1..114ab91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -139,8 +139,7 @@ class DataFrame private[sql](
     // happen right away to let these side effects take place eagerly.
     case _: Command |
          _: InsertIntoTable |
-         _: CreateTableUsingAsSelect |
-         _: WriteToFile =>
+         _: CreateTableUsingAsSelect =>
       LogicalRDD(queryExecution.analyzed.output, 
queryExecution.toRdd)(sqlContext)
     case _ =>
       queryExecution.analyzed
@@ -1615,11 +1614,7 @@ class DataFrame private[sql](
    */
   @deprecated("Use write.parquet(path)", "1.4.0")
   def saveAsParquetFile(path: String): Unit = {
-    if (sqlContext.conf.parquetUseDataSourceApi) {
-      write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
-    } else {
-      sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
-    }
+    write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e9d782c..eb09807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,16 +21,16 @@ import java.util.Properties
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{Logging, Partition}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, 
ResolvedDataSource}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, 
JDBCRelation}
 import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Logging, Partition}
 
 /**
  * :: Experimental ::
@@ -259,7 +259,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) 
extends Logging {
       }.toArray
 
       sqlContext.baseRelationToDataFrame(
-        new ParquetRelation2(
+        new ParquetRelation(
           globbedPaths.map(_.toString), None, None, 
extraOptions.toMap)(sqlContext))
     }
   }

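A hedged usage sketch (hypothetical path, `sqlContext` assumed in scope): options set 
on the reader are forwarded as the `parameters` map of the `ParquetRelation` constructed 
above, e.g. the `mergeSchema` option consumed by `ParquetRelation.shouldMergeSchemas` 
later in this diff:

    // Schema merging across Parquet part-files is controlled per read via an option.
    val merged = sqlContext.read
      .option("mergeSchema", "true")   // picked up as ParquetRelation.MERGE_SCHEMA
      .parquet("/path/to/partitioned/table")
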
http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2a641b9..9b2dbd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -276,10 +276,6 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "Enables Parquet filter push-down optimization when set to true.")
 
-  val PARQUET_USE_DATA_SOURCE_API = 
booleanConf("spark.sql.parquet.useDataSourceApi",
-    defaultValue = Some(true),
-    doc = "<TODO>")
-
   val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
     key = "spark.sql.parquet.followParquetFormatSpec",
     defaultValue = Some(false),
@@ -456,8 +452,6 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
 
   private[spark] def parquetFilterPushDown: Boolean = 
getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
-  private[spark] def parquetUseDataSourceApi: Boolean = 
getConf(PARQUET_USE_DATA_SOURCE_API)
-
   private[spark] def orcFilterPushDown: Boolean = 
getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   private[spark] def verifyPartitionPath: Boolean = 
getConf(HIVE_VERIFY_PARTITION_PATH)

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 49bfe74..0e25e06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -870,7 +870,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
       LeftSemiJoin ::
       HashJoin ::
       InMemoryScans ::
-      ParquetOperations ::
       BasicOperators ::
       CartesianProduct ::
       BroadcastNestedLoopJoin :: Nil)
@@ -1115,11 +1114,8 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
   def parquetFile(paths: String*): DataFrame = {
     if (paths.isEmpty) {
       emptyDataFrame
-    } else if (conf.parquetUseDataSourceApi) {
-      read.parquet(paths : _*)
     } else {
-      DataFrame(this, parquet.ParquetRelation(
-        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+      read.parquet(paths : _*)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index eb4be19..e2c7e80 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,19 +17,18 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, 
InMemoryRelation}
-import org.apache.spark.sql.execution.{DescribeCommand => 
RunnableDescribeCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, 
CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
-import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.execution.{DescribeCommand => 
RunnableDescribeCommand}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
@@ -306,57 +305,6 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     }
   }
 
-  object ParquetOperations extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      // TODO: need to support writing to other types of files.  Unify the 
below code paths.
-      case logical.WriteToFile(path, child) =>
-        val relation =
-          ParquetRelation.create(path, child, 
sparkContext.hadoopConfiguration, sqlContext)
-        // Note: overwrite=false because otherwise the metadata we just 
created will be deleted
-        InsertIntoParquetTable(relation, planLater(child), overwrite = false) 
:: Nil
-      case logical.InsertIntoTable(
-          table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: 
ParquetRelation) =>
-        val partitionColNames = 
relation.partitioningAttributes.map(_.name).toSet
-        val filtersToPush = filters.filter { pred =>
-            val referencedColNames = pred.references.map(_.name).toSet
-            referencedColNames.intersect(partitionColNames).isEmpty
-          }
-        val prunePushedDownFilters =
-          if (sqlContext.conf.parquetFilterPushDown) {
-            (predicates: Seq[Expression]) => {
-              // Note: filters cannot be pushed down to Parquet if they 
contain more complex
-              // expressions than simple "Attribute cmp Literal" comparisons. 
Here we remove all
-              // filters that have been pushed down. Note that a predicate 
such as "(A AND B) OR C"
-              // can result in "A OR C" being pushed down. Here we are 
conservative in the sense
-              // that even if "A" was pushed and we check for "A AND B" we 
still want to keep
-              // "A AND B" in the higher-level filter, not just "B".
-              predicates.map(p => p -> ParquetFilters.createFilter(p)).collect 
{
-                case (predicate, None) => predicate
-                // Filter needs to be applied above when it contains 
partitioning
-                // columns
-                case (predicate, _)
-                  if 
!predicate.references.map(_.name).toSet.intersect(partitionColNames).isEmpty =>
-                  predicate
-              }
-            }
-          } else {
-            identity[Seq[Expression]] _
-          }
-        pruneFilterProject(
-          projectList,
-          filters,
-          prunePushedDownFilters,
-          ParquetTableScan(
-            _,
-            relation,
-            if (sqlContext.conf.parquetFilterPushDown) filtersToPush else 
Nil)) :: Nil
-
-      case _ => Nil
-    }
-  }
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
new file mode 100644
index 0000000..975fec1
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConversions.{iterableAsScalaIterable, 
mapAsJavaMap, mapAsScalaMap}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    log.debug(s"Preparing for read Parquet file with message type: 
$fileSchema")
+
+    val toCatalyst = new CatalystSchemaConverter(conf)
+    val parquetRequestedSchema = readContext.getRequestedSchema
+
+    val catalystRequestedSchema =
+      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { 
metadata =>
+        metadata
+          // First tries to read requested schema, which may result from 
projections
+          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+          // If not available, tries to read Catalyst schema from file 
metadata.  It's only
+          // available if the target file is written by Spark SQL.
+          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
+      }.map(StructType.fromString).getOrElse {
+        logDebug("Catalyst schema not available, falling back to Parquet 
schema")
+        toCatalyst.convert(parquetRequestedSchema)
+      }
+
+    logDebug(s"Catalyst schema used to read Parquet files: 
$catalystRequestedSchema")
+    new CatalystRecordMaterializer(parquetRequestedSchema, 
catalystRequestedSchema)
+  }
+
+  override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
+
+    // If the target file was written by Spark SQL, we should be able to find 
a serialized Catalyst
+    // schema of this file in its metadata.
+    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
+
+    // Optional schema of requested columns, in the form of a string 
serialized from a Catalyst
+    // `StructType` containing all requested columns.
+    val maybeRequestedSchema = 
Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
+
+    // Below we construct a Parquet schema containing all requested columns.  
This schema tells
+    // Parquet which columns to read.
+    //
+    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet 
schema.  Otherwise,
+    // we have to fall back to the full file schema, which contains all columns 
in the file.
+    // Obviously this may waste IO bandwidth since it may read more columns 
than requested.
+    //
+    // Two things to note:
+    //
+    // 1. It's possible that some requested columns don't exist in the target 
Parquet file.  For
+    //    example, in the case of schema merging, the globally merged schema 
may contain extra
+    //    columns gathered from other Parquet files.  These columns will be 
simply filled with nulls
+    //    when actually reading the target Parquet file.
+    //
+    // 2. When `maybeRequestedSchema` is available, we can't simply convert 
the Catalyst schema to
+    //    Parquet schema using `CatalystSchemaConverter`, because the mapping 
is not unique due to
+    //    non-standard behaviors of some Parquet libraries/tools.  For 
example, a Parquet file
+    //    containing a single integer array field `f1` may have the following 
legacy 2-level
+    //    structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          required INT32 element;
+    //        }
+    //      }
+    //
+    //    while `CatalystSchemaConverter` may generate a standard 3-level 
structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          repeated group list {
+    //            required INT32 element;
+    //          }
+    //        }
+    //      }
+    //
+    //    Apparently, we can't use the 2nd schema to read the target Parquet 
file as they have
+    //    different physical structures.
+    val parquetRequestedSchema =
+      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
+        val toParquet = new CatalystSchemaConverter(conf)
+        val fileSchema = context.getFileSchema.asGroupType()
+        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
+
+        StructType
+          // Deserializes the Catalyst schema of requested columns
+          .fromString(schemaString)
+          .map { field =>
+            if (fileFieldNames.contains(field.name)) {
+              // If the field exists in the target Parquet file, extracts the 
field type from the
+              // full file schema and makes a single-field Parquet schema
+              new MessageType("root", fileSchema.getType(field.name))
+            } else {
+              // Otherwise, just resorts to `CatalystSchemaConverter`
+              toParquet.convert(StructType(Array(field)))
+            }
+          }
+          // Merges all single-field Parquet schemas to form a complete schema 
for all requested
+          // columns.  Note that it's possible that no columns are requested 
at all (e.g., counting
+          // some partition column of a partitioned Parquet table). That's why 
`fold` is used here
+          // and we always fall back to an empty Parquet schema.
+          .fold(new MessageType("root")) {
+            _ union _
+          }
+      }
+
+    val metadata =
+      Map.empty[String, String] ++
+        
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
+        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
+
+    logInfo(s"Going to read Parquet file with these requested columns: 
$parquetRequestedSchema")
+    new ReadContext(parquetRequestedSchema, metadata)
+  }
+}
+
+private[parquet] object CatalystReadSupport {
+  val SPARK_ROW_REQUESTED_SCHEMA = 
"org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+}

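As a rough sketch (not part of this commit) of how `CatalystReadSupport` is plugged into 
parquet-mr: the read support class and the requested Catalyst schema travel through the 
Hadoop `Configuration`; the real wiring is in `ParquetRelation.initializeLocalJobFunc` 
further down in this diff. The schema below is a hypothetical example, and since 
`CatalystReadSupport` is `private[parquet]` this only compiles inside that package:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.ParquetInputFormat
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val conf = new Configuration()

    // Tell parquet-mr to materialize records through CatalystReadSupport.
    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)

    // Serialize the requested (projected) Catalyst schema as JSON so that
    // CatalystReadSupport.init can assemble the matching Parquet schema.
    val requestedSchema =
      StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requestedSchema.json)
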
http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
new file mode 100644
index 0000000..84f1dcc
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[RecordMaterializer]] for Catalyst rows.
+ *
+ * @param parquetSchema Parquet schema of the records to be read
+ * @param catalystSchema Catalyst schema of the rows to be constructed
+ */
+private[parquet] class CatalystRecordMaterializer(
+    parquetSchema: MessageType, catalystSchema: StructType)
+  extends RecordMaterializer[InternalRow] {
+
+  private val rootConverter = new CatalystRowConverter(parquetSchema, 
catalystSchema, NoopUpdater)
+
+  override def getCurrentRecord: InternalRow = rootConverter.currentRow
+
+  override def getRootConverter: GroupConverter = rootConverter
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
index 1d3a0d1..e9ef01e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
@@ -570,6 +570,11 @@ private[parquet] object CatalystSchemaConverter {
        """.stripMargin.split("\n").mkString(" "))
   }
 
+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }
+
   def analysisRequire(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index be0a202..ea51650 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 
 import org.apache.spark.sql.catalyst.InternalRow
 
+// TODO Remove this when fixing SPARK-8848
 private[sql] object CatalystConverter {
   // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
   // Note that "array" for the array elements is chosen by ParquetAvro.

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 086559e..cc6fa2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -17,81 +17,720 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.IOException
+import java.net.URI
 import java.util.logging.{Level, Logger => JLogger}
+import java.util.{List => JList}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.{Failure, Try}
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat, 
ParquetRecordReader}
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, 
_}
 import org.apache.parquet.schema.MessageType
 import org.apache.parquet.{Log => ParquetLog}
 
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
UnresolvedException}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): HadoopFsRelation = {
+    new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
+  }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
+  extends OutputWriterInternal {
+
+  private val recordWriter: RecordWriter[Void, InternalRow] = {
+    val outputFormat = {
+      new ParquetOutputFormat[InternalRow]() {
+        // Here we override `getDefaultWorkFile` for two reasons:
+        //
+        //  1. To allow appending.  We need to generate unique output file 
names to avoid
+        //     overwriting existing files (which either existed before the write 
job, or were just written
+        //     by other tasks within the same write job).
+        //
+        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
+        //     `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
+        //     partitions in the case of dynamic partitioning.
+        override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
+          val uniqueWriteJobId = 
context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+          val split = context.getTaskAttemptID.getTaskID.getId
+          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        }
+      }
+    }
+
+    outputFormat.getRecordWriter(context)
+  }
+
+  override def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation(
+    override val paths: Array[String],
+    private val maybeDataSchema: Option[StructType],
+    // This is for metastore conversion.
+    private val maybePartitionSpec: Option[PartitionSpec],
+    override val userDefinedPartitionColumns: Option[StructType],
+    parameters: Map[String, String])(
+    val sqlContext: SQLContext)
+  extends HadoopFsRelation(maybePartitionSpec)
+  with Logging {
+
+  private[sql] def this(
+      paths: Array[String],
+      maybeDataSchema: Option[StructType],
+      maybePartitionSpec: Option[PartitionSpec],
+      parameters: Map[String, String])(
+      sqlContext: SQLContext) = {
+    this(
+      paths,
+      maybeDataSchema,
+      maybePartitionSpec,
+      maybePartitionSpec.map(_.partitionColumns),
+      parameters)(sqlContext)
+  }
+
+  // Should we merge schemas from all Parquet part-files?
+  private val shouldMergeSchemas =
+    parameters
+      .get(ParquetRelation.MERGE_SCHEMA)
+      .map(_.toBoolean)
+      
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+
+  private val maybeMetastoreSchema = parameters
+    .get(ParquetRelation.METASTORE_SCHEMA)
+    .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+  private lazy val metadataCache: MetadataCache = {
+    val meta = new MetadataCache
+    meta.refresh()
+    meta
+  }
 
-/**
- * Relation that consists of data stored in a Parquet columnar format.
- *
- * Users should interact with parquet files though a [[DataFrame]], created by 
a [[SQLContext]]
- * instead of using this class directly.
- *
- * {{{
- *   val parquetRDD = sqlContext.parquetFile("path/to/parquet.file")
- * }}}
- *
- * @param path The path to the Parquet file.
- */
-private[sql] case class ParquetRelation(
-    path: String,
-    @transient conf: Option[Configuration],
-    @transient sqlContext: SQLContext,
-    partitioningAttributes: Seq[Attribute] = Nil)
-  extends LeafNode with MultiInstanceRelation {
-
-  /** Schema derived from ParquetFile */
-  def parquetSchema: MessageType =
-    ParquetTypesConverter
-      .readMetaData(new Path(path), conf)
-      .getFileMetaData
-      .getSchema
-
-  /** Attributes */
-  override val output =
-    partitioningAttributes ++
-    ParquetTypesConverter.readSchemaFromFile(
-      new Path(path.split(",").head),
-      conf,
-      sqlContext.conf.isParquetBinaryAsString,
-      sqlContext.conf.isParquetINT96AsTimestamp)
-  lazy val attributeMap = AttributeMap(output.map(o => o -> o))
-
-  override def newInstance(): this.type = {
-    ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
-  }
-
-  // Equals must also take into account the output attributes so that we can 
distinguish between
-  // different instances of the same relation,
   override def equals(other: Any): Boolean = other match {
-    case p: ParquetRelation =>
-      p.path == path && p.output == output
+    case that: ParquetRelation =>
+      val schemaEquality = if (shouldMergeSchemas) {
+        this.shouldMergeSchemas == that.shouldMergeSchemas
+      } else {
+        this.dataSchema == that.dataSchema &&
+          this.schema == that.schema
+      }
+
+      this.paths.toSet == that.paths.toSet &&
+        schemaEquality &&
+        this.maybeDataSchema == that.maybeDataSchema &&
+        this.partitionColumns == that.partitionColumns
+
     case _ => false
   }
 
-  override def hashCode: Int = {
-    com.google.common.base.Objects.hashCode(path, output)
+  override def hashCode(): Int = {
+    if (shouldMergeSchemas) {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        maybeDataSchema,
+        partitionColumns)
+    } else {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        dataSchema,
+        schema,
+        maybeDataSchema,
+        partitionColumns)
+    }
+  }
+
+  /** Constraints on the schema of the DataFrame to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }.mkString(", ")
+      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns 
found, " +
+        s"cannot save to parquet format")
+    }
+  }
+
+  override def dataSchema: StructType = {
+    val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+    // check if schema satisfies the constraints
+    // before moving forward
+    checkConstraints(schema)
+    schema
+  }
+
+  override private[sql] def refresh(): Unit = {
+    super.refresh()
+    metadataCache.refresh()
+  }
+
+  // Parquet data source always uses Catalyst internal representations.
+  override val needConversion: Boolean = false
+
+  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    val conf = ContextUtil.getConfiguration(job)
+
+    val committerClass =
+      conf.getClass(
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
+        classOf[ParquetOutputCommitter],
+        classOf[ParquetOutputCommitter])
+
+    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
+      logInfo("Using default output committer for Parquet: " +
+        classOf[ParquetOutputCommitter].getCanonicalName)
+    } else {
+      logInfo("Using user defined output committer for Parquet: " + 
committerClass.getCanonicalName)
+    }
+
+    conf.setClass(
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
+      committerClass,
+      classOf[ParquetOutputCommitter])
+
+    // We're not really using `ParquetOutputFormat[Row]` for writing data 
here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic 
partitioning.  The reason why
+    // we set it here is to set up the output committer class to 
`ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
+    // TODO There's no need to use two kinds of WriteSupport
+    // We should unify them. `SpecificMutableRow` can process both atomic 
(primitive) types and
+    // complex types.
+    val writeSupportClass =
+      if 
(dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
+        classOf[MutableRowWriteSupport]
+      } else {
+        classOf[RowWriteSupport]
+      }
+
+    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+
+    // Sets compression scheme
+    conf.set(
+      ParquetOutputFormat.COMPRESSION,
+      ParquetRelation
+        .shortParquetCompressionCodecNames
+        .getOrElse(
+          sqlContext.conf.parquetCompressionCodec.toUpperCase,
+          CompressionCodecName.UNCOMPRESSED).name())
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String, dataSchema: StructType, context: TaskAttemptContext): 
OutputWriter = {
+        new ParquetOutputWriter(path, context)
+      }
+    }
+  }
+
+  override def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
+    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+
+    // Create the function to set variable Parquet confs at both driver and 
executor side.
+    val initLocalJobFuncOpt =
+      ParquetRelation.initializeLocalJobFunc(
+        requiredColumns,
+        filters,
+        dataSchema,
+        useMetadataCache,
+        parquetFilterPushDown,
+        assumeBinaryIsString,
+        assumeInt96IsTimestamp,
+        followParquetFormatSpec) _
+
+    // Create the function to set input paths at the driver side.
+    val setInputPaths = 
ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      new SqlNewHadoopRDD(
+        sc = sqlContext.sparkContext,
+        broadcastedConf = broadcastedConf,
+        initDriverSideJobFuncOpt = Some(setInputPaths),
+        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+        inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
+        keyClass = classOf[Void],
+        valueClass = classOf[InternalRow]) {
+
+        val cacheMetadata = useMetadataCache
+
+        @transient val cachedStatuses = inputFiles.map { f =>
+          // In order to encode the authority of a Path containing special 
characters such as '/'
+          // (which does happen in some S3N credentials), we need to use the 
string returned by the
+          // URI of the path to create a new Path.
+          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
+          new FileStatus(
+            f.getLen, f.isDir, f.getReplication, f.getBlockSize, 
f.getModificationTime,
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, 
pathWithEscapedAuthority)
+        }.toSeq
+
+        private def escapePathUserInfo(path: Path): Path = {
+          val uri = path.toUri
+          new Path(new URI(
+            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, 
uri.getPath,
+            uri.getQuery, uri.getFragment))
+        }
+
+        // Overridden so we can inject our own cached file statuses.
+        override def getPartitions: Array[SparkPartition] = {
+          val inputFormat = new ParquetInputFormat[InternalRow] {
+            override def listStatus(jobContext: JobContext): JList[FileStatus] 
= {
+              if (cacheMetadata) cachedStatuses else 
super.listStatus(jobContext)
+            }
+          }
+
+          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+          val rawSplits = inputFormat.getSplits(jobContext)
+
+          Array.tabulate[SparkPartition](rawSplits.size) { i =>
+            new SqlNewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
+          }
+        }
+      }.values.asInstanceOf[RDD[Row]]  // type erasure hack to pass 
RDD[InternalRow] as RDD[Row]
+    }
   }
 
-  // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = 
sqlContext.conf.defaultSizeInBytes)
+  private class MetadataCache {
+    // `FileStatus` objects of all "_metadata" files.
+    private var metadataStatuses: Array[FileStatus] = _
+
+    // `FileStatus` objects of all "_common_metadata" files.
+    private var commonMetadataStatuses: Array[FileStatus] = _
+
+    // `FileStatus` objects of all data files (Parquet part-files).
+    var dataStatuses: Array[FileStatus] = _
+
+    // Schema of the actual Parquet files, without partition columns 
discovered from partition
+    // directory paths.
+    var dataSchema: StructType = null
+
+    // Schema of the whole table, including partition columns.
+    var schema: StructType = _
+
+    // Cached leaves
+    var cachedLeaves: Set[FileStatus] = null
+
+    /**
+     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
+     */
+    def refresh(): Unit = {
+      val currentLeafStatuses = cachedLeafStatuses()
+
+      // Check whether cachedLeafStatuses has changed
+      val leafStatusesChanged = (cachedLeaves == null) ||
+        !cachedLeaves.equals(currentLeafStatuses)
+
+      if (leafStatusesChanged) {
+        cachedLeaves = currentLeafStatuses.toIterator.toSet
+
+        // Lists `FileStatus`es of all leaf nodes (files) under all base 
directories.
+        val leaves = currentLeafStatuses.filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || 
f.getPath.getName.startsWith("."))
+        }.toArray
+
+        dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+        metadataStatuses =
+          leaves.filter(_.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE)
+        commonMetadataStatuses =
+          leaves.filter(_.getPath.getName == 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+        dataSchema = {
+          val dataSchema0 = maybeDataSchema
+            .orElse(readSchema())
+            .orElse(maybeMetastoreSchema)
+            .getOrElse(throw new AnalysisException(
+              s"Failed to discover schema of Parquet file(s) in the following 
location(s):\n" +
+                paths.mkString("\n\t")))
+
+          // If this Parquet relation is converted from a Hive Metastore 
table, we must reconcile the case
+          // insensitivity issue and possible schema mismatch (probably 
caused by schema
+          // evolution).
+          maybeMetastoreSchema
+            .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
+            .getOrElse(dataSchema0)
+        }
+      }
+    }
+
+    private def isSummaryFile(file: Path): Boolean = {
+      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+    }
+
+    private def readSchema(): Option[StructType] = {
+      // Sees which file(s) we need to touch in order to figure out the schema.
+      //
+      // Always tries the summary files first if users don't require a merged 
schema.  In this case,
+      // "_common_metadata" is more preferable than "_metadata" because it 
doesn't contain row
+      // groups information, and could be much smaller for large Parquet files 
with lots of row
+      // groups.  If no summary file is available, falls back to some random 
part-file.
+      //
+      // NOTE: Metadata stored in the summary files are merged from all 
part-files.  However, for
+      // user defined key-value metadata (in which we store Spark SQL schema), 
Parquet doesn't know
+      // how to merge them correctly if some key is associated with different 
values in different
+      // part-files.  When this happens, Parquet simply gives up generating 
the summary file.  This
+      // implies that if a summary file is present, then:
+      //
+      //   1. Either all part-files have exactly the same Spark SQL schema, or
+      //   2. Some part-files don't contain Spark SQL schema in the key-value 
metadata at all (thus
+      //      their schemas may differ from each other).
+      //
+      // Here we tend to be pessimistic and take the second case into account. 
 Basically this means
+      // we can't trust the summary files if users require a merged schema, 
and must touch all part-
+      // files to do the merge.
+      val filesToTouch =
+        if (shouldMergeSchemas) {
+          // Also includes summary files, 'cause there might be empty 
partition directories.
+          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+        } else {
+          // Tries any "_common_metadata" first. Parquet files written by old 
versions of Parquet
+          // don't have this.
+          commonMetadataStatuses.headOption
+            // Falls back to "_metadata"
+            .orElse(metadataStatuses.headOption)
+            // Summary file(s) not found, the Parquet file is either 
corrupted, or different part-
+            // files contain conflicting user defined metadata (two or more 
values are associated
+            // with the same key in different files).  In either case, we fall 
back to the
+            // first part-file, and just assume all schemas are consistent.
+            .orElse(dataStatuses.headOption)
+            .toSeq
+        }
+
+      assert(
+        filesToTouch.nonEmpty || maybeDataSchema.isDefined || 
maybeMetastoreSchema.isDefined,
+        "No predefined schema found, " +
+          s"and no Parquet data files or summary files found under 
${paths.mkString(", ")}.")
+
+      ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext)
+    }
+  }
 }
 
-private[sql] object ParquetRelation {
+private[sql] object ParquetRelation extends Logging {
+  // Whether we should merge schemas collected from all Parquet part-files.
+  private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+  // Hive Metastore schema, used when converting Metastore Parquet tables.  
This option is only used
+  // internally.
+  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+  /** This closure sets various Parquet configurations at both driver side and 
executor side. */
+  private[parquet] def initializeLocalJobFunc(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      dataSchema: StructType,
+      useMetadataCache: Boolean,
+      parquetFilterPushDown: Boolean,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean,
+      followParquetFormatSpec: Boolean)(job: Job): Unit = {
+    val conf = job.getConfiguration
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
+
+    // Try to push down filters when filter push-down is enabled.
+    if (parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
+
+    // Sets flags for Parquet schema conversion
+    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
+    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
assumeInt96IsTimestamp)
+    conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, 
followParquetFormatSpec)
+  }
+
+  /** This closure sets input paths at the driver side. */
+  private[parquet] def initializeDriverSideJobFunc(
+      inputFiles: Array[FileStatus])(job: Job): Unit = {
+    // We set the input paths at the driver side.
+    logInfo(s"Reading Parquet file(s) from 
${inputFiles.map(_.getPath).mkString(", ")}")
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+    }
+  }
+
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+
+    def parseParquetSchema(schema: MessageType): StructType = {
+      val converter = new CatalystSchemaConverter(
+        sqlContext.conf.isParquetBinaryAsString,
+        sqlContext.conf.isParquetINT96AsTimestamp,
+        sqlContext.conf.followParquetFormatSpec)
+
+      converter.convert(schema)
+    }
+
+    val seen = mutable.HashSet[String]()
+    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
+      val metadata = footer.getParquetMetadata.getFileMetaData
+      val serializedSchema = metadata
+        .getKeyValueMetaData
+        .toMap
+        .get(CatalystReadSupport.SPARK_METADATA_KEY)
+      if (serializedSchema.isEmpty) {
+        // Falls back to Parquet schema if no Spark SQL schema found.
+        Some(parseParquetSchema(metadata.getSchema))
+      } else if (!seen.contains(serializedSchema.get)) {
+        seen += serializedSchema.get
+
+        // Don't throw even if we failed to parse the serialized Spark schema. 
Just fall back to
+        // whatever is available.
+        Some(Try(DataType.fromJson(serializedSchema.get))
+          .recover { case _: Throwable =>
+            logInfo(
+              s"Serialized Spark schema in Parquet key-value metadata is not 
in JSON format, " +
+                "falling back to the deprecated DataType.fromCaseClassString 
parser.")
+            DataType.fromCaseClassString(serializedSchema.get)
+          }
+          .recover { case cause: Throwable =>
+            logWarning(
+              s"""Failed to parse serialized Spark schema in Parquet key-value 
metadata:
+                 |\t$serializedSchema
+               """.stripMargin,
+              cause)
+          }
+          .map(_.asInstanceOf[StructType])
+          .getOrElse {
+            // Falls back to Parquet schema if Spark SQL schema can't be 
parsed.
+            parseParquetSchema(metadata.getSchema)
+          })
+      } else {
+        None
+      }
+    }
+
+    finalSchemas.reduceOption { (left, right) =>
+      try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left 
and $right", e)
+      }
+    }
+  }
+
+  /**
+   * Reconciles Hive Metastore case insensitivity issue and data type 
conflicts between Metastore
+   * schema and Parquet schema.
+   *
+   * Hive doesn't retain case information, while Parquet is case sensitive. On 
the other hand, the
+   * schema read from Parquet files may be incomplete (e.g. older versions of 
Parquet don't
+   * distinguish binary from string).  This method generates a correct schema 
by merging Metastore
+   * schema data types and Parquet schema field names.
+   */
+  private[parquet] def mergeMetastoreParquetSchema(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    def schemaConflictMessage: String =
+      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. 
Metastore schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Parquet schema:
+         |${parquetSchema.prettyJson}
+       """.stripMargin
+
+    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, 
parquetSchema)
+
+    assert(metastoreSchema.size <= mergedParquetSchema.size, 
schemaConflictMessage)
+
+    val ordinalMap = metastoreSchema.zipWithIndex.map {
+      case (field, index) => field.name.toLowerCase -> index
+    }.toMap
+
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
+
+    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+      // Uses Parquet field names but retains Metastore data types.
+      case (mSchema, pSchema) if mSchema.name.toLowerCase == 
pSchema.name.toLowerCase =>
+        mSchema.copy(name = pSchema.name)
+      case _ =>
+        throw new SparkException(schemaConflictMessage)
+    })
+  }
+
+  /**
+   * Returns the original schema from the Parquet file with any missing 
nullable fields from the
+   * Hive Metastore schema merged in.
+   *
+   * When constructing a DataFrame from a collection of structured data, the 
resulting object has
+   * a schema corresponding to the union of the fields present in each element 
of the collection.
+   * Spark SQL simply assigns a null value to any field that isn't present for 
a particular row.
+   * In some cases, it is possible that a given table partition stored as a 
Parquet file doesn't
+   * contain a particular nullable field in its schema despite that field 
being present in the
+   * table schema obtained from the Hive Metastore. This method returns a 
schema representing the
+   * Parquet file schema along with any additional nullable fields from the 
Metastore schema
+   * merged in.
+   */
+  private[parquet] def mergeMissingNullableFields(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val missingFields = metastoreSchema
+      .map(_.name.toLowerCase)
+      .diff(parquetSchema.map(_.name.toLowerCase))
+      .map(fieldMap(_))
+      .filter(_.nullable)
+    StructType(parquetSchema ++ missingFields)
+  }
+
+  /**
+   * Figures out a merged Parquet schema with a distributed Spark job.
+   *
+   * Note that locality is not taken into consideration here because:
+   *
+   *  1. For a single Parquet part-file, in most cases the footer only resides 
in the last block of
+   *     that file.  Thus we only need to retrieve the location of the last 
block.  However, Hadoop
+   *     `FileSystem` only provides API to retrieve locations of all blocks, 
which can be
+   *     potentially expensive.
+   *
+   *  2. This optimization is mainly useful for S3, where file metadata 
operations can be pretty
+   *     slow.  And basically locality is not available when using S3 (you 
can't run computation on
+   *     S3 nodes).
+   */
+  def mergeSchemasInParallel(
+      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): 
Option[StructType] = {
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val serializedConf = new 
SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+
+    // HACK ALERT:
+    //
+    // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
+    // to the executor side to avoid fetching them again.  However, `FileStatus` is not
+    // `Serializable` but only `Writable`.  What makes it worse, for some reason `FileStatus`
+    // doesn't play well with `SerializableWritable[T]` and always causes a weird
+    // `IllegalStateException`.  These facts virtually prevent us from serializing `FileStatus`es.
+    //
+    // Since Parquet only relies on the path and length information of those `FileStatus`es to
+    // read footers, here we just extract them (which can be easily serialized), send them to the
+    // executor side, and reconstruct fake `FileStatus`es there.
+    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+
+    // Issues a Spark job to read Parquet schema in parallel.
+    val partiallyMergedSchemas =
+      sqlContext
+        .sparkContext
+        .parallelize(partialFileStatusInfo)
+        .mapPartitions { iterator =>
+          // Reconstructs fake `FileStatus`es from the serialized path and length information.
+          val fakeFileStatuses = iterator.map { case (path, length) =>
+            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
+          }.toSeq
+
+          // Skips row group information since we only need the schema
+          val skipRowGroups = true
+
+          // Reads footers in a multi-threaded manner within each task
+          val footers =
+            ParquetFileReader.readAllFootersInParallel(
+              serializedConf.value, fakeFileStatuses, skipRowGroups)
+
+          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
+          val converter =
+            new CatalystSchemaConverter(
+              assumeBinaryIsString = assumeBinaryIsString,
+              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+              followParquetFormatSpec = followParquetFormatSpec)
+
+          footers.map { footer =>
+            ParquetRelation.readSchemaFromFooter(footer, converter)
+          }.reduceOption(_ merge _).iterator
+        }.collect()
+
+    partiallyMergedSchemas.reduceOption(_ merge _)
+  }
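
  The fake `FileStatus` trick above relies on Parquet's footer reader looking only at the path and
  length of each status.  A standalone sketch (hypothetical helper name; not part of the patch) of
  the reconstruction performed on the executor side:

      import org.apache.hadoop.fs.{FileStatus, Path}

      // Everything except length and path is a placeholder; Parquet never reads those fields
      // while locating and parsing footers.
      def fakeFileStatus(path: String, length: Long): FileStatus =
        new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))

  On the driver only the serializable `(path, length)` pairs are shipped, as in
  `partialFileStatusInfo` above, which sidesteps the `FileStatus` serialization problem entirely.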
+
+  /**
+   * Reads Spark SQL schema from a Parquet footer.  If a valid serialized Spark SQL schema string
+   * can be found in the file metadata, returns the deserialized [[StructType]]; otherwise, returns
+   * a [[StructType]] converted from the [[MessageType]] stored in this footer.
+   */
+  def readSchemaFromFooter(
+      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+    val fileMetaData = footer.getParquetMetadata.getFileMetaData
+    fileMetaData
+      .getKeyValueMetaData
+      .toMap
+      .get(CatalystReadSupport.SPARK_METADATA_KEY)
+      .flatMap(deserializeSchemaString)
+      .getOrElse(converter.convert(fileMetaData.getSchema))
+  }
+
+  private def deserializeSchemaString(schemaString: String): Option[StructType] = {
+    // Tries to deserialize the schema string as JSON first, then falls back to the case class
+    // string parser (data generated by older versions of Spark SQL uses this format).
+    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
+      case _: Throwable =>
+        logInfo(
+          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+            "falling back to the deprecated DataType.fromCaseClassString parser.")
+        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
+    }.recoverWith {
+      case cause: Throwable =>
+        logWarning(
+          "Failed to parse the serialized Spark schema in Parquet key-value metadata " +
+            s"(ignoring it):\n\t$schemaString", cause)
+        Failure(cause)
+    }.toOption
+  }
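
  A quick sketch of the JSON round trip that this fallback chain prefers (hypothetical schema; the
  case class string format is only kept for data written by older Spark SQL versions):

      import scala.util.Try
      import org.apache.spark.sql.types._

      val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))

      // Newer Spark versions store this JSON string under the
      // CatalystReadSupport.SPARK_METADATA_KEY entry of the Parquet footer metadata.
      val serialized = schema.json
      val parsed = Try(DataType.fromJson(serialized).asInstanceOf[StructType]).toOption
      assert(parsed == Some(schema))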
 
   def enableLogForwarding() {
     // Note: the org.apache.parquet.Log class has a static initializer that
@@ -127,12 +766,6 @@ private[sql] object ParquetRelation {
     JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
   }
 
-  // The element type for the RDDs that this relation maps to.
-  type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-
-  // The compression type
-  type CompressionType = org.apache.parquet.hadoop.metadata.CompressionCodecName
-
   // The parquet compression short names
   val shortParquetCompressionCodecNames = Map(
     "NONE"         -> CompressionCodecName.UNCOMPRESSED,
@@ -140,82 +773,4 @@ private[sql] object ParquetRelation {
     "SNAPPY"       -> CompressionCodecName.SNAPPY,
     "GZIP"         -> CompressionCodecName.GZIP,
     "LZO"          -> CompressionCodecName.LZO)
-
-  /**
-   * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
-   * this is used inside [[org.apache.spark.sql.execution.SparkStrategies SparkStrategies]] to
-   * create a resolved relation as a data sink for writing to a Parquetfile. The relation is empty
-   * but is initialized with ParquetMetadata and can be inserted into.
-   *
-   * @param pathString The directory the Parquetfile will be stored in.
-   * @param child The child node that will be used for extracting the schema.
-   * @param conf A configuration to be used.
-   * @return An empty ParquetRelation with inferred metadata.
-   */
-  def create(pathString: String,
-             child: LogicalPlan,
-             conf: Configuration,
-             sqlContext: SQLContext): ParquetRelation = {
-    if (!child.resolved) {
-      throw new UnresolvedException[LogicalPlan](
-        child,
-        "Attempt to create Parquet table from unresolved child (when schema is not available)")
-    }
-    createEmpty(pathString, child.output, false, conf, sqlContext)
-  }
-
-  /**
-   * Creates an empty ParquetRelation and underlying Parquetfile that only
-   * consists of the Metadata for the given schema.
-   *
-   * @param pathString The directory the Parquetfile will be stored in.
-   * @param attributes The schema of the relation.
-   * @param conf A configuration to be used.
-   * @return An empty ParquetRelation.
-   */
-  def createEmpty(pathString: String,
-                  attributes: Seq[Attribute],
-                  allowExisting: Boolean,
-                  conf: Configuration,
-                  sqlContext: SQLContext): ParquetRelation = {
-    val path = checkPath(pathString, allowExisting, conf)
-    conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
-      sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
-      .name())
-    ParquetRelation.enableLogForwarding()
-    // This is a hack. We always set nullable/containsNull/valueContainsNull to true
-    // for the schema of a parquet data.
-    val schema = StructType.fromAttributes(attributes).asNullable
-    val newAttributes = schema.toAttributes
-    ParquetTypesConverter.writeMetaData(newAttributes, path, conf)
-    new ParquetRelation(path.toString, Some(conf), sqlContext) {
-      override val output = newAttributes
-    }
-  }
-
-  private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
-    if (pathStr == null) {
-      throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")
-    }
-    val origPath = new Path(pathStr)
-    val fs = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(
-        s"Unable to create ParquetRelation: incorrectly formatted path $pathStr")
-    }
-    val path = origPath.makeQualified(fs)
-    if (!allowExisting && fs.exists(path)) {
-      sys.error(s"File $pathStr already exists.")
-    }
-
-    if (fs.exists(path) &&
-        !fs.getFileStatus(path)
-        .getPermission
-        .getUserAction
-        .implies(FsAction.READ_WRITE)) {
-      throw new IOException(
-        s"Unable to create ParquetRelation: path $path not read-writable")
-    }
-    path
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
deleted file mode 100644
index 75cbbde..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.IOException
-import java.text.{NumberFormat, SimpleDateFormat}
-import java.util.concurrent.TimeUnit
-import java.util.Date
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import com.google.common.cache.CacheBuilder
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat => NewFileOutputFormat}
-import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, _}
-import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * :: DeveloperApi ::
- * Parquet table scan operator. Imports the file that backs the given
- * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[InternalRow]``.
- */
-private[sql] case class ParquetTableScan(
-    attributes: Seq[Attribute],
-    relation: ParquetRelation,
-    columnPruningPred: Seq[Expression])
-  extends LeafNode {
-
-  // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
-  // by exprId. note: output cannot be transient, see
-  // https://issues.apache.org/jira/browse/SPARK-1367
-  val output = attributes.map(relation.attributeMap)
-
-  // A mapping of ordinals partitionRow -> finalOutput.
-  val requestedPartitionOrdinals = {
-    val partitionAttributeOrdinals = AttributeMap(relation.partitioningAttributes.zipWithIndex)
-
-    attributes.zipWithIndex.flatMap {
-      case (attribute, finalOrdinal) =>
-        partitionAttributeOrdinals.get(attribute).map(_ -> finalOrdinal)
-    }
-  }.toArray
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat
-
-    val sc = sqlContext.sparkContext
-    val job = new Job(sc.hadoopConfiguration)
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
-    val conf: Configuration = ContextUtil.getConfiguration(job)
-
-    relation.path.split(",").foreach { curPath =>
-      val qualifiedPath = {
-        val path = new Path(curPath)
-        path.getFileSystem(conf).makeQualified(path)
-      }
-      NewFileInputFormat.addInputPath(job, qualifiedPath)
-    }
-
-    // Store both requested and original schema in `Configuration`
-    conf.set(
-      RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(output))
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(relation.output))
-
-    // Store record filtering predicate in `Configuration`
-    // Note 1: the input format ignores all predicates that cannot be expressed
-    // as simple column predicate filters in Parquet. Here we just record
-    // the whole pruning predicate.
-    ParquetFilters
-      .createRecordFilter(columnPruningPred)
-      .map(_.asInstanceOf[FilterPredicateCompat].getFilterPredicate)
-      // Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
-      .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.setBoolean(
-      SQLConf.PARQUET_CACHE_METADATA.key,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true))
-
-    // Use task side metadata in parquet
-    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
-
-    val baseRDD =
-      new org.apache.spark.rdd.NewHadoopRDD(
-        sc,
-        classOf[FilteringParquetRowInputFormat],
-        classOf[Void],
-        classOf[InternalRow],
-        conf)
-
-    if (requestedPartitionOrdinals.nonEmpty) {
-      // This check is based on CatalystConverter.createRootConverter.
-      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
-
-      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
-      // the `mapPartitionsWithInputSplit` closure below.
-      val outputSize = output.size
-
-      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
-        val partValue = "([^=]+)=([^=]+)".r
-        val partValues =
-          split.asInstanceOf[org.apache.parquet.hadoop.ParquetInputSplit]
-            .getPath
-            .toString
-            .split("/")
-            .flatMap {
-              case partValue(key, value) => Some(key -> value)
-              case _ => None
-            }.toMap
-
-        // Convert the partitioning attributes into the correct types
-        val partitionRowValues =
-          relation.partitioningAttributes
-            .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
-
-        if (primitiveRow) {
-          new Iterator[InternalRow] {
-            def hasNext: Boolean = iter.hasNext
-            def next(): InternalRow = {
-              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
-              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-              // Parquet will leave partitioning columns empty, so we fill them in here.
-              var i = 0
-              while (i < requestedPartitionOrdinals.length) {
-                row(requestedPartitionOrdinals(i)._2) =
-                  partitionRowValues(requestedPartitionOrdinals(i)._1)
-                i += 1
-              }
-              row
-            }
-          }
-        } else {
-          // Create a mutable row since we need to fill in values from partition columns.
-          val mutableRow = new GenericMutableRow(outputSize)
-          new Iterator[InternalRow] {
-            def hasNext: Boolean = iter.hasNext
-            def next(): InternalRow = {
-              // We are using CatalystGroupConverter and it returns a GenericRow.
-              // Since GenericRow is not mutable, we just cast it to a Row.
-              val row = iter.next()._2.asInstanceOf[InternalRow]
-
-              var i = 0
-              while (i < row.numFields) {
-                mutableRow(i) = row.genericGet(i)
-                i += 1
-              }
-              // Parquet will leave partitioning columns empty, so we fill them in here.
-              i = 0
-              while (i < requestedPartitionOrdinals.length) {
-                mutableRow(requestedPartitionOrdinals(i)._2) =
-                  partitionRowValues(requestedPartitionOrdinals(i)._1)
-                i += 1
-              }
-              mutableRow
-            }
-          }
-        }
-      }
-    } else {
-      baseRDD.map(_._2)
-    }
-  }
-
-  /**
-   * Applies a (candidate) projection.
-   *
-   * @param prunedAttributes The list of attributes to be used in the projection.
-   * @return Pruned TableScan.
-   */
-  def pruneColumns(prunedAttributes: Seq[Attribute]): ParquetTableScan = {
-    val success = validateProjection(prunedAttributes)
-    if (success) {
-      ParquetTableScan(prunedAttributes, relation, columnPruningPred)
-    } else {
-      sys.error("Warning: Could not validate Parquet schema projection in pruneColumns")
-    }
-  }
-
-  /**
-   * Evaluates a candidate projection by checking whether the candidate is a subtype
-   * of the original type.
-   *
-   * @param projection The candidate projection.
-   * @return True if the projection is valid, false otherwise.
-   */
-  private def validateProjection(projection: Seq[Attribute]): Boolean = {
-    val original: MessageType = relation.parquetSchema
-    val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection)
-    Try(original.checkContains(candidate)).isSuccess
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Operator that acts as a sink for queries on RDDs and can be used to
- * store the output inside a directory of Parquet files. This operator
- * is similar to Hive's INSERT INTO TABLE operation in the sense that
- * one can choose to either overwrite or append to a directory. Note
- * that consecutive insertions to the same table must have compatible
- * (source) schemas.
- *
- * WARNING: EXPERIMENTAL! InsertIntoParquetTable with overwrite=false may
- * cause data corruption in the case that multiple users try to append to
- * the same table simultaneously. Inserting into a table that was
- * previously generated by other means (e.g., by creating an HDFS
- * directory and importing Parquet files generated by other tools) may
- * cause unpredicted behaviour and therefore results in a RuntimeException
- * (only detected via filename pattern so will not catch all cases).
- */
-@DeveloperApi
-private[sql] case class InsertIntoParquetTable(
-    relation: ParquetRelation,
-    child: SparkPlan,
-    overwrite: Boolean = false)
-  extends UnaryNode with SparkHadoopMapReduceUtil {
-
-  /**
-   * Inserts all rows into the Parquet file.
-   */
-  protected override def doExecute(): RDD[InternalRow] = {
-    // TODO: currently we do not check whether the "schema"s are compatible.
-    // That means if one first creates a table and then INSERTs data with
-    // an incompatible schema the execution will fail. It would be nice
-    // to catch this early on, maybe by having the planner validate the schema
-    // before calling execute().
-
-    val childRdd = child.execute()
-    assert(childRdd != null)
-
-    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
-
-    val writeSupport =
-      if (child.output.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        log.debug("Initializing MutableRowWriteSupport")
-        classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
-      } else {
-        classOf[org.apache.spark.sql.parquet.RowWriteSupport]
-      }
-
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
-
-    val conf = ContextUtil.getConfiguration(job)
-    // This is a hack. We always set nullable/containsNull/valueContainsNull to true
-    // for the schema of a parquet data.
-    val schema = StructType.fromAttributes(relation.output).asNullable
-    RowWriteSupport.setSchema(schema.toAttributes, conf)
-
-    val fspath = new Path(relation.path)
-    val fs = fspath.getFileSystem(conf)
-
-    if (overwrite) {
-      try {
-        fs.delete(fspath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(
-            s"Unable to clear output directory ${fspath.toString} prior"
-              + s" to InsertIntoParquetTable:\n${e.toString}")
-      }
-    }
-    saveAsHadoopFile(childRdd, relation.path.toString, conf)
-
-    // We return the child RDD to allow chaining (alternatively, one could return nothing).
-    childRdd
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  /**
-   * Stores the given Row RDD as a Hadoop file.
-   *
-   * Note: We cannot use ``saveAsNewAPIHadoopFile`` from [[org.apache.spark.rdd.PairRDDFunctions]]
-   * together with [[org.apache.spark.util.MutablePair]] because ``PairRDDFunctions`` uses
-   * ``Tuple2`` and not ``Product2``. Also, we want to allow appending files to an existing
-   * directory and need to determine which was the largest written file index before starting to
-   * write.
-   *
-   * @param rdd The [[org.apache.spark.rdd.RDD]] to write.
-   * @param path The directory to write to.
-   * @param conf A [[org.apache.hadoop.conf.Configuration]].
-   */
-  private def saveAsHadoopFile(
-      rdd: RDD[InternalRow],
-      path: String,
-      conf: Configuration) {
-    val job = new Job(conf)
-    val keyType = classOf[Void]
-    job.setOutputKeyClass(keyType)
-    job.setOutputValueClass(classOf[InternalRow])
-    NewFileOutputFormat.setOutputPath(job, new Path(path))
-    val wrappedConf = new SerializableConfiguration(job.getConfiguration)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = sqlContext.sparkContext.newRddId()
-
-    val taskIdOffset =
-      if (overwrite) {
-        1
-      } else {
-        FileSystemHelper
-          .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
-      }
-
-    def writeShard(context: TaskContext, iter: Iterator[InternalRow]): Int = {
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        context.attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = new AppendingParquetOutputFormat(taskIdOffset)
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext)
-      try {
-        while (iter.hasNext) {
-          val row = iter.next()
-          writer.write(null, row)
-        }
-      } finally {
-        writer.close(hadoopContext)
-      }
-      SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
-      1
-    }
-    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    sqlContext.sparkContext.runJob(rdd, writeShard _)
-    jobCommitter.commitJob(jobTaskContext)
-  }
-}
-
-/**
- * TODO: this will be able to append to directories it created itself, not necessarily
- * to imported ones.
- */
-private[parquet] class AppendingParquetOutputFormat(offset: Int)
-  extends org.apache.parquet.hadoop.ParquetOutputFormat[InternalRow] {
-  // override to accept existing directories as valid output directory
-  override def checkOutputSpecs(job: JobContext): Unit = {}
-  var committer: OutputCommitter = null
-
-  // override to choose output filename so not overwrite existing ones
-  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val taskId: TaskID = getTaskAttemptID(context).getTaskID
-    val partition: Int = taskId.getId
-    val filename = "part-r-" + numfmt.format(partition + offset) + ".parquet"
-    val committer: FileOutputCommitter =
-      getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
-    new Path(committer.getWorkPath, filename)
-  }
-
-  // The TaskAttemptContext is a class in hadoop-1 but is an interface in hadoop-2.
-  // The signatures of the method TaskAttemptContext.getTaskAttemptID for both versions
-  // are the same, so the method calls are source-compatible but NOT binary-compatible because
-  // the opcode of a method call on a class is INVOKEVIRTUAL and on an interface is INVOKEINTERFACE.
-  private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
-    context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
-  }
-
-  // override to create output committer from configuration
-  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    if (committer == null) {
-      val output = getOutputPath(context)
-      val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
-        classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
-      val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
-    }
-    committer
-  }
-
-  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
-  private def getOutputPath(context: TaskAttemptContext): Path = {
-    context.getConfiguration().get("mapred.output.dir") match {
-      case null => null
-      case name => new Path(name)
-    }
-  }
-}
-
-// TODO Removes this class after removing old Parquet support code
-/**
- * We extend ParquetInputFormat in order to have more control over which
- * RecordFilter we want to use.
- */
-private[parquet] class FilteringParquetRowInputFormat
-  extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging {
-
-  override def createRecordReader(
-      inputSplit: InputSplit,
-      taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
-
-    import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter
-
-    val readSupport: ReadSupport[InternalRow] = new RowReadSupport()
-
-    val filter = ParquetInputFormat.getFilter(ContextUtil.getConfiguration(taskAttemptContext))
-    if (!filter.isInstanceOf[NoOpFilter]) {
-      new ParquetRecordReader[InternalRow](
-        readSupport,
-        filter)
-    } else {
-      new ParquetRecordReader[InternalRow](readSupport)
-    }
-  }
-
-}
-
-private[parquet] object FileSystemHelper {
-  def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
-    val origPath = new Path(pathStr)
-    val fs = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(
-        s"ParquetTableOperations: Path $origPath is incorrectly formatted")
-    }
-    val path = origPath.makeQualified(fs)
-    if (!fs.exists(path) || !fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"ParquetTableOperations: path $path does not exist or is not a directory")
-    }
-    fs.globStatus(path)
-      .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
-      .map(_.getPath)
-  }
-
-    /**
-     * Finds the maximum taskid in the output file names at the given path.
-     */
-  def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
-    val files = FileSystemHelper.listFiles(pathStr, conf)
-    // filename pattern is part-r-<int>.parquet
-    val nameP = new scala.util.matching.Regex("""part-.-(\d{1,}).*""", "taskid")
-    val hiddenFileP = new scala.util.matching.Regex("_.*")
-    files.map(_.getName).map {
-      case nameP(taskid) => taskid.toInt
-      case hiddenFileP() => 0
-      case other: String =>
-        sys.error("ERROR: attempting to append to set of Parquet files and found a file " +
-          s"that does not match name pattern: $other")
-      case _ => 0
-    }.reduceOption(_ max _).getOrElse(0)
-  }
-}

