[SPARK-7591] [SQL] Partitioning support API tweaks Please see [SPARK-7591] [1] for the details.
/cc rxin marmbrus yhuai [1]: https://issues.apache.org/jira/browse/SPARK-7591 Author: Cheng Lian <[email protected]> Closes #6150 from liancheng/spark-7591 and squashes the following commits: af422e7 [Cheng Lian] Addresses @rxin's comments 37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization 2fc680a [Cheng Lian] Fixes Scala style issue 189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments 522c24e [Cheng Lian] Adds OutputWriterFactory 047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2 (cherry picked from commit fdf5bba35d201fe0de3901b4d47262c485c76569) Signed-off-by: Cheng Lian <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcb2c5d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcb2c5d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcb2c5d1 Branch: refs/heads/branch-1.4 Commit: bcb2c5d1690ee1fca32c51d031121490f8716dfb Parents: c0bb974 Author: Cheng Lian <[email protected]> Authored: Fri May 15 16:20:49 2015 +0800 Committer: Cheng Lian <[email protected]> Committed: Fri May 15 16:21:22 2015 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/SQLContext.scala | 14 +- .../spark/sql/parquet/fsBasedParquet.scala | 565 ------------------ .../apache/spark/sql/parquet/newParquet.scala | 570 +++++++++++++++++++ .../spark/sql/sources/DataSourceStrategy.scala | 10 +- .../spark/sql/sources/PartitioningUtils.scala | 4 + .../org/apache/spark/sql/sources/commands.scala | 23 +- .../org/apache/spark/sql/sources/ddl.scala | 8 +- .../apache/spark/sql/sources/interfaces.scala | 140 +++-- .../org/apache/spark/sql/sources/rules.scala | 2 +- .../spark/sql/parquet/ParquetFilterSuite.scala | 2 +- .../spark/sql/parquet/ParquetSchemaSuite.scala | 12 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +- .../spark/sql/hive/execution/commands.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 8 +- .../apache/spark/sql/hive/parquetSuites.scala | 20 +- .../spark/sql/sources/SimpleTextRelation.scala | 47 +- .../sql/sources/fsBasedRelationSuites.scala | 564 ------------------ .../sql/sources/hadoopFsRelationSuites.scala | 564 ++++++++++++++++++ 19 files changed, 1287 insertions(+), 1286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index b33a700..9fb355e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.json._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } else if (conf.parquetUseDataSourceApi) { val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray baseRelationToDataFrame( - new FSBasedParquetRelation( + new ParquetRelation2( globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this)) } else { DataFrame(this, parquet.ParquetRelation( @@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def jdbc(url: String, table: String): DataFrame = { jdbc(url, table, JDBCRelation.columnPartition(null), new Properties()) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def jdbc(url: String, table: String, properties: Properties): DataFrame = { jdbc(url, table, JDBCRelation.columnPartition(null), properties) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext) @Experimental def jdbc( url: String, - table: String, + table: String, columnName: String, lowerBound: Long, upperBound: Long, @@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val parts = JDBCRelation.columnPartition(partitioning) jdbc(url, table, parts, properties) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } jdbc(url, table, parts, properties) } - + private def jdbc( url: String, table: String, http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala deleted file mode 100644 index c83a9c3..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.util.{List => JList} - -import scala.collection.JavaConversions._ -import scala.util.Try - -import com.google.common.base.Objects -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import parquet.filter2.predicate.FilterApi -import parquet.format.converter.ParquetMetadataConverter -import parquet.hadoop._ -import parquet.hadoop.metadata.CompressionCodecName -import parquet.hadoop.util.ContextUtil - -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.RDD._ -import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD} -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql.{Row, SQLConf, SQLContext} -import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} - -private[sql] class DefaultSource extends FSBasedRelationProvider { - override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - schema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation = { - val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty)) - new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext) - } -} - -// NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[sql] class ParquetOutputWriter extends OutputWriter { - private var recordWriter: RecordWriter[Void, Row] = _ - private var taskAttemptContext: TaskAttemptContext = _ - - override def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = { - val conf = context.getConfiguration - val outputFormat = { - // When appending new Parquet files to an existing Parquet file directory, to avoid - // overwriting existing data files, we need to find out the max task ID encoded in these data - // file names. - // TODO Make this snippet a utility function for other data source developers - val maxExistingTaskId = { - // Note that `path` may point to a temporary location. Here we retrieve the real - // destination path from the configuration - val outputPath = new Path(conf.get("spark.sql.sources.output.path")) - val fs = outputPath.getFileSystem(conf) - - if (fs.exists(outputPath)) { - // Pattern used to match task ID in part file names, e.g.: - // - // part-r-00001.gz.part - // ^~~~~ - val partFilePattern = """part-.-(\d{1,}).*""".r - - fs.listStatus(outputPath).map(_.getPath.getName).map { - case partFilePattern(id) => id.toInt - case name if name.startsWith("_") => 0 - case name if name.startsWith(".") => 0 - case name => sys.error( - s"""Trying to write Parquet files to directory $outputPath, - |but found items with illegal name "$name" - """.stripMargin.replace('\n', ' ').trim) - }.reduceOption(_ max _).getOrElse(0) - } else { - 0 - } - } - - new ParquetOutputFormat[Row]() { - // Here we override `getDefaultWorkFile` for two reasons: - // - // 1. To allow appending. We need to generate output file name based on the max available - // task ID computed above. - // - // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses - // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all - // partitions in the case of dynamic partitioning. - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1 - new Path(path, f"part-r-$split%05d$extension") - } - } - } - - recordWriter = outputFormat.getRecordWriter(context) - taskAttemptContext = context - } - - override def write(row: Row): Unit = recordWriter.write(null, row) - - override def close(): Unit = recordWriter.close(taskAttemptContext) -} - -private[sql] class FSBasedParquetRelation( - paths: Array[String], - private val maybeDataSchema: Option[StructType], - private val maybePartitionSpec: Option[PartitionSpec], - parameters: Map[String, String])( - val sqlContext: SQLContext) - extends FSBasedRelation(paths, maybePartitionSpec) - with Logging { - - // Should we merge schemas from all Parquet part-files? - private val shouldMergeSchemas = - parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean - - private val maybeMetastoreSchema = parameters - .get(FSBasedParquetRelation.METASTORE_SCHEMA) - .map(DataType.fromJson(_).asInstanceOf[StructType]) - - private val metadataCache = new MetadataCache - metadataCache.refresh() - - override def equals(other: scala.Any): Boolean = other match { - case that: FSBasedParquetRelation => - val schemaEquality = if (shouldMergeSchemas) { - this.shouldMergeSchemas == that.shouldMergeSchemas - } else { - this.dataSchema == that.dataSchema && - this.schema == that.schema - } - - this.paths.toSet == that.paths.toSet && - schemaEquality && - this.maybeDataSchema == that.maybeDataSchema && - this.partitionColumns == that.partitionColumns - - case _ => false - } - - override def hashCode(): Int = { - if (shouldMergeSchemas) { - Objects.hashCode( - Boolean.box(shouldMergeSchemas), - paths.toSet, - maybeDataSchema, - maybePartitionSpec) - } else { - Objects.hashCode( - Boolean.box(shouldMergeSchemas), - paths.toSet, - dataSchema, - schema, - maybeDataSchema, - maybePartitionSpec) - } - } - - override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter] - - override def dataSchema: StructType = metadataCache.dataSchema - - override private[sql] def refresh(): Unit = { - metadataCache.refresh() - super.refresh() - } - - // Parquet data source always uses Catalyst internal representations. - override val needConversion: Boolean = false - - override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum - - override def prepareForWrite(job: Job): Unit = { - val conf = ContextUtil.getConfiguration(job) - - val committerClass = - conf.getClass( - "spark.sql.parquet.output.committer.class", - classOf[ParquetOutputCommitter], - classOf[ParquetOutputCommitter]) - - conf.setClass( - "mapred.output.committer.class", - committerClass, - classOf[ParquetOutputCommitter]) - - // TODO There's no need to use two kinds of WriteSupport - // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and - // complex types. - val writeSupportClass = - if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { - classOf[MutableRowWriteSupport] - } else { - classOf[RowWriteSupport] - } - - ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass) - RowWriteSupport.setSchema(dataSchema.toAttributes, conf) - - // Sets compression scheme - conf.set( - ParquetOutputFormat.COMPRESSION, - ParquetRelation - .shortParquetCompressionCodecNames - .getOrElse( - sqlContext.conf.parquetCompressionCodec.toUpperCase, - CompressionCodecName.UNCOMPRESSED).name()) - } - - override def buildScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputPaths: Array[String]): RDD[Row] = { - - val job = new Job(SparkHadoopUtil.get.conf) - val conf = ContextUtil.getConfiguration(job) - - ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) - - if (inputPaths.nonEmpty) { - FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*) - } - - // Try to push down filters when filter push-down is enabled. - if (sqlContext.conf.parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(ParquetFilters.createFilter(dataSchema, _)) - .reduceOption(FilterApi.and) - .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) - } - - conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { - val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - ParquetTypesConverter.convertToString(requestedSchema.toAttributes) - }) - - conf.set( - RowWriteSupport.SPARK_ROW_SCHEMA, - ParquetTypesConverter.convertToString(dataSchema.toAttributes)) - - // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata - val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean - conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString) - - val inputFileStatuses = - metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString)) - - val footers = inputFileStatuses.map(metadataCache.footers) - - // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. - // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and - // footers. Especially when a global arbitrative schema (either from metastore or data source - // DDL) is available. - new NewHadoopRDD( - sqlContext.sparkContext, - classOf[FilteringParquetRowInputFormat], - classOf[Void], - classOf[Row], - conf) { - - val cacheMetadata = useMetadataCache - - @transient val cachedStatuses = inputFileStatuses.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - val pathWithAuthority = new Path(f.getPath.toUri.toString) - - new FileStatus( - f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, - f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority) - }.toSeq - - @transient val cachedFooters = footers.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata) - }.toSeq - - // Overridden so we can inject our own cached files statuses. - override def getPartitions: Array[SparkPartition] = { - val inputFormat = if (cacheMetadata) { - new FilteringParquetRowInputFormat { - override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses - - override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters - } - } else { - new FilteringParquetRowInputFormat - } - - val jobContext = newJobContext(getConf, jobId) - val rawSplits = inputFormat.getSplits(jobContext) - - Array.tabulate[SparkPartition](rawSplits.size) { i => - new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) - } - } - }.values - } - - private class MetadataCache { - // `FileStatus` objects of all "_metadata" files. - private var metadataStatuses: Array[FileStatus] = _ - - // `FileStatus` objects of all "_common_metadata" files. - private var commonMetadataStatuses: Array[FileStatus] = _ - - // Parquet footer cache. - var footers: Map[FileStatus, Footer] = _ - - // `FileStatus` objects of all data files (Parquet part-files). - var dataStatuses: Array[FileStatus] = _ - - // Schema of the actual Parquet files, without partition columns discovered from partition - // directory paths. - var dataSchema: StructType = _ - - // Schema of the whole table, including partition columns. - var schema: StructType = _ - - /** - * Refreshes `FileStatus`es, footers, partition spec, and table schema. - */ - def refresh(): Unit = { - // Support either reading a collection of raw Parquet part-files, or a collection of folders - // containing Parquet files (e.g. partitioned Parquet table). - val baseStatuses = paths.distinct.flatMap { p => - val path = new Path(p) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory) - Try(fs.getFileStatus(qualified)).toOption - } - assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir)) - - // Lists `FileStatus`es of all leaf nodes (files) under all base directories. - val leaves = baseStatuses.flatMap { f => - val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf) - SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f => - isSummaryFile(f.getPath) || - !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - } - } - - dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) - metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) - commonMetadataStatuses = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) - - footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f => - val parquetMetadata = ParquetFileReader.readFooter( - SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER) - f -> new Footer(f.getPath, parquetMetadata) - }.seq.toMap - - dataSchema = { - val dataSchema0 = - maybeDataSchema - .orElse(readSchema()) - .orElse(maybeMetastoreSchema) - .getOrElse(sys.error("Failed to get the schema.")) - - // If this Parquet relation is converted from a Hive Metastore table, must reconcile case - // case insensitivity issue and possible schema mismatch (probably caused by schema - // evolution). - maybeMetastoreSchema - .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0)) - .getOrElse(dataSchema0) - } - } - - private def isSummaryFile(file: Path): Boolean = { - file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || - file.getName == ParquetFileWriter.PARQUET_METADATA_FILE - } - - private def readSchema(): Option[StructType] = { - // Sees which file(s) we need to touch in order to figure out the schema. - // - // Always tries the summary files first if users don't require a merged schema. In this case, - // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row - // groups information, and could be much smaller for large Parquet files with lots of row - // groups. - // - // NOTE: Metadata stored in the summary files are merged from all part-files. However, for - // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know - // how to merge them correctly if some key is associated with different values in different - // part-files. When this happens, Parquet simply gives up generating the summary file. This - // implies that if a summary file presents, then: - // - // 1. Either all part-files have exactly the same Spark SQL schema, or - // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus - // their schemas may differ from each other). - // - // Here we tend to be pessimistic and take the second case into account. Basically this means - // we can't trust the summary files if users require a merged schema, and must touch all part- - // files to do the merge. - val filesToTouch = - if (shouldMergeSchemas) { - // Also includes summary files, 'cause there might be empty partition directories. - (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq - } else { - // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet - // don't have this. - commonMetadataStatuses.headOption - // Falls back to "_metadata" - .orElse(metadataStatuses.headOption) - // Summary file(s) not found, the Parquet file is either corrupted, or different part- - // files contain conflicting user defined metadata (two or more values are associated - // with a same key in different files). In either case, we fall back to any of the - // first part-file, and just assume all schemas are consistent. - .orElse(dataStatuses.headOption) - .toSeq - } - - assert( - filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined, - "No schema defined, " + - s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.") - - FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext) - } - } -} - -private[sql] object FSBasedParquetRelation extends Logging { - // Whether we should merge schemas collected from all Parquet part-files. - private[sql] val MERGE_SCHEMA = "mergeSchema" - - // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used - // internally. - private[sql] val METASTORE_SCHEMA = "metastoreSchema" - - private[parquet] def readSchema( - footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { - footers.map { footer => - val metadata = footer.getParquetMetadata.getFileMetaData - val parquetSchema = metadata.getSchema - val maybeSparkSchema = metadata - .getKeyValueMetaData - .toMap - .get(RowReadSupport.SPARK_METADATA_KEY) - .flatMap { serializedSchema => - // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to - // whatever is available. - Try(DataType.fromJson(serializedSchema)) - .recover { case _: Throwable => - logInfo( - s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + - "falling back to the deprecated DataType.fromCaseClassString parser.") - DataType.fromCaseClassString(serializedSchema) - } - .recover { case cause: Throwable => - logWarning( - s"""Failed to parse serialized Spark schema in Parquet key-value metadata: - |\t$serializedSchema - """.stripMargin, - cause) - } - .map(_.asInstanceOf[StructType]) - .toOption - } - - maybeSparkSchema.getOrElse { - // Falls back to Parquet schema if Spark SQL schema is absent. - StructType.fromAttributes( - // TODO Really no need to use `Attribute` here, we only need to know the data type. - ParquetTypesConverter.convertToAttributes( - parquetSchema, - sqlContext.conf.isParquetBinaryAsString, - sqlContext.conf.isParquetINT96AsTimestamp)) - } - }.reduceOption { (left, right) => - try left.merge(right) catch { case e: Throwable => - throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) - } - } - } - - /** - * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore - * schema and Parquet schema. - * - * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the - * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't - * distinguish binary and string). This method generates a correct schema by merging Metastore - * schema data types and Parquet schema field names. - */ - private[parquet] def mergeMetastoreParquetSchema( - metastoreSchema: StructType, - parquetSchema: StructType): StructType = { - def schemaConflictMessage: String = - s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema: - |${metastoreSchema.prettyJson} - | - |Parquet schema: - |${parquetSchema.prettyJson} - """.stripMargin - - val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema) - - assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage) - - val ordinalMap = metastoreSchema.zipWithIndex.map { - case (field, index) => field.name.toLowerCase -> index - }.toMap - - val reorderedParquetSchema = mergedParquetSchema.sortBy(f => - ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1)) - - StructType(metastoreSchema.zip(reorderedParquetSchema).map { - // Uses Parquet field names but retains Metastore data types. - case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase => - mSchema.copy(name = pSchema.name) - case _ => - throw new SparkException(schemaConflictMessage) - }) - } - - /** - * Returns the original schema from the Parquet file with any missing nullable fields from the - * Hive Metastore schema merged in. - * - * When constructing a DataFrame from a collection of structured data, the resulting object has - * a schema corresponding to the union of the fields present in each element of the collection. - * Spark SQL simply assigns a null value to any field that isn't present for a particular row. - * In some cases, it is possible that a given table partition stored as a Parquet file doesn't - * contain a particular nullable field in its schema despite that field being present in the - * table schema obtained from the Hive Metastore. This method returns a schema representing the - * Parquet file schema along with any additional nullable fields from the Metastore schema - * merged in. - */ - private[parquet] def mergeMissingNullableFields( - metastoreSchema: StructType, - parquetSchema: StructType): StructType = { - val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap - val missingFields = metastoreSchema - .map(_.name.toLowerCase) - .diff(parquetSchema.map(_.name.toLowerCase)) - .map(fieldMap(_)) - .filter(_.nullable) - StructType(parquetSchema ++ missingFields) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala new file mode 100644 index 0000000..946062f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ +import scala.util.Try + +import com.google.common.base.Objects +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import parquet.filter2.predicate.FilterApi +import parquet.format.converter.ParquetMetadataConverter +import parquet.hadoop._ +import parquet.hadoop.metadata.CompressionCodecName +import parquet.hadoop.util.ContextUtil + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD._ +import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.{Row, SQLConf, SQLContext} +import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} + +private[sql] class DefaultSource extends HadoopFsRelationProvider { + override def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): HadoopFsRelation = { + val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty)) + new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext) + } +} + +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. +private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, Row] = { + val conf = context.getConfiguration + val outputFormat = { + // When appending new Parquet files to an existing Parquet file directory, to avoid + // overwriting existing data files, we need to find out the max task ID encoded in these data + // file names. + // TODO Make this snippet a utility function for other data source developers + val maxExistingTaskId = { + // Note that `path` may point to a temporary location. Here we retrieve the real + // destination path from the configuration + val outputPath = new Path(conf.get("spark.sql.sources.output.path")) + val fs = outputPath.getFileSystem(conf) + + if (fs.exists(outputPath)) { + // Pattern used to match task ID in part file names, e.g.: + // + // part-r-00001.gz.parquet + // ^~~~~ + val partFilePattern = """part-.-(\d{1,}).*""".r + + fs.listStatus(outputPath).map(_.getPath.getName).map { + case partFilePattern(id) => id.toInt + case name if name.startsWith("_") => 0 + case name if name.startsWith(".") => 0 + case name => sys.error( + s"Trying to write Parquet files to directory $outputPath, " + + s"but found items with illegal name '$name'.") + }.reduceOption(_ max _).getOrElse(0) + } else { + 0 + } + } + + new ParquetOutputFormat[Row]() { + // Here we override `getDefaultWorkFile` for two reasons: + // + // 1. To allow appending. We need to generate output file name based on the max available + // task ID computed above. + // + // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses + // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all + // partitions in the case of dynamic partitioning. + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1 + new Path(path, f"part-r-$split%05d$extension") + } + } + } + + outputFormat.getRecordWriter(context) + } + + override def write(row: Row): Unit = recordWriter.write(null, row) + + override def close(): Unit = recordWriter.close(context) +} + +private[sql] class ParquetRelation2( + override val paths: Array[String], + private val maybeDataSchema: Option[StructType], + private val maybePartitionSpec: Option[PartitionSpec], + parameters: Map[String, String])( + val sqlContext: SQLContext) + extends HadoopFsRelation(maybePartitionSpec) + with Logging { + + // Should we merge schemas from all Parquet part-files? + private val shouldMergeSchemas = + parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean + + private val maybeMetastoreSchema = parameters + .get(ParquetRelation2.METASTORE_SCHEMA) + .map(DataType.fromJson(_).asInstanceOf[StructType]) + + private lazy val metadataCache: MetadataCache = { + val meta = new MetadataCache + meta.refresh() + meta + } + + override def equals(other: scala.Any): Boolean = other match { + case that: ParquetRelation2 => + val schemaEquality = if (shouldMergeSchemas) { + this.shouldMergeSchemas == that.shouldMergeSchemas + } else { + this.dataSchema == that.dataSchema && + this.schema == that.schema + } + + this.paths.toSet == that.paths.toSet && + schemaEquality && + this.maybeDataSchema == that.maybeDataSchema && + this.partitionColumns == that.partitionColumns + + case _ => false + } + + override def hashCode(): Int = { + if (shouldMergeSchemas) { + Objects.hashCode( + Boolean.box(shouldMergeSchemas), + paths.toSet, + maybeDataSchema, + maybePartitionSpec) + } else { + Objects.hashCode( + Boolean.box(shouldMergeSchemas), + paths.toSet, + dataSchema, + schema, + maybeDataSchema, + maybePartitionSpec) + } + } + + override def dataSchema: StructType = metadataCache.dataSchema + + override private[sql] def refresh(): Unit = { + metadataCache.refresh() + super.refresh() + } + + // Parquet data source always uses Catalyst internal representations. + override val needConversion: Boolean = false + + override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum + + override def userDefinedPartitionColumns: Option[StructType] = + maybePartitionSpec.map(_.partitionColumns) + + override def prepareJobForWrite(job: Job): OutputWriterFactory = { + val conf = ContextUtil.getConfiguration(job) + + val committerClass = + conf.getClass( + "spark.sql.parquet.output.committer.class", + classOf[ParquetOutputCommitter], + classOf[ParquetOutputCommitter]) + + conf.setClass( + "mapred.output.committer.class", + committerClass, + classOf[ParquetOutputCommitter]) + + // TODO There's no need to use two kinds of WriteSupport + // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and + // complex types. + val writeSupportClass = + if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { + classOf[MutableRowWriteSupport] + } else { + classOf[RowWriteSupport] + } + + ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass) + RowWriteSupport.setSchema(dataSchema.toAttributes, conf) + + // Sets compression scheme + conf.set( + ParquetOutputFormat.COMPRESSION, + ParquetRelation + .shortParquetCompressionCodecNames + .getOrElse( + sqlContext.conf.parquetCompressionCodec.toUpperCase, + CompressionCodecName.UNCOMPRESSED).name()) + + new OutputWriterFactory { + override def newInstance( + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + new ParquetOutputWriter(path, context) + } + } + } + + override def buildScan( + requiredColumns: Array[String], + filters: Array[Filter], + inputPaths: Array[String]): RDD[Row] = { + + val job = new Job(SparkHadoopUtil.get.conf) + val conf = ContextUtil.getConfiguration(job) + + ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) + + if (inputPaths.nonEmpty) { + FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*) + } + + // Try to push down filters when filter push-down is enabled. + if (sqlContext.conf.parquetFilterPushDown) { + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(ParquetFilters.createFilter(dataSchema, _)) + .reduceOption(FilterApi.and) + .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) + } + + conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { + val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) + ParquetTypesConverter.convertToString(requestedSchema.toAttributes) + }) + + conf.set( + RowWriteSupport.SPARK_ROW_SCHEMA, + ParquetTypesConverter.convertToString(dataSchema.toAttributes)) + + // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata + val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean + conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString) + + val inputFileStatuses = + metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString)) + + val footers = inputFileStatuses.map(metadataCache.footers) + + // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. + // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and + // footers. Especially when a global arbitrative schema (either from metastore or data source + // DDL) is available. + new NewHadoopRDD( + sqlContext.sparkContext, + classOf[FilteringParquetRowInputFormat], + classOf[Void], + classOf[Row], + conf) { + + val cacheMetadata = useMetadataCache + + @transient val cachedStatuses = inputFileStatuses.map { f => + // In order to encode the authority of a Path containing special characters such as /, + // we need to use the string returned by the URI of the path to create a new Path. + val pathWithAuthority = new Path(f.getPath.toUri.toString) + + new FileStatus( + f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, + f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority) + }.toSeq + + @transient val cachedFooters = footers.map { f => + // In order to encode the authority of a Path containing special characters such as /, + // we need to use the string returned by the URI of the path to create a new Path. + new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata) + }.toSeq + + // Overridden so we can inject our own cached files statuses. + override def getPartitions: Array[SparkPartition] = { + val inputFormat = if (cacheMetadata) { + new FilteringParquetRowInputFormat { + override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses + + override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters + } + } else { + new FilteringParquetRowInputFormat + } + + val jobContext = newJobContext(getConf, jobId) + val rawSplits = inputFormat.getSplits(jobContext) + + Array.tabulate[SparkPartition](rawSplits.size) { i => + new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } + } + }.values + } + + private class MetadataCache { + // `FileStatus` objects of all "_metadata" files. + private var metadataStatuses: Array[FileStatus] = _ + + // `FileStatus` objects of all "_common_metadata" files. + private var commonMetadataStatuses: Array[FileStatus] = _ + + // Parquet footer cache. + var footers: Map[FileStatus, Footer] = _ + + // `FileStatus` objects of all data files (Parquet part-files). + var dataStatuses: Array[FileStatus] = _ + + // Schema of the actual Parquet files, without partition columns discovered from partition + // directory paths. + var dataSchema: StructType = _ + + // Schema of the whole table, including partition columns. + var schema: StructType = _ + + /** + * Refreshes `FileStatus`es, footers, partition spec, and table schema. + */ + def refresh(): Unit = { + // Support either reading a collection of raw Parquet part-files, or a collection of folders + // containing Parquet files (e.g. partitioned Parquet table). + val baseStatuses = paths.distinct.flatMap { p => + val path = new Path(p) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + Try(fs.getFileStatus(qualified)).toOption + } + assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir)) + + // Lists `FileStatus`es of all leaf nodes (files) under all base directories. + val leaves = baseStatuses.flatMap { f => + val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf) + SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f => + isSummaryFile(f.getPath) || + !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) + } + } + + dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) + metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) + commonMetadataStatuses = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) + + footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f => + val parquetMetadata = ParquetFileReader.readFooter( + SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER) + f -> new Footer(f.getPath, parquetMetadata) + }.seq.toMap + + dataSchema = { + val dataSchema0 = + maybeDataSchema + .orElse(readSchema()) + .orElse(maybeMetastoreSchema) + .getOrElse(sys.error("Failed to get the schema.")) + + // If this Parquet relation is converted from a Hive Metastore table, must reconcile case + // case insensitivity issue and possible schema mismatch (probably caused by schema + // evolution). + maybeMetastoreSchema + .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0)) + .getOrElse(dataSchema0) + } + } + + private def isSummaryFile(file: Path): Boolean = { + file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || + file.getName == ParquetFileWriter.PARQUET_METADATA_FILE + } + + private def readSchema(): Option[StructType] = { + // Sees which file(s) we need to touch in order to figure out the schema. + // + // Always tries the summary files first if users don't require a merged schema. In this case, + // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row + // groups information, and could be much smaller for large Parquet files with lots of row + // groups. + // + // NOTE: Metadata stored in the summary files are merged from all part-files. However, for + // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know + // how to merge them correctly if some key is associated with different values in different + // part-files. When this happens, Parquet simply gives up generating the summary file. This + // implies that if a summary file presents, then: + // + // 1. Either all part-files have exactly the same Spark SQL schema, or + // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus + // their schemas may differ from each other). + // + // Here we tend to be pessimistic and take the second case into account. Basically this means + // we can't trust the summary files if users require a merged schema, and must touch all part- + // files to do the merge. + val filesToTouch = + if (shouldMergeSchemas) { + // Also includes summary files, 'cause there might be empty partition directories. + (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq + } else { + // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet + // don't have this. + commonMetadataStatuses.headOption + // Falls back to "_metadata" + .orElse(metadataStatuses.headOption) + // Summary file(s) not found, the Parquet file is either corrupted, or different part- + // files contain conflicting user defined metadata (two or more values are associated + // with a same key in different files). In either case, we fall back to any of the + // first part-file, and just assume all schemas are consistent. + .orElse(dataStatuses.headOption) + .toSeq + } + + assert( + filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined, + "No schema defined, " + + s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.") + + ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext) + } + } +} + +private[sql] object ParquetRelation2 extends Logging { + // Whether we should merge schemas collected from all Parquet part-files. + private[sql] val MERGE_SCHEMA = "mergeSchema" + + // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used + // internally. + private[sql] val METASTORE_SCHEMA = "metastoreSchema" + + private[parquet] def readSchema( + footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { + footers.map { footer => + val metadata = footer.getParquetMetadata.getFileMetaData + val parquetSchema = metadata.getSchema + val maybeSparkSchema = metadata + .getKeyValueMetaData + .toMap + .get(RowReadSupport.SPARK_METADATA_KEY) + .flatMap { serializedSchema => + // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to + // whatever is available. + Try(DataType.fromJson(serializedSchema)) + .recover { case _: Throwable => + logInfo( + s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + + "falling back to the deprecated DataType.fromCaseClassString parser.") + DataType.fromCaseClassString(serializedSchema) + } + .recover { case cause: Throwable => + logWarning( + s"""Failed to parse serialized Spark schema in Parquet key-value metadata: + |\t$serializedSchema + """.stripMargin, + cause) + } + .map(_.asInstanceOf[StructType]) + .toOption + } + + maybeSparkSchema.getOrElse { + // Falls back to Parquet schema if Spark SQL schema is absent. + StructType.fromAttributes( + // TODO Really no need to use `Attribute` here, we only need to know the data type. + ParquetTypesConverter.convertToAttributes( + parquetSchema, + sqlContext.conf.isParquetBinaryAsString, + sqlContext.conf.isParquetINT96AsTimestamp)) + } + }.reduceOption { (left, right) => + try left.merge(right) catch { case e: Throwable => + throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) + } + } + } + + /** + * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore + * schema and Parquet schema. + * + * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the + * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't + * distinguish binary and string). This method generates a correct schema by merging Metastore + * schema data types and Parquet schema field names. + */ + private[parquet] def mergeMetastoreParquetSchema( + metastoreSchema: StructType, + parquetSchema: StructType): StructType = { + def schemaConflictMessage: String = + s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema: + |${metastoreSchema.prettyJson} + | + |Parquet schema: + |${parquetSchema.prettyJson} + """.stripMargin + + val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema) + + assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage) + + val ordinalMap = metastoreSchema.zipWithIndex.map { + case (field, index) => field.name.toLowerCase -> index + }.toMap + + val reorderedParquetSchema = mergedParquetSchema.sortBy(f => + ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1)) + + StructType(metastoreSchema.zip(reorderedParquetSchema).map { + // Uses Parquet field names but retains Metastore data types. + case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase => + mSchema.copy(name = pSchema.name) + case _ => + throw new SparkException(schemaConflictMessage) + }) + } + + /** + * Returns the original schema from the Parquet file with any missing nullable fields from the + * Hive Metastore schema merged in. + * + * When constructing a DataFrame from a collection of structured data, the resulting object has + * a schema corresponding to the union of the fields present in each element of the collection. + * Spark SQL simply assigns a null value to any field that isn't present for a particular row. + * In some cases, it is possible that a given table partition stored as a Parquet file doesn't + * contain a particular nullable field in its schema despite that field being present in the + * table schema obtained from the Hive Metastore. This method returns a schema representing the + * Parquet file schema along with any additional nullable fields from the Metastore schema + * merged in. + */ + private[parquet] def mergeMissingNullableFields( + metastoreSchema: StructType, + parquetSchema: StructType): StructType = { + val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap + val missingFields = metastoreSchema + .map(_.name.toLowerCase) + .diff(parquetSchema.map(_.name.toLowerCase)) + .map(fieldMap(_)) + .filter(_.nullable) + StructType(parquetSchema ++ missingFields) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index ee099ab..e6324b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { (a, _) => t.buildScan(a)) :: Nil // Scanning partitioned FSBasedRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty => val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray @@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { selectedPartitions) :: Nil // Scanning non-partitioned FSBasedRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) => + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) => val inputPaths = t.paths.map(new Path(_)).flatMap { path => val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration) val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty => + l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append execution.ExecutedCommand( - InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil + InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil case _ => Nil } @@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { partitionColumns: StructType, partitions: Array[Partition]) = { val output = projections.map(_.toAttribute) - val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation] + val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] // Builds RDD[Row]s for each selected partition. val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index d30f7f6..d1f0cda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String) private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) private[sql] object PartitioningUtils { + // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't + // depend on Hive. + private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { require(columnNames.size == literals.size) } http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 7879328..a09bb08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource( } } -private[sql] case class InsertIntoFSBasedRelation( - @transient relation: FSBasedRelation, +private[sql] case class InsertIntoHadoopFsRelation( + @transient relation: HadoopFsRelation, @transient query: LogicalPlan, partitionColumns: Array[String], mode: SaveMode) @@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation( insert(new DefaultWriterContainer(relation, job), df) } else { val writerContainer = new DynamicPartitionWriterContainer( - relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__") + relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME) insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) } } @@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation( } private[sql] abstract class BaseWriterContainer( - @transient val relation: FSBasedRelation, + @transient val relation: HadoopFsRelation, @transient job: Job) extends SparkHadoopMapReduceUtil with Logging @@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer( protected val dataSchema = relation.dataSchema - protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass + protected var outputWriterFactory: OutputWriterFactory = _ private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ @@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) - relation.prepareForWrite(job) + outputWriterFactory = relation.prepareJobForWrite(job) outputFormatClass = job.getOutputFormatClass outputCommitter = newOutputCommitter(taskAttemptContext) outputCommitter.setupJob(jobContext) @@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer( } private[sql] class DefaultWriterContainer( - @transient relation: FSBasedRelation, + @transient relation: HadoopFsRelation, @transient job: Job) extends BaseWriterContainer(relation, job) { @transient private var writer: OutputWriter = _ override protected def initWriters(): Unit = { - writer = outputWriterClass.newInstance() taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath) - writer.init(getWorkPath, dataSchema, taskAttemptContext) + writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) } override def outputWriterForRow(row: Row): OutputWriter = writer @@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer( } private[sql] class DynamicPartitionWriterContainer( - @transient relation: FSBasedRelation, + @transient relation: HadoopFsRelation, @transient job: Job, partitionColumns: Array[String], defaultPartitionName: String) @@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer( outputWriters.getOrElseUpdate(partitionPath, { val path = new Path(getWorkPath, partitionPath) - val writer = outputWriterClass.newInstance() taskAttemptContext.getConfiguration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - writer.init(path.toString, dataSchema, taskAttemptContext) - writer + outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) }) } http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 595c5eb..37a569d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource { case Some(schema: StructType) => clazz.newInstance() match { case dataSource: SchemaRelationProvider => dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => val maybePartitionsSchema = if (partitionColumns.isEmpty) { None } else { @@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource { case None => clazz.newInstance() match { case dataSource: RelationProvider => dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val paths = { val patternPath = new Path(caseInsensitiveOptions("path")) @@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource { val relation = clazz.newInstance() match { case dataSource: CreatableRelationProvider => dataSource.createRelation(sqlContext, mode, options, data) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. Output path must be a legal HDFS style file system path; @@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource { Some(partitionColumnsSchema(data.schema, partitionColumns)), caseInsensitiveOptions) sqlContext.executePlan( - InsertIntoFSBasedRelation( + InsertIntoHadoopFsRelation( r, data.logicalPlan, partitionColumns.toArray, http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 6f31530..274ab44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, _} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.types.{StructField, StructType} @@ -94,7 +94,7 @@ trait SchemaRelationProvider { * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a - * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined + * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined * schema, and an optional list of partition columns, this interface is used to pass in the * parameters specified by a user. * @@ -105,15 +105,15 @@ trait SchemaRelationProvider { * * A new instance of this class with be instantiated each time a DDL call is made. * - * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is + * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is * that users need to provide a schema and a (possibly empty) list of partition columns when * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]], - * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified + * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified * schemas, and accessing partitioned relations. * * @since 1.4.0 */ -trait FSBasedRelationProvider { +trait HadoopFsRelationProvider { /** * Returns a new base relation with the given parameters, a user defined schema, and a list of * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity @@ -124,7 +124,7 @@ trait FSBasedRelationProvider { paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation + parameters: Map[String, String]): HadoopFsRelation } /** @@ -280,33 +280,42 @@ trait CatalystScan { /** * ::Experimental:: - * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the - * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. - * An [[OutputWriter]] instance is created and initialized when a new output file is opened on - * executor side. This instance is used to persist rows to this single output file. + * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver + * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized + * to executor side to create actual [[OutputWriter]]s on the fly. * * @since 1.4.0 */ @Experimental -abstract class OutputWriter { +abstract class OutputWriterFactory extends Serializable { /** - * Initializes this [[OutputWriter]] before any rows are persisted. + * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side + * to instantiate new [[OutputWriter]]s. * * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that * this may not point to the final output file. For example, `FileOutputFormat` writes to * temporary directories and then merge written files back to the final destination. In * this case, `path` points to a temporary output file under the temporary directory. * @param dataSchema Schema of the rows to be written. Partition columns are not included in the - * schema if the corresponding relation is partitioned. + * schema if the relation being written is partitioned. * @param context The Hadoop MapReduce task context. * * @since 1.4.0 */ - def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = () + def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter +} +/** + * ::Experimental:: + * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the + * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on + * executor side. This instance is used to persist rows to this single output file. + * + * @since 1.4.0 + */ +@Experimental +abstract class OutputWriter { /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned * tables, dynamic partition columns are not included in rows to be written. @@ -333,74 +342,71 @@ abstract class OutputWriter { * filter using selected predicates before producing an RDD containing all matching tuples as * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file * systems, it's able to discover partitioning information from the paths of input directories, and - * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must - * override one of the three `buildScan` methods to implement the read path. + * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]] + * must override one of the three `buildScan` methods to implement the read path. * * For the write path, it provides the ability to write to both non-partitioned and partitioned * tables. Directory layout of the partitioned tables is compatible with Hive. * * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for * implementing metastore table conversion. - * @param paths Base paths of this relation. For partitioned relations, it should be the root - * directories of all partition directories. - * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional + * + * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional * [[PartitionSpec]], so that partition discovery can be skipped. * * @since 1.4.0 */ @Experimental -abstract class FSBasedRelation private[sql]( - val paths: Array[String], - maybePartitionSpec: Option[PartitionSpec]) +abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec]) extends BaseRelation { + def this() = this(None) + + private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + + private val codegenEnabled = sqlContext.conf.codegenEnabled + + private var _partitionSpec: PartitionSpec = _ + + final private[sql] def partitionSpec: PartitionSpec = { + if (_partitionSpec == null) { + _partitionSpec = maybePartitionSpec + .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable)) + .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition]))) + .getOrElse { + if (sqlContext.conf.partitionDiscoveryEnabled()) { + discoverPartitions() + } else { + PartitionSpec(StructType(Nil), Array.empty[Partition]) + } + } + } + _partitionSpec + } + /** - * Constructs an [[FSBasedRelation]]. - * - * @param paths Base paths of this relation. For partitioned relations, it should be either root - * directories of all partition directories. - * @param partitionColumns Partition columns of this relation. + * Base paths of this relation. For partitioned relations, it should be either root directories + * of all partition directories. * * @since 1.4.0 */ - def this(paths: Array[String], partitionColumns: StructType) = - this(paths, { - if (partitionColumns.isEmpty) None - else Some(PartitionSpec(partitionColumns, Array.empty[Partition])) - }) + def paths: Array[String] /** - * Constructs an [[FSBasedRelation]]. - * - * @param paths Base paths of this relation. For partitioned relations, it should be root - * directories of all partition directories. + * Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically + * discovered. Note that they should always be nullable. * * @since 1.4.0 */ - def this(paths: Array[String]) = this(paths, None) - - private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - - private val codegenEnabled = sqlContext.conf.codegenEnabled - - private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec => - spec.copy(partitionColumns = spec.partitionColumns.asNullable) - }.getOrElse { - if (sqlContext.conf.partitionDiscoveryEnabled()) { - discoverPartitions() - } else { - PartitionSpec(StructType(Nil), Array.empty[Partition]) - } - } - - private[sql] def partitionSpec: PartitionSpec = _partitionSpec + final def partitionColumns: StructType = + userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns) /** - * Partition columns. Note that they are always nullable. + * Optional user defined partition columns. * * @since 1.4.0 */ - def partitionColumns: StructType = partitionSpec.partitionColumns + def userDefinedPartitionColumns: Option[StructType] = None private[sql] def refresh(): Unit = { if (sqlContext.conf.partitionDiscoveryEnabled()) { @@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql]( }.map(_.getPath) if (leafDirs.nonEmpty) { - PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__") + PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME) } else { PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition]) } @@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql]( * @since 1.4.0 */ def buildScan(inputPaths: Array[String]): RDD[Row] = { - throw new RuntimeException( + throw new UnsupportedOperationException( "At least one buildScan() method should be overridden to read the relation.") } @@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql]( } /** - * Client side preparation for data writing can be put here. For example, user defined output - * committer can be configured here. + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can + * be put here. For example, user defined output committer can be configured here. * * Note that the only side effect expected here is mutating `job` via its setters. Especially, * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states @@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql]( * * @since 1.4.0 */ - def prepareForWrite(job: Job): Unit = () - - /** - * This method is responsible for producing a new [[OutputWriter]] for each newly opened output - * file on the executor side. - * - * @since 1.4.0 - */ - def outputWriterClass: Class[_ <: OutputWriter] + def prepareJobForWrite(job: Job): OutputWriterFactory } http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index aad1d24..1eacdde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK - case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK + case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index 3bbc5b0..5ad4395 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { }.flatten.reduceOption(_ && _) val forParquetDataSource = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters + case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters }.flatten.reduceOption(_ && _) forParquetTableScan.orElse(forParquetDataSource) http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index fc90e3e..c964b6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("lowerCase", StringType), StructField("UPPERCase", DoubleType, nullable = false)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("lowercase", StringType), StructField("uppercase", DoubleType, nullable = false))), @@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructType(Seq( StructField("UPPERCase", DoubleType, nullable = false)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false))), @@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Metastore schema contains additional non-nullable fields. assert(intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false), StructField("lowerCase", BinaryType, nullable = false))), @@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Conflicting non-nullable field names intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq(StructField("lower", StringType, nullable = false))), StructType(Seq(StructField("lowerCase", BinaryType)))) } @@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("firstField", StringType, nullable = true), StructField("secondField", StringType, nullable = true), StructField("thirdfield", StringType, nullable = true)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), @@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Merge should fail if the Metastore contains any additional fields that are not // nullable. assert(intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b0e82c8..2aa80b4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources} @@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // serialize the Metastore schema to JSON and pass it as a data source option because of the // evil case insensitivity issue, which is reconciled within `ParquetRelation2`. val parquetOptions = Map( - FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json, - FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString) + ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json, + ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString) val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) @@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss - case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) => + case logical@LogicalRelation(parquetRelation: ParquetRelation2) => // If we have the same paths, same schema, and same partition spec, // we will use the cached Parquet Relation. val useCached = @@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec)) val parquetRelation = cached.getOrElse { val created = LogicalRelation( - new FSBasedParquetRelation( + new ParquetRelation2( paths.toArray, None, Some(partitionSpec), parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created @@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, None) val parquetRelation = cached.getOrElse { val created = LogicalRelation( - new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive)) + new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created } http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 8e405e0..6609763 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect( sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match { - case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) => + case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) => if (l.relation != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + http://git-wip-us.apache.org/repos/asf/spark/blob/bcb2c5d1/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index da5d203..1bf1c1b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: FSBasedParquetRelation) => // OK + case LogicalRelation(p: ParquetRelation2) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[FSBasedParquetRelation].getCanonicalName}") + s"${classOf[ParquetRelation2].getCanonicalName}") } // Clenup and reset confs. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
