http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 7b6a7f6..fc9f61a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -18,18 +18,13 @@
 package org.apache.spark.sql.parquet
 
 import java.nio.{ByteBuffer, ByteOrder}
-import java.util
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.hadoop.api.{InitContext, ReadSupport, WriteSupport}
+import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.io.api._
-import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
@@ -39,147 +34,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
- * A [[RecordMaterializer]] for Catalyst rows.
- *
- * @param parquetSchema Parquet schema of the records to be read
- * @param catalystSchema Catalyst schema of the rows to be constructed
- */
-private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, 
catalystSchema: StructType)
-  extends RecordMaterializer[InternalRow] {
-
-  private val rootConverter = new CatalystRowConverter(parquetSchema, 
catalystSchema, NoopUpdater)
-
-  override def getCurrentRecord: InternalRow = rootConverter.currentRow
-
-  override def getRootConverter: GroupConverter = rootConverter
-}
-
-private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with 
Logging {
-  override def prepareForRead(
-      conf: Configuration,
-      keyValueMetaData: util.Map[String, String],
-      fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: 
$fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
-    val parquetRequestedSchema = readContext.getRequestedSchema
-
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { 
metadata =>
-        metadata
-          // First tries to read requested schema, which may result from 
projections
-          .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file 
metadata.  It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet 
schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
-
-    logDebug(s"Catalyst schema used to read Parquet files: 
$catalystRequestedSchema")
-    new RowRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
-  }
-
-  override def init(context: InitContext): ReadContext = {
-    val conf = context.getConfiguration
-
-    // If the target file was written by Spark SQL, we should be able to find 
a serialized Catalyst
-    // schema of this file from its metadata.
-    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-    // Optional schema of requested columns, in the form of a string 
serialized from a Catalyst
-    // `StructType` containing all requested columns.
-    val maybeRequestedSchema = 
Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-    // Below we construct a Parquet schema containing all requested columns.  
This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet 
schema.  Otherwise,
-    // we have to fall back to the full file schema which contains all columns 
in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns 
than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target 
Parquet file.  For
-    //    example, in the case of schema merging, the globally merged schema 
may contain extra
-    //    columns gathered from other Parquet files.  These columns will be 
simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert 
the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping 
is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools.  For 
example, a Parquet file
-    //    containing a single integer array field `f1` may have the following 
legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level 
structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet 
file as they have
-    //    different physical structures.
-    val parquetRequestedSchema =
-      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the 
field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema 
for all requested
-          // columns.  Note that it's possible that no columns are requested 
at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why 
`fold` is used here
-          // and always falls back to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
-      }
-
-    val metadata =
-      Map.empty[String, String] ++
-        maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> 
_) ++
-        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
-    logInfo(s"Going to read Parquet file with these requested columns: 
$parquetRequestedSchema")
-    new ReadContext(parquetRequestedSchema, metadata)
-  }
-}
-
-private[parquet] object RowReadSupport {
-  val SPARK_ROW_REQUESTED_SCHEMA = 
"org.apache.spark.sql.parquet.row.requested_schema"
-  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
-
-  private def getRequestedSchema(configuration: Configuration): Seq[Attribute] 
= {
-    val schemaString = 
configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-    if (schemaString == null) null else 
ParquetTypesConverter.convertFromString(schemaString)
-  }
-}
-
-/**
  * A `parquet.hadoop.api.WriteSupport` for Row objects.
  */
 private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with 
Logging {
@@ -190,7 +44,7 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[InternalRow] with Lo
   override def init(configuration: Configuration): WriteSupport.WriteContext = 
{
     val origAttributesStr: String = 
configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
     val metadata = new JHashMap[String, String]()
-    metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+    metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr)
 
     if (attributes == null) {
       attributes = 
ParquetTypesConverter.convertFromString(origAttributesStr).toArray
@@ -443,4 +297,3 @@ private[parquet] object RowWriteSupport {
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index e748bd7..3854f5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -53,15 +53,6 @@ private[parquet] object ParquetTypesConverter extends 
Logging {
     length
   }
 
-  def convertToAttributes(
-      parquetSchema: MessageType,
-      isBinaryAsString: Boolean,
-      isInt96AsTimestamp: Boolean): Seq[Attribute] = {
-    val converter = new CatalystSchemaConverter(
-      isBinaryAsString, isInt96AsTimestamp, followParquetFormatSpec = false)
-    converter.convert(parquetSchema).toAttributes
-  }
-
   def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
     val converter = new CatalystSchemaConverter()
     converter.convert(StructType.fromAttributes(attributes))
@@ -103,7 +94,7 @@ private[parquet] object ParquetTypesConverter extends 
Logging {
     }
     val extraMetadata = new java.util.HashMap[String, String]()
     extraMetadata.put(
-      RowReadSupport.SPARK_METADATA_KEY,
+      CatalystReadSupport.SPARK_METADATA_KEY,
       ParquetTypesConverter.convertToString(attributes))
     // TODO: add extra data, e.g., table name, date, etc.?
 
@@ -165,35 +156,4 @@ private[parquet] object ParquetTypesConverter extends 
Logging {
       .getOrElse(
         throw new IllegalArgumentException(s"Could not find Parquet metadata 
at path $path"))
   }
-
-  /**
-   * Reads in Parquet Metadata from the given path and tries to extract the 
schema
-   * (Catalyst attributes) from the application-specific key-value map. If this
-   * is empty it falls back to converting from the Parquet file schema which
-   * may lead to an upcast of types (e.g., {byte, short} to int).
-   *
-   * @param origPath The path at which we expect one (or more) Parquet files.
-   * @param conf The Hadoop configuration to use.
-   * @return A list of attributes that make up the schema.
-   */
-  def readSchemaFromFile(
-      origPath: Path,
-      conf: Option[Configuration],
-      isBinaryAsString: Boolean,
-      isInt96AsTimestamp: Boolean): Seq[Attribute] = {
-    val keyValueMetadata: java.util.Map[String, String] =
-      readMetaData(origPath, conf)
-        .getFileMetaData
-        .getKeyValueMetaData
-    if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
-      
convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
-    } else {
-      val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema,
-        isBinaryAsString,
-        isInt96AsTimestamp)
-      log.info(s"Falling back to schema conversion from Parquet types; result: 
$attributes")
-      attributes
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
deleted file mode 100644
index 8ec228c..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ /dev/null
@@ -1,732 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.net.URI
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Try}
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
-import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
-  override def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      schema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): HadoopFsRelation = {
-    new ParquetRelation2(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
-private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
-  extends OutputWriterInternal {
-
-  private val recordWriter: RecordWriter[Void, InternalRow] = {
-    val outputFormat = {
-      new ParquetOutputFormat[InternalRow]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate unique output file 
names to avoid
-        //     overwriting existing files (either exist before the write job, 
or are just written
-        //     by other tasks within the same write job).
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
-        //     partitions in the case of dynamic partitioning.
-        override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-          val uniqueWriteJobId = 
context.getConfiguration.get("spark.sql.sources.writeJobUUID")
-          val split = context.getTaskAttemptID.getTaskID.getId
-          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
-        }
-      }
-    }
-
-    outputFormat.getRecordWriter(context)
-  }
-
-  override def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(context)
-}
-
-private[sql] class ParquetRelation2(
-    override val paths: Array[String],
-    private val maybeDataSchema: Option[StructType],
-    // This is for metastore conversion.
-    private val maybePartitionSpec: Option[PartitionSpec],
-    override val userDefinedPartitionColumns: Option[StructType],
-    parameters: Map[String, String])(
-    val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec)
-  with Logging {
-
-  private[sql] def this(
-      paths: Array[String],
-      maybeDataSchema: Option[StructType],
-      maybePartitionSpec: Option[PartitionSpec],
-      parameters: Map[String, String])(
-      sqlContext: SQLContext) = {
-    this(
-      paths,
-      maybeDataSchema,
-      maybePartitionSpec,
-      maybePartitionSpec.map(_.partitionColumns),
-      parameters)(sqlContext)
-  }
-
-  // Should we merge schemas from all Parquet part-files?
-  private val shouldMergeSchemas =
-    parameters
-      .get(ParquetRelation2.MERGE_SCHEMA)
-      .map(_.toBoolean)
-      
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
-
-  private val maybeMetastoreSchema = parameters
-    .get(ParquetRelation2.METASTORE_SCHEMA)
-    .map(DataType.fromJson(_).asInstanceOf[StructType])
-
-  private lazy val metadataCache: MetadataCache = {
-    val meta = new MetadataCache
-    meta.refresh()
-    meta
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ParquetRelation2 =>
-      val schemaEquality = if (shouldMergeSchemas) {
-        this.shouldMergeSchemas == that.shouldMergeSchemas
-      } else {
-        this.dataSchema == that.dataSchema &&
-          this.schema == that.schema
-      }
-
-      this.paths.toSet == that.paths.toSet &&
-        schemaEquality &&
-        this.maybeDataSchema == that.maybeDataSchema &&
-        this.partitionColumns == that.partitionColumns
-
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    if (shouldMergeSchemas) {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        maybeDataSchema,
-        partitionColumns)
-    } else {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        dataSchema,
-        schema,
-        maybeDataSchema,
-        partitionColumns)
-    }
-  }
-
-  /** Constraints on schema of dataframe to be stored. */
-  private def checkConstraints(schema: StructType): Unit = {
-    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
-      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns 
found, " +
-        s"cannot save to parquet format")
-    }
-  }
-
-  override def dataSchema: StructType = {
-    val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
-    // check if schema satisfies the constraints
-    // before moving forward
-    checkConstraints(schema)
-    schema
-  }
-
-  override private[sql] def refresh(): Unit = {
-    super.refresh()
-    metadataCache.refresh()
-  }
-
-  // Parquet data source always uses Catalyst internal representations.
-  override val needConversion: Boolean = false
-
-  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
-
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
-    val conf = ContextUtil.getConfiguration(job)
-
-    val committerClass =
-      conf.getClass(
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
-        classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
-
-    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
-      logInfo("Using default output committer for Parquet: " +
-        classOf[ParquetOutputCommitter].getCanonicalName)
-    } else {
-      logInfo("Using user defined output committer for Parquet: " + 
committerClass.getCanonicalName)
-    }
-
-    conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key,
-      committerClass,
-      classOf[ParquetOutputCommitter])
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data 
here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic 
partitioning.  The reason why
-    // we set it here is to setup the output committer class to 
`ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    // TODO There's no need to use two kinds of WriteSupport
-    // We should unify them. `SpecificMutableRow` can process both atomic 
(primitive) types and
-    // complex types.
-    val writeSupportClass =
-      if 
(dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
-
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
-    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
-
-    // Sets compression scheme
-    conf.set(
-      ParquetOutputFormat.COMPRESSION,
-      ParquetRelation
-        .shortParquetCompressionCodecNames
-        .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
-          CompressionCodecName.UNCOMPRESSED).name())
-
-    new OutputWriterFactory {
-      override def newInstance(
-          path: String, dataSchema: StructType, context: TaskAttemptContext): 
OutputWriter = {
-        new ParquetOutputWriter(path, context)
-      }
-    }
-  }
-
-  override def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
-    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
-    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
-    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
-
-    // Create the function to set variable Parquet confs at both driver and 
executor side.
-    val initLocalJobFuncOpt =
-      ParquetRelation2.initializeLocalJobFunc(
-        requiredColumns,
-        filters,
-        dataSchema,
-        useMetadataCache,
-        parquetFilterPushDown,
-        assumeBinaryIsString,
-        assumeInt96IsTimestamp,
-        followParquetFormatSpec) _
-
-    // Create the function to set input paths at the driver side.
-    val setInputPaths = 
ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
-
-    Utils.withDummyCallSite(sqlContext.sparkContext) {
-      new SqlNewHadoopRDD(
-        sc = sqlContext.sparkContext,
-        broadcastedConf = broadcastedConf,
-        initDriverSideJobFuncOpt = Some(setInputPaths),
-        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-        inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
-        keyClass = classOf[Void],
-        valueClass = classOf[InternalRow]) {
-
-        val cacheMetadata = useMetadataCache
-
-        @transient val cachedStatuses = inputFiles.map { f =>
-          // In order to encode the authority of a Path containing special 
characters such as '/'
-          // (which does happen in some S3N credentials), we need to use the 
string returned by the
-          // URI of the path to create a new Path.
-          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
-          new FileStatus(
-            f.getLen, f.isDir, f.getReplication, f.getBlockSize, 
f.getModificationTime,
-            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, 
pathWithEscapedAuthority)
-        }.toSeq
-
-        private def escapePathUserInfo(path: Path): Path = {
-          val uri = path.toUri
-          new Path(new URI(
-            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, 
uri.getPath,
-            uri.getQuery, uri.getFragment))
-        }
-
-        // Overridden so we can inject our own cached files statuses.
-        override def getPartitions: Array[SparkPartition] = {
-          val inputFormat = new ParquetInputFormat[InternalRow] {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] 
= {
-              if (cacheMetadata) cachedStatuses else 
super.listStatus(jobContext)
-            }
-          }
-
-          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
-          val rawSplits = inputFormat.getSplits(jobContext)
-
-          Array.tabulate[SparkPartition](rawSplits.size) { i =>
-            new SqlNewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
-          }
-        }
-      }.values.asInstanceOf[RDD[Row]]  // type erasure hack to pass 
RDD[InternalRow] as RDD[Row]
-    }
-  }
-
-  private class MetadataCache {
-    // `FileStatus` objects of all "_metadata" files.
-    private var metadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all "_common_metadata" files.
-    private var commonMetadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all data files (Parquet part-files).
-    var dataStatuses: Array[FileStatus] = _
-
-    // Schema of the actual Parquet files, without partition columns 
discovered from partition
-    // directory paths.
-    var dataSchema: StructType = null
-
-    // Schema of the whole table, including partition columns.
-    var schema: StructType = _
-
-    // Cached leaves
-    var cachedLeaves: Set[FileStatus] = null
-
-    /**
-     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
-     */
-    def refresh(): Unit = {
-      val currentLeafStatuses = cachedLeafStatuses()
-
-      // Check if cachedLeafStatuses is changed or not
-      val leafStatusesChanged = (cachedLeaves == null) ||
-        !cachedLeaves.equals(currentLeafStatuses)
-
-      if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses.toIterator.toSet
-
-        // Lists `FileStatus`es of all leaf nodes (files) under all base 
directories.
-        val leaves = currentLeafStatuses.filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || 
f.getPath.getName.startsWith("."))
-        }.toArray
-
-        dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-        metadataStatuses =
-          leaves.filter(_.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE)
-        commonMetadataStatuses =
-          leaves.filter(_.getPath.getName == 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-        dataSchema = {
-          val dataSchema0 = maybeDataSchema
-            .orElse(readSchema())
-            .orElse(maybeMetastoreSchema)
-            .getOrElse(throw new AnalysisException(
-              s"Failed to discover schema of Parquet file(s) in the following 
location(s):\n" +
-                paths.mkString("\n\t")))
-
-          // If this Parquet relation is converted from a Hive Metastore 
table, must reconcile the
-          // case insensitivity issue and possible schema mismatch (probably 
caused by schema
-          // evolution).
-          maybeMetastoreSchema
-            .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
-            .getOrElse(dataSchema0)
-        }
-      }
-    }
-
-    private def isSummaryFile(file: Path): Boolean = {
-      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
-    }
-
-    private def readSchema(): Option[StructType] = {
-      // Sees which file(s) we need to touch in order to figure out the schema.
-      //
-      // Always tries the summary files first if users don't require a merged 
schema.  In this case,
-      // "_common_metadata" is more preferable than "_metadata" because it 
doesn't contain row
-      // groups information, and could be much smaller for large Parquet files 
with lots of row
-      // groups.  If no summary file is available, falls back to some random 
part-file.
-      //
-      // NOTE: Metadata stored in the summary files are merged from all 
part-files.  However, for
-      // user defined key-value metadata (in which we store Spark SQL schema), 
Parquet doesn't know
-      // how to merge them correctly if some key is associated with different 
values in different
-      // part-files.  When this happens, Parquet simply gives up generating 
the summary file.  This
-      // implies that if a summary file is present, then:
-      //
-      //   1. Either all part-files have exactly the same Spark SQL schema, or
-      //   2. Some part-files don't contain Spark SQL schema in the key-value 
metadata at all (thus
-      //      their schemas may differ from each other).
-      //
-      // Here we tend to be pessimistic and take the second case into account. 
 Basically this means
-      // we can't trust the summary files if users require a merged schema, 
and must touch all part-
-      // files to do the merge.
-      val filesToTouch =
-        if (shouldMergeSchemas) {
-          // Also includes summary files, 'cause there might be empty 
partition directories.
-          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
-        } else {
-          // Tries any "_common_metadata" first. Parquet files written by old 
versions of Parquet
-          // don't have this.
-          commonMetadataStatuses.headOption
-            // Falls back to "_metadata"
-            .orElse(metadataStatuses.headOption)
-            // Summary file(s) not found, the Parquet file is either 
corrupted, or different part-
-            // files contain conflicting user defined metadata (two or more 
values are associated
-            // with a same key in different files).  In either case, we fall 
back to any of the
-            // first part-file, and just assume all schemas are consistent.
-            .orElse(dataStatuses.headOption)
-            .toSeq
-        }
-
-      assert(
-        filesToTouch.nonEmpty || maybeDataSchema.isDefined || 
maybeMetastoreSchema.isDefined,
-        "No predefined schema found, " +
-          s"and no Parquet data files or summary files found under 
${paths.mkString(", ")}.")
-
-      ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
-    }
-  }
-}
-
-private[sql] object ParquetRelation2 extends Logging {
-  // Whether we should merge schemas collected from all Parquet part-files.
-  private[sql] val MERGE_SCHEMA = "mergeSchema"
-
-  // Hive Metastore schema, used when converting Metastore Parquet tables.  
This option is only used
-  // internally.
-  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
-  /** This closure sets various Parquet configurations at both driver side and 
executor side. */
-  private[parquet] def initializeLocalJobFunc(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      dataSchema: StructType,
-      useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean,
-      assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean,
-      followParquetFormatSpec: Boolean)(job: Job): Unit = {
-    val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[RowReadSupport].getName)
-
-    // Try to push down filters when filter push-down is enabled.
-    if (parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
-    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
-    // Sets flags for Parquet schema conversion
-    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
-    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
assumeInt96IsTimestamp)
-    conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, 
followParquetFormatSpec)
-  }
-
-  /** This closure sets input paths at the driver side. */
-  private[parquet] def initializeDriverSideJobFunc(
-      inputFiles: Array[FileStatus])(job: Job): Unit = {
-    // We set the input paths at the driver side.
-    logInfo(s"Reading Parquet file(s) from 
${inputFiles.map(_.getPath).mkString(", ")}")
-    if (inputFiles.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
-    }
-  }
-
-  private[parquet] def readSchema(
-      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-
-    def parseParquetSchema(schema: MessageType): StructType = {
-      StructType.fromAttributes(
-        // TODO Really no need to use `Attribute` here, we only need to know 
the data type.
-        ParquetTypesConverter.convertToAttributes(
-          schema,
-          sqlContext.conf.isParquetBinaryAsString,
-          sqlContext.conf.isParquetINT96AsTimestamp))
-    }
-
-    val seen = mutable.HashSet[String]()
-    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
-      val metadata = footer.getParquetMetadata.getFileMetaData
-      val serializedSchema = metadata
-        .getKeyValueMetaData
-        .toMap
-        .get(RowReadSupport.SPARK_METADATA_KEY)
-      if (serializedSchema.isEmpty) {
-        // Falls back to Parquet schema if no Spark SQL schema found.
-        Some(parseParquetSchema(metadata.getSchema))
-      } else if (!seen.contains(serializedSchema.get)) {
-        seen += serializedSchema.get
-
-        // Don't throw even if we failed to parse the serialized Spark schema. 
Just fallback to
-        // whatever is available.
-        Some(Try(DataType.fromJson(serializedSchema.get))
-          .recover { case _: Throwable =>
-            logInfo(
-              s"Serialized Spark schema in Parquet key-value metadata is not 
in JSON format, " +
-                "falling back to the deprecated DataType.fromCaseClassString 
parser.")
-            DataType.fromCaseClassString(serializedSchema.get)
-          }
-          .recover { case cause: Throwable =>
-            logWarning(
-              s"""Failed to parse serialized Spark schema in Parquet key-value 
metadata:
-                 |\t$serializedSchema
-               """.stripMargin,
-              cause)
-          }
-          .map(_.asInstanceOf[StructType])
-          .getOrElse {
-            // Falls back to Parquet schema if Spark SQL schema can't be 
parsed.
-            parseParquetSchema(metadata.getSchema)
-          })
-      } else {
-        None
-      }
-    }
-
-    finalSchemas.reduceOption { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left 
and $right", e)
-      }
-    }
-  }
-
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type 
conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On 
the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of 
Parquet don't
-   * distinguish binary and string).  This method generates a correct schema 
by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  private[parquet] def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. 
Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, 
parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, 
schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == 
pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing 
nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the 
resulting object has
-   * a schema corresponding to the union of the fields present in each element 
of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for 
a particular row.
-   * In some cases, it is possible that a given table partition stored as a 
Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field 
being present in the
-   * table schema obtained from the Hive Metastore. This method returns a 
schema representing the
-   * Parquet file schema along with any additional nullable fields from the 
Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-
-  /**
-   * Figures out a merged Parquet schema with a distributed Spark job.
-   *
-   * Note that locality is not taken into consideration here because:
-   *
-   *  1. For a single Parquet part-file, in most cases the footer only resides 
in the last block of
-   *     that file.  Thus we only need to retrieve the location of the last 
block.  However, Hadoop
-   *     `FileSystem` only provides API to retrieve locations of all blocks, 
which can be
-   *     potentially expensive.
-   *
-   *  2. This optimization is mainly useful for S3, where file metadata 
operations can be pretty
-   *     slow.  And basically locality is not available when using S3 (you 
can't run computation on
-   *     S3 nodes).
-   */
-  def mergeSchemasInParallel(
-      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): 
Option[StructType] = {
-    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
-    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
-    val serializedConf = new 
SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
-
-    // HACK ALERT:
-    //
-    // Parquet requires `FileStatus`es to read footers.  Here we try to send 
cached `FileStatus`es
-    // to executor side to avoid fetching them again.  However, `FileStatus` 
is not `Serializable`
-    // but only `Writable`.  What makes it worse, for some reason, 
`FileStatus` doesn't play well
-    // with `SerializableWritable[T]` and always causes a weird 
`IllegalStateException`.  These
-    // facts virtually prevent us from serializing `FileStatus`es.
-    //
-    // Since Parquet only relies on path and length information of those 
`FileStatus`es to read
-    // footers, here we just extract them (which can be easily serialized), 
send them to executor
-    // side, and resemble fake `FileStatus`es there.
-    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, 
f.getLen))
-
-    // Issues a Spark job to read Parquet schema in parallel.
-    val partiallyMergedSchemas =
-      sqlContext
-        .sparkContext
-        .parallelize(partialFileStatusInfo)
-        .mapPartitions { iterator =>
-          // Resembles fake `FileStatus`es with serialized path and length 
information.
-          val fakeFileStatuses = iterator.map { case (path, length) =>
-            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new 
Path(path))
-          }.toSeq
-
-          // Skips row group information since we only need the schema
-          val skipRowGroups = true
-
-          // Reads footers in multi-threaded manner within each task
-          val footers =
-            ParquetFileReader.readAllFootersInParallel(
-              serializedConf.value, fakeFileStatuses, skipRowGroups)
-
-          // Converter used to convert Parquet `MessageType` to Spark SQL 
`StructType`
-          val converter =
-            new CatalystSchemaConverter(
-              assumeBinaryIsString = assumeBinaryIsString,
-              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-              followParquetFormatSpec = followParquetFormatSpec)
-
-          footers.map { footer =>
-            ParquetRelation2.readSchemaFromFooter(footer, converter)
-          }.reduceOption(_ merge _).iterator
-        }.collect()
-
-    partiallyMergedSchemas.reduceOption(_ merge _)
-  }
-
-  /**
-   * Reads Spark SQL schema from a Parquet footer.  If a valid serialized 
Spark SQL schema string
-   * can be found in the file metadata, returns the deserialized 
[[StructType]], otherwise, returns
-   * a [[StructType]] converted from the [[MessageType]] stored in this footer.
-   */
-  def readSchemaFromFooter(
-      footer: Footer, converter: CatalystSchemaConverter): StructType = {
-    val fileMetaData = footer.getParquetMetadata.getFileMetaData
-    fileMetaData
-      .getKeyValueMetaData
-      .toMap
-      .get(RowReadSupport.SPARK_METADATA_KEY)
-      .flatMap(deserializeSchemaString)
-      .getOrElse(converter.convert(fileMetaData.getSchema))
-  }
-
-  private def deserializeSchemaString(schemaString: String): 
Option[StructType] = {
-    // Tries to deserialize the schema string as JSON first, then falls back 
to the case class
-    // string parser (data generated by older versions of Spark SQL uses this 
format).
-    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
-      case _: Throwable =>
-        logInfo(
-          s"Serialized Spark schema in Parquet key-value metadata is not in 
JSON format, " +
-            "falling back to the deprecated DataType.fromCaseClassString 
parser.")
-        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
-    }.recoverWith {
-      case cause: Throwable =>
-        logWarning(
-          "Failed to parse and ignored serialized Spark schema in " +
-            s"Parquet key-value metadata:\n\t$schemaString", cause)
-        Failure(cause)
-    }.toOption
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 23df102..b6a7c4f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.parquet
 
-import org.scalatest.BeforeAndAfterAll
 import org.apache.parquet.filter2.predicate.Operators._
 import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
 
@@ -40,7 +39,7 @@ import org.apache.spark.sql.{Column, DataFrame, QueryTest, 
Row, SQLConf}
  * 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to 
ensure the inferred
  *    data type is nullable.
  */
-class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
+class ParquetFilterSuite extends QueryTest with ParquetTest {
   lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
 
   private def checkFilterPredicate(
@@ -56,17 +55,9 @@ class ParquetFilterSuiteBase extends QueryTest with 
ParquetTest {
         .select(output.map(e => Column(e)): _*)
         .where(Column(predicate))
 
-      val maybeAnalyzedPredicate = {
-        val forParquetTableScan = query.queryExecution.executedPlan.collect {
-          case plan: ParquetTableScan => plan.columnPruningPred
-        }.flatten.reduceOption(_ && _)
-
-        val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(_: 
ParquetRelation2)) => filters
-        }.flatten.reduceOption(_ && _)
-
-        forParquetTableScan.orElse(forParquetDataSource)
-      }
+      val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+        case PhysicalOperation(_, filters, LogicalRelation(_: 
ParquetRelation)) => filters
+      }.flatten.reduceOption(_ && _)
 
       assert(maybeAnalyzedPredicate.isDefined)
       maybeAnalyzedPredicate.foreach { pred =>
@@ -98,7 +89,7 @@ class ParquetFilterSuiteBase extends QueryTest with 
ParquetTest {
       (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: Seq[Row])
       (implicit df: DataFrame): Unit = {
     def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
-      
assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
+      assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) 
{
         df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
       }
     }
@@ -308,18 +299,6 @@ class ParquetFilterSuiteBase extends QueryTest with 
ParquetTest {
         '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
     }
   }
-}
-
-class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with 
BeforeAndAfterAll {
-  lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
-  override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
-  }
-
-  override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
-  }
 
   test("SPARK-6554: don't push down predicates which reference partition 
columns") {
     import sqlContext.implicits._
@@ -338,37 +317,3 @@ class ParquetDataSourceOnFilterSuite extends 
ParquetFilterSuiteBase with BeforeA
     }
   }
 }
-
-class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with 
BeforeAndAfterAll {
-  lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
-  override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
-  }
-
-  override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
-  }
-
-  test("SPARK-6742: don't push down predicates which reference partition 
columns") {
-    import sqlContext.implicits._
-
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val path = s"${dir.getCanonicalPath}/part=1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
-
-        // If the "part = 1" filter gets pushed down, this query will throw an 
exception since
-        // "part" is not a valid column in the actual Parquet file
-        val df = DataFrame(sqlContext, 
org.apache.spark.sql.parquet.ParquetRelation(
-          path,
-          Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
-          Seq(AttributeReference("part", IntegerType, false)()) ))
-
-        checkAnswer(
-          df.filter("a = 1 or part = 1"),
-          (1 to 3).map(i => Row(1, i, i.toString)))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 3a5b860..b5314a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -32,7 +32,6 @@ import 
org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, P
 import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, 
ParquetOutputCommitter, ParquetWriter}
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
-import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
@@ -63,7 +62,7 @@ private[parquet] class TestGroupWriteSupport(schema: 
MessageType) extends WriteS
 /**
  * A test suite that tests basic Parquet I/O.
  */
-class ParquetIOSuiteBase extends QueryTest with ParquetTest {
+class ParquetIOSuite extends QueryTest with ParquetTest {
   lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
   import sqlContext.implicits._
 
@@ -357,7 +356,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest 
{
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map(RowReadSupport.SPARK_METADATA_KEY -> 
sparkSchema.toString)
+      val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> 
sparkSchema.toString)
       val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, 
"Spark")
       val path = new Path(location.getCanonicalPath)
 
@@ -422,26 +421,6 @@ class ParquetIOSuiteBase extends QueryTest with 
ParquetTest {
       }
     }
   }
-}
-
-class BogusParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-
-  override def commitJob(jobContext: JobContext): Unit = {
-    sys.error("Intentional exception for testing purposes")
-  }
-}
-
-class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with 
BeforeAndAfterAll {
-  private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
-  override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
-  }
-
-  override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key, 
originalConf.toString)
-  }
 
   test("SPARK-6330 regression test") {
     // In 1.3.0, save to fs other than file: without configuring core-site.xml 
would get:
@@ -456,14 +435,10 @@ class ParquetDataSourceOnIOSuite extends 
ParquetIOSuiteBase with BeforeAndAfterA
   }
 }
 
-class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with 
BeforeAndAfterAll {
-  private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
-  override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
-  }
+class BogusParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
 
-  override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
+  override def commitJob(jobContext: JobContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 7f16b11..2eef101 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -467,7 +467,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest {
       (1 to 10).map(i => (i, i.toString)).toDF("a", 
"b").write.parquet(dir.getCanonicalPath)
       val queryExecution = 
sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: ParquetRelation2) =>
+        case LogicalRelation(relation: ParquetRelation) =>
           assert(relation.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 21007d9..c037faf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.parquet
 
 import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
@@ -26,7 +25,7 @@ import org.apache.spark.sql.{QueryTest, Row, SQLConf}
 /**
  * A test suite that tests various Parquet queries.
  */
-class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
+class ParquetQuerySuite extends QueryTest with ParquetTest {
   lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
   import sqlContext.sql
 
@@ -164,27 +163,3 @@ class ParquetQuerySuiteBase extends QueryTest with 
ParquetTest {
     }
   }
 }
-
-class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with 
BeforeAndAfterAll {
-  private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
-  override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
-  }
-
-  override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
-  }
-}
-
-class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with 
BeforeAndAfterAll {
-  private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
-  override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
-  }
-
-  override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index fa62939..4a0b3b6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -378,7 +378,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         StructField("lowerCase", StringType),
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("lowercase", StringType),
           StructField("uppercase", DoubleType, nullable = false))),
@@ -393,7 +393,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       StructType(Seq(
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
 
@@ -404,7 +404,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     // Metastore schema contains additional non-nullable fields.
     assert(intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false),
           StructField("lowerCase", BinaryType, nullable = false))),
@@ -415,7 +415,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     // Conflicting non-nullable field names
     intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(StructField("lower", StringType, nullable = false))),
         StructType(Seq(StructField("lowerCase", BinaryType))))
     }
@@ -429,7 +429,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         StructField("firstField", StringType, nullable = true),
         StructField("secondField", StringType, nullable = true),
         StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),
@@ -442,7 +442,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     // Merge should fail if the Metastore contains any additional fields that 
are not
     // nullable.
     assert(intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 4cdb83c..1b8edef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -444,9 +444,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) 
with Logging {
       HiveDDLStrategy,
       DDLStrategy,
       TakeOrderedAndProject,
-      ParquetOperations,
       InMemoryScans,
-      ParquetConversion, // Must be before HiveTableScans
       HiveTableScans,
       DataSinks,
       Scripts,

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0a2121c..2629235 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
 
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.Warehouse
@@ -30,7 +29,6 @@ import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, 
OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -39,10 +37,11 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.execution.datasources.{Partition => 
ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation, Partition => ParquetPartition, PartitionSpec, 
ResolvedDataSource}
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 
 
 private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: 
HiveContext)
@@ -260,8 +259,8 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
     // serialize the Metastore schema to JSON and pass it as a data source 
option because of the
     // evil case insensitivity issue, which is reconciled within 
`ParquetRelation2`.
     val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, 
metastoreRelation.tableName)
 
@@ -272,7 +271,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): 
Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
@@ -317,7 +316,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       val cached = getCached(tableIdentifier, paths, metastoreSchema, 
Some(partitionSpec))
       val parquetRelation = cached.getOrElse {
         val created = LogicalRelation(
-          new ParquetRelation2(
+          new ParquetRelation(
             paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
@@ -330,7 +329,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
       val parquetRelation = cached.getOrElse {
         val created = LogicalRelation(
-          new ParquetRelation2(paths.toArray, None, None, 
parquetOptions)(hive))
+          new ParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
@@ -370,8 +369,6 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, 
convert them to Parquet
    * data source relations for better performance.
-   *
-   * This rule can be considered as [[HiveStrategies.ParquetConversion]] done 
right.
    */
   object ParquetConversions extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -386,7 +383,6 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
             // Inserting into partitioned table is not supported in Parquet 
data source (yet).
             if !relation.hiveQlTable.isPartitioned &&
               hive.convertMetastoreParquet &&
-              conf.parquetUseDataSourceApi &&
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -397,7 +393,6 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
           // Inserting into partitioned table is not supported in Parquet data 
source (yet).
           if !relation.hiveQlTable.isPartitioned &&
             hive.convertMetastoreParquet &&
-            conf.parquetUseDataSourceApi &&
             
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -406,7 +401,6 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
             if hive.convertMetastoreParquet &&
-              conf.parquetUseDataSourceApi &&
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a22c329..cd6cd32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,23 +17,14 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.JavaConversions._
-
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DescribeCommand => 
RunnableDescribeCommand, _}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, 
CreateTableUsingAsSelect, DescribeCommand}
+import org.apache.spark.sql.execution.{DescribeCommand => 
RunnableDescribeCommand, _}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.types.StringType
 
 
 private[hive] trait HiveStrategies {
@@ -42,136 +33,6 @@ private[hive] trait HiveStrategies {
 
   val hiveContext: HiveContext
 
-  /**
-   * :: Experimental ::
-   * Finds table scans that would use the Hive SerDe and replaces them with 
our own native parquet
-   * table scan operator.
-   *
-   * TODO: Much of this logic is duplicated in HiveTableScan.  Ideally we 
would do some refactoring
-   * but since this is after the code freeze for 1.1 all logic is here to 
minimize disruption.
-   *
-   * Other issues:
-   *  - Much of this logic assumes case insensitive resolution.
-   */
-  @Experimental
-  object ParquetConversion extends Strategy {
-    implicit class LogicalPlanHacks(s: DataFrame) {
-      def lowerCase: DataFrame = DataFrame(s.sqlContext, s.logicalPlan)
-
-      def addPartitioningAttributes(attrs: Seq[Attribute]): DataFrame = {
-        // Don't add the partitioning key if its already present in the data.
-        if 
(attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
-          s
-        } else {
-          DataFrame(
-            s.sqlContext,
-            s.logicalPlan transform {
-              case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-            })
-        }
-      }
-    }
-
-    implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
-      def fakeOutput(newOutput: Seq[Attribute]): OutputFaker =
-        OutputFaker(
-          originalPlan.output.map(a =>
-            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
-              .getOrElse(
-                sys.error(s"Can't find attribute $a to fake in set 
${newOutput.mkString(",")}"))),
-          originalPlan)
-    }
-
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: 
MetastoreRelation)
-          if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
-             hiveContext.convertMetastoreParquet &&
-             !hiveContext.conf.parquetUseDataSourceApi =>
-
-        // Filter out all predicates that only deal with partition keys
-        val partitionsKeys = AttributeSet(relation.partitionKeys)
-        val (pruningPredicates, otherPredicates) = predicates.partition {
-          _.references.subsetOf(partitionsKeys)
-        }
-
-        // We are going to throw the predicates and projection back at the 
whole optimization
-        // sequence so lets unresolve all the attributes, allowing them to be 
rebound to the
-        // matching parquet attributes.
-        val unresolvedOtherPredicates = Column(otherPredicates.map(_ transform 
{
-          case a: AttributeReference => UnresolvedAttribute(a.name)
-        }).reduceOption(And).getOrElse(Literal(true)))
-
-        val unresolvedProjection: Seq[Column] = projectList.map(_ transform {
-          case a: AttributeReference => UnresolvedAttribute(a.name)
-        }).map(Column(_))
-
-        try {
-          if (relation.hiveQlTable.isPartitioned) {
-            val rawPredicate = 
pruningPredicates.reduceOption(And).getOrElse(Literal(true))
-            // Translate the predicate so that it automatically casts the 
input values to the
-            // correct data types during evaluation.
-            val castedPredicate = rawPredicate transform {
-              case a: AttributeReference =>
-                val idx = relation.partitionKeys.indexWhere(a.exprId == 
_.exprId)
-                val key = relation.partitionKeys(idx)
-                Cast(BoundReference(idx, StringType, nullable = true), 
key.dataType)
-            }
-
-            val inputData = new GenericMutableRow(relation.partitionKeys.size)
-            val pruningCondition =
-              if (codegenEnabled) {
-                GeneratePredicate.generate(castedPredicate)
-              } else {
-                InterpretedPredicate.create(castedPredicate)
-              }
-
-            val partitions = 
relation.getHiveQlPartitions(pruningPredicates).filter { part =>
-              val partitionValues = part.getValues
-              var i = 0
-              while (i < partitionValues.size()) {
-                inputData(i) = 
CatalystTypeConverters.convertToCatalyst(partitionValues(i))
-                i += 1
-              }
-              pruningCondition(inputData)
-            }
-
-            val partitionLocations = partitions.map(_.getLocation)
-
-            if (partitionLocations.isEmpty) {
-              PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: 
Nil
-            } else {
-              hiveContext
-                .read.parquet(partitionLocations: _*)
-                .addPartitioningAttributes(relation.partitionKeys)
-                .lowerCase
-                .where(unresolvedOtherPredicates)
-                .select(unresolvedProjection: _*)
-                .queryExecution
-                .executedPlan
-                .fakeOutput(projectList.map(_.toAttribute)) :: Nil
-            }
-
-          } else {
-            hiveContext
-              .read.parquet(relation.hiveQlTable.getDataLocation.toString)
-              .lowerCase
-              .where(unresolvedOtherPredicates)
-              .select(unresolvedProjection: _*)
-              .queryExecution
-              .executedPlan
-              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
-          }
-        } catch {
-          // parquetFile will throw an exception when there is no data.
-          // TODO: Remove this hack for Spark 1.3.
-          case iae: java.lang.IllegalArgumentException
-              if iae.getMessage.contains("Can not create a Path from an empty 
string") =>
-            PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
-        }
-      case _ => Nil
-    }
-  }
-
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child, schema: 
HiveScriptIOSchema) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index af68615..a45c2d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql.{QueryTest, Row}
 
 case class Cases(lower: String, UPPER: String)
 
@@ -28,64 +28,54 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
 
   import sqlContext._
 
-  def run(prefix: String): Unit = {
-    test(s"$prefix: Case insensitive attribute names") {
-      withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), 
"cases") {
-        val expected = (1 to 4).map(i => Row(i.toString))
-        checkAnswer(sql("SELECT upper FROM cases"), expected)
-        checkAnswer(sql("SELECT LOWER FROM cases"), expected)
-      }
+  test("Case insensitive attribute names") {
+    withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), 
"cases") {
+      val expected = (1 to 4).map(i => Row(i.toString))
+      checkAnswer(sql("SELECT upper FROM cases"), expected)
+      checkAnswer(sql("SELECT LOWER FROM cases"), expected)
     }
+  }
 
-    test(s"$prefix: SELECT on Parquet table") {
-      val data = (1 to 4).map(i => (i, s"val_$i"))
-      withParquetTable(data, "t") {
-        checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
-      }
+  test("SELECT on Parquet table") {
+    val data = (1 to 4).map(i => (i, s"val_$i"))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
     }
+  }
 
-    test(s"$prefix: Simple column projection + filter on Parquet table") {
-      withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
-        checkAnswer(
-          sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
-          Seq(Row(true, "val_2"), Row(true, "val_4")))
-      }
+  test("Simple column projection + filter on Parquet table") {
+    withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+      checkAnswer(
+        sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+        Seq(Row(true, "val_2"), Row(true, "val_4")))
     }
+  }
 
-    test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
-      withTempPath { dir =>
-        sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
-        read.parquet(dir.getCanonicalPath).registerTempTable("p")
-        withTempTable("p") {
-          checkAnswer(
-            sql("SELECT * FROM src ORDER BY key"),
-            sql("SELECT * from p ORDER BY key").collect().toSeq)
-        }
+  test("Converting Hive to Parquet Table via saveAsParquetFile") {
+    withTempPath { dir =>
+      sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
+      read.parquet(dir.getCanonicalPath).registerTempTable("p")
+      withTempTable("p") {
+        checkAnswer(
+          sql("SELECT * FROM src ORDER BY key"),
+          sql("SELECT * from p ORDER BY key").collect().toSeq)
       }
     }
+  }
 
-    test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
-      withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
-        withTempPath { file =>
-          sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
-          read.parquet(file.getCanonicalPath).registerTempTable("p")
-          withTempTable("p") {
-            // let's do three overwrites for good measure
-            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-            checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM 
t").collect().toSeq)
-          }
+  test("INSERT OVERWRITE TABLE Parquet table") {
+    withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
+      withTempPath { file =>
+        sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
+        read.parquet(file.getCanonicalPath).registerTempTable("p")
+        withTempTable("p") {
+          // let's do three overwrites for good measure
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+          checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM 
t").collect().toSeq)
         }
       }
     }
   }
-
-  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
-    run("Parquet data source enabled")
-  }
-
-  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "false") {
-    run("Parquet data source disabled")
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e403f32..4fdf774 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,10 +21,9 @@ import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.BeforeAndAfterAll
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.Logging
 import org.apache.spark.sql._
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hive.client.{HiveTable, 
ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -564,10 +563,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with BeforeA
   }
 
   test("scan a parquet table created through a CTAS statement") {
-    withSQLConf(
-      HiveContext.CONVERT_METASTORE_PARQUET.key -> "true",
-      SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
-
+    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "true") {
       withTempTable("jt") {
         (1 to 10).map(i => i -> s"str$i").toDF("a", 
"b").registerTempTable("jt")
 
@@ -582,9 +578,9 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with BeforeA
             Row(3) :: Row(4) :: Nil)
 
           table("test_parquet_ctas").queryExecution.optimizedPlan match {
-            case LogicalRelation(p: ParquetRelation2) => // OK
+            case LogicalRelation(p: ParquetRelation) => // OK
             case _ =>
-              fail(s"test_parquet_ctas should have be converted to 
${classOf[ParquetRelation2]}")
+              fail(s"test_parquet_ctas should have be converted to 
${classOf[ParquetRelation]}")
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0342826..ff42fde 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, 
MetastoreRelation}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 
 case class Nested1(f1: Nested2)
@@ -61,7 +62,9 @@ class MyDialect extends DefaultParserDialect
  * Hive to generate them (in contrast to HiveQuerySuite).  Often this is 
because the query is
  * valid, but Hive currently cannot execute it.
  */
-class SQLQuerySuite extends QueryTest {
+class SQLQuerySuite extends QueryTest with SQLTestUtils {
+  override def sqlContext: SQLContext = TestHive
+
   test("SPARK-6835: udtf in lateral view") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.registerTempTable("table1")
@@ -195,17 +198,17 @@ class SQLQuerySuite extends QueryTest {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = 
{
       val relation = 
EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
       relation match {
-        case LogicalRelation(r: ParquetRelation2) =>
+        case LogicalRelation(r: ParquetRelation) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, 
but found " +
-              s"${ParquetRelation2.getClass.getCanonicalName}.")
+              s"${ParquetRelation.getClass.getCanonicalName}.")
           }
 
         case r: MetastoreRelation =>
           if (isDataSourceParquet) {
             fail(
-              s"${ParquetRelation2.getClass.getCanonicalName} is expected, but 
found " +
+              s"${ParquetRelation.getClass.getCanonicalName} is expected, but 
found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
           }
       }
@@ -350,33 +353,26 @@ class SQLQuerySuite extends QueryTest {
       "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22", "MANAGED_TABLE"
     )
 
-    val origUseParquetDataSource = conf.parquetUseDataSourceApi
-    try {
-      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
-      sql(
-        """CREATE TABLE ctas5
-          | STORED AS parquet AS
-          |   SELECT key, value
-          |   FROM src
-          |   ORDER BY key, value""".stripMargin).collect()
-
-      checkExistence(sql("DESC EXTENDED ctas5"), true,
-        "name:key", "type:string", "name:value", "ctas5",
-        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
-        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
-        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-        "MANAGED_TABLE"
-      )
-
-      val default = convertMetastoreParquet
-      // use the Hive SerDe for parquet tables
-      sql("set spark.sql.hive.convertMetastoreParquet = false")
+    sql(
+      """CREATE TABLE ctas5
+        | STORED AS parquet AS
+        |   SELECT key, value
+        |   FROM src
+        |   ORDER BY key, value""".stripMargin).collect()
+
+    checkExistence(sql("DESC EXTENDED ctas5"), true,
+      "name:key", "type:string", "name:value", "ctas5",
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+      "MANAGED_TABLE"
+    )
+
+    // use the Hive SerDe for parquet tables
+    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
       checkAnswer(
         sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
         sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
-      sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
-    } finally {
-      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to