Repository: spark
Updated Branches:
  refs/heads/master dfc9fc02c -> 361ebc282


http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
deleted file mode 100644
index 38f50c1..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.orc
-
-import java.net.URI
-import java.util.Properties
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc._
-import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
-import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
-import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{HadoopRDD, RDD}
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
-import org.apache.spark.sql.sources.{Filter, _}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-private[sql] class DefaultSource
-  extends FileFormat with DataSourceRegister with Serializable {
-
-  override def shortName(): String = "orc"
-
-  override def toString: String = "ORC"
-
-  override def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = {
-    OrcFileOperator.readSchema(
-      files.map(_.getPath.toUri.toString),
-      Some(sparkSession.sessionState.newHadoopConf())
-    )
-  }
-
-  override def prepareWrite(
-      sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = {
-    val orcOptions = new OrcOptions(options)
-
-    val configuration = job.getConfiguration
-
-    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
-    configuration match {
-      case conf: JobConf =>
-        conf.setOutputFormat(classOf[OrcOutputFormat])
-      case conf =>
-        conf.setClass(
-          "mapred.output.format.class",
-          classOf[OrcOutputFormat],
-          classOf[MapRedOutputFormat[_, _]])
-    }
-
-    new OutputWriterFactory {
-      override def newInstance(
-          path: String,
-          bucketId: Option[Int],
-          dataSchema: StructType,
-          context: TaskAttemptContext): OutputWriter = {
-        new OrcOutputWriter(path, bucketId, dataSchema, context)
-      }
-    }
-  }
-
-  override def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
-      // Sets pushed predicates
-      OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-    (file: PartitionedFile) => {
-      val conf = broadcastedHadoopConf.value.value
-
-      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
-      // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
-      // using the given physical schema. Instead, we simply return an empty iterator.
-      val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
-      if (maybePhysicalSchema.isEmpty) {
-        Iterator.empty
-      } else {
-        val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
-
-        val orcRecordReader = {
-          val job = Job.getInstance(conf)
-          FileInputFormat.setInputPaths(job, file.filePath)
-
-          val fileSplit = new FileSplit(
-            new Path(new URI(file.filePath)), file.start, file.length, Array.empty
-          )
-          // Custom OrcRecordReader is used to get
-          // ObjectInspector during recordReader creation itself and can
-          // avoid NameNode call in unwrapOrcStructs per file.
-          // Specifically would be helpful for partitioned datasets.
-          val orcReader = OrcFile.createReader(
-            new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
-          new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
-        }
-
-        // Unwraps `OrcStruct`s to `UnsafeRow`s
-        OrcRelation.unwrapOrcStructs(
-          conf,
-          requiredSchema,
-          Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
-          new RecordReaderIterator[OrcStruct](orcRecordReader))
-      }
-    }
-  }
-}
-
-private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
-  extends HiveInspectors {
-
-  def serialize(row: InternalRow): Writable = {
-    wrapOrcStruct(cachedOrcStruct, structOI, row)
-    serializer.serialize(cachedOrcStruct, structOI)
-  }
-
-  private[this] val serializer = {
-    val table = new Properties()
-    table.setProperty("columns", dataSchema.fieldNames.mkString(","))
-    table.setProperty("columns.types", 
dataSchema.map(_.dataType.catalogString).mkString(":"))
-
-    val serde = new OrcSerde
-    serde.initialize(conf, table)
-    serde
-  }
-
-  // Object inspector converted from the schema of the relation to be serialized.
-  private[this] val structOI = {
-    val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
-    OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
-      .asInstanceOf[SettableStructObjectInspector]
-  }
-
-  private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
-  private[this] def wrapOrcStruct(
-      struct: OrcStruct,
-      oi: SettableStructObjectInspector,
-      row: InternalRow): Unit = {
-    val fieldRefs = oi.getAllStructFieldRefs
-    var i = 0
-    while (i < fieldRefs.size) {
-
-      oi.setStructFieldData(
-        struct,
-        fieldRefs.get(i),
-        wrap(
-          row.get(i, dataSchema(i).dataType),
-          fieldRefs.get(i).getFieldObjectInspector,
-          dataSchema(i).dataType))
-      i += 1
-    }
-  }
-}
-
-private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter {
-
-  private[this] val conf = context.getConfiguration
-
-  private[this] val serializer = new OrcSerializer(dataSchema, conf)
-
-  // `OrcRecordWriter.close()` creates an empty file if no rows are written at all.  We use this
-  // flag to decide whether `OrcRecordWriter.close()` needs to be called.
-  private var recordWriterInstantiated = false
-
-  private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
-    recordWriterInstantiated = true
-    val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
-    val taskAttemptId = context.getTaskAttemptID
-    val partition = taskAttemptId.getTaskID.getId
-    val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-    val compressionExtension = {
-      val name = conf.get(OrcRelation.ORC_COMPRESSION)
-      OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
-    }
-    // It has the `.orc` extension at the end because (de)compression tools
-    // such as gunzip would not be able to decompress this as the compression
-    // is not applied on this whole file but on each "stream" in ORC format.
-    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
-
-    new OrcOutputFormat().getRecordWriter(
-      new Path(path, filename).getFileSystem(conf),
-      conf.asInstanceOf[JobConf],
-      new Path(path, filename).toString,
-      Reporter.NULL
-    ).asInstanceOf[RecordWriter[NullWritable, Writable]]
-  }
-
-  override def write(row: Row): Unit =
-    throw new UnsupportedOperationException("call writeInternal")
-
-  override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    recordWriter.write(NullWritable.get(), serializer.serialize(row))
-  }
-
-  override def close(): Unit = {
-    if (recordWriterInstantiated) {
-      recordWriter.close(Reporter.NULL)
-    }
-  }
-}
-
-private[orc] case class OrcTableScan(
-    @transient sparkSession: SparkSession,
-    attributes: Seq[Attribute],
-    filters: Array[Filter],
-    @transient inputPaths: Seq[FileStatus])
-  extends Logging
-  with HiveInspectors {
-
-  def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
-    val conf = job.getConfiguration
-
-    // Figure out the actual schema from the ORC source (without partition columns) so that we
-    // can pick the correct ordinals.  Note that this assumes that all files have the same schema.
-    val orcFormat = new DefaultSource
-    val dataSchema =
-      orcFormat
-        .inferSchema(sparkSession, Map.empty, inputPaths)
-        .getOrElse(sys.error("Failed to read schema from target ORC files."))
-
-    // Tries to push down filters if ORC filter push-down is enabled
-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
-      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
-        conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-
-    // Sets requested columns
-    OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
-
-    if (inputPaths.isEmpty) {
-      // The input paths have probably been pruned away, so return an empty RDD.
-      return sparkSession.sparkContext.emptyRDD[InternalRow]
-    }
-    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
-
-    val inputFormatClass =
-      classOf[OrcInputFormat]
-        .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
-
-    val rdd = sparkSession.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
-      inputFormatClass,
-      classOf[NullWritable],
-      classOf[Writable]
-    ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
-
-    val wrappedConf = new SerializableConfiguration(conf)
-
-    rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
-      val writableIterator = iterator.map(_._2)
-      val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
-      OrcRelation.unwrapOrcStructs(
-        wrappedConf.value,
-        StructType.fromAttributes(attributes),
-        maybeStructOI,
-        writableIterator
-      )
-    }
-  }
-}
-
-private[orc] object OrcTableScan {
-  // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
-  private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
-}
-
-private[orc] object OrcRelation extends HiveInspectors {
-  // References to Hive's classes are kept to a minimum here.
-  val ORC_COMPRESSION = "orc.compress"
-
-  // The extensions for ORC compression codecs
-  val extensionsForCompressionCodecNames = Map(
-    "NONE" -> "",
-    "SNAPPY" -> ".snappy",
-    "ZLIB" -> ".zlib",
-    "LZO" -> ".lzo")
-
-  def unwrapOrcStructs(
-      conf: Configuration,
-      dataSchema: StructType,
-      maybeStructOI: Option[StructObjectInspector],
-      iterator: Iterator[Writable]): Iterator[InternalRow] = {
-    val deserializer = new OrcSerde
-    val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
-    val unsafeProjection = UnsafeProjection.create(dataSchema)
-
-    def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
-      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
-        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
-      }.unzip
-
-      val unwrappers = fieldRefs.map(unwrapperFor)
-
-      iterator.map { value =>
-        val raw = deserializer.deserialize(value)
-        var i = 0
-        while (i < fieldRefs.length) {
-          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
-          if (fieldValue == null) {
-            mutableRow.setNullAt(fieldOrdinals(i))
-          } else {
-            unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
-          }
-          i += 1
-        }
-        unsafeProjection(mutableRow)
-      }
-    }
-
-    maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
-  }
-
-  def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
-    val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
-  }
-}
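
The deleted DefaultSource above wires ORC compression through the orc.compress
table property (OrcRelation.ORC_COMPRESSION) and gates filter push-down on
spark.sql.orc.filterPushdown, turning pushed filters into a SearchArgument via
OrcFilters.createFilter. A minimal usage sketch of those two code paths (the
output path and sample data are hypothetical, not part of this commit):

import org.apache.spark.sql.SparkSession

object OrcRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("orc-roundtrip-sketch")
      .master("local[*]")
      // Enables the SARG push-down branch in buildReader/OrcTableScan above.
      .config("spark.sql.orc.filterPushdown", "true")
      .getOrCreate()
    import spark.implicits._

    val out = "/tmp/orc-demo" // hypothetical path

    // The "compression" option is resolved by OrcOptions and written into
    // orc.compress; OrcOutputWriter then appends the matching file
    // extension (".snappy" here) before ".orc".
    Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
      .write.mode("overwrite").option("compression", "snappy").orc(out)

    // A filter like this becomes a SearchArgument and reaches the record
    // reader through the non-public sarg.pushdown key duplicated above.
    spark.read.orc(out).filter($"id" > 1).show()

    spark.stop()
  }
}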

http://git-wip-us.apache.org/repos/asf/spark/blob/361ebc28/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 0207b4e..5dfa58f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
 class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
   import testImplicits._
 
-  override val dataSourceName: String = classOf[DefaultSource].getCanonicalName
+  override val dataSourceName: String = classOf[OrcFileFormat].getCanonicalName
 
   // ORC does not play well with NullType and UDT.
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
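
The one-line test change reflects the rename of the ORC DefaultSource class to
OrcFileFormat in this commit. Since the format also registers the short name
"orc" via DataSourceRegister (shortName() in the deleted file above), only code
that names the implementing class directly is affected. A small illustrative
sketch, assuming an existing SparkSession and a hypothetical data path:

import org.apache.spark.sql.{DataFrame, SparkSession}

object OrcFormatLookupSketch {
  // Stable across the rename: resolved through the DataSourceRegister
  // short name, independent of the implementing class.
  def readByShortName(spark: SparkSession, path: String): DataFrame =
    spark.read.format("orc").load(path)

  // Brittle: naming the class directly is exactly what this test had to
  // update when DefaultSource became OrcFileFormat.
  def readByClassName(spark: SparkSession, path: String): DataFrame =
    spark.read
      .format("org.apache.spark.sql.hive.orc.DefaultSource") // pre-rename name
      .load(path)
}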

