Repository: spark
Updated Branches:
  refs/heads/master 8989d3a39 -> b547de8a6


[SPARK-14116][SQL] Implements buildReader() for ORC data source

## What changes were proposed in this pull request?

This PR implements `FileFormat.buildReader()` for our ORC data source. It also
fixes several minor styling issues in the `HadoopFsRelation` planning code
path.

Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` for creating
`OrcRecordReader`s; a plain `FileSplit` is sufficient. That's why we can simply
create the record reader with the help of `OrcNewInputFormat` and `FileSplit`.

## How was this patch tested?

Existing test cases should cover this change.

Author: Cheng Lian <[email protected]>

Closes #11936 from liancheng/spark-14116-build-reader-for-orc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b547de8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b547de8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b547de8a

Branch: refs/heads/master
Commit: b547de8a60074ca25c5bec3a24511f8042bdf0ad
Parents: 8989d3a
Author: Cheng Lian <[email protected]>
Authored: Sat Mar 26 16:10:35 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Sat Mar 26 16:10:35 2016 -0700

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala        |   6 +-
 .../sql/execution/datasources/FileScanRDD.scala |   4 +-
 .../datasources/FileSourceStrategy.scala        |  29 ++-
 .../datasources/parquet/ParquetRelation.scala   |  10 +-
 .../datasources/text/DefaultSource.scala        |   1 -
 .../spark/sql/hive/orc/OrcFileOperator.scala    |   2 -
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 190 +++++++++++++------
 7 files changed, 151 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4943702..52c8f3e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -208,9 +208,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
 
               val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
                 (0 until spec.numBuckets).map { bucketId =>
-                  bucketedDataMap.get(bucketId).getOrElse {
-                    t.sqlContext.emptyResult: RDD[InternalRow]
-                  }
+                  bucketedDataMap.getOrElse(bucketId, 
t.sqlContext.emptyResult: RDD[InternalRow])
                 })
               bucketedRDD
             }
@@ -387,7 +385,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
         result.setColumn(resultIdx, input.column(inputIdx))
         inputIdx += 1
       } else {
-        
require(partitionColumnSchema.fields.filter(_.name.equals(attr.name)).length == 
1)
+        require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
         var partitionIdx = 0
         partitionColumnSchema.fields.foreach { f => {
           if (f.name.equals(attr.name)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index bbe7f4a..988c785 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -32,7 +32,7 @@ case class PartitionedFile(
     filePath: String,
     start: Long,
     length: Long) {
-  override def toString(): String = {
+  override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: 
$partitionValues"
   }
 }
@@ -44,7 +44,7 @@ case class PartitionedFile(
  *
  * TODO: This currently does not take locality information about the files 
into account.
  */
-case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends 
Partition
+case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends 
Partition
 
 class FileScanRDD(
     @transient val sqlContext: SQLContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index de89d5f..4b04fec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -29,7 +29,6 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
 
 /**
  * A strategy for planning scans over collections of files that might be 
partitioned or bucketed
@@ -56,9 +55,10 @@ import org.apache.spark.sql.types._
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l@LogicalRelation(files: 
HadoopFsRelation, _, _))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: 
HadoopFsRelation, _, _))
       if (files.fileFormat.toString == "TestFileFormat" ||
-         files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
+         files.fileFormat.toString == "ORC") &&
          files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we 
can use them to avoid
       // reading unneeded data:
@@ -81,10 +81,10 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
       val bucketColumns =
         AttributeSet(
           files.bucketSpec
-              .map(_.bucketColumnNames)
-              .getOrElse(Nil)
-              .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
-                  .getOrElse(sys.error(""))))
+            .map(_.bucketColumnNames)
+            .getOrElse(Nil)
+            .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
+              .getOrElse(sys.error(""))))
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = 
filters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -101,8 +101,8 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
 
       val readDataColumns =
         dataColumns
-            .filter(requiredAttributes.contains)
-            .filterNot(partitionColumns.contains)
+          .filter(requiredAttributes.contains)
+          .filterNot(partitionColumns.contains)
       val prunedDataSchema = readDataColumns.toStructType
       logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")
 
@@ -120,13 +120,12 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
         case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
           logInfo(s"Planning with ${bucketing.numBuckets} buckets")
           val bucketed =
-            selectedPartitions
-                .flatMap { p =>
-                  p.files.map(f => PartitionedFile(p.values, 
f.getPath.toUri.toString, 0, f.getLen))
-                }.groupBy { f =>
+            selectedPartitions.flatMap { p =>
+              p.files.map(f => PartitionedFile(p.values, 
f.getPath.toUri.toString, 0, f.getLen))
+            }.groupBy { f =>
               BucketingUtils
-                  .getBucketId(new Path(f.filePath).getName)
-                  .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+                .getBucketId(new Path(f.filePath).getName)
+                .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
             }
 
           (0 until bucketing.numBuckets).map { bucketId =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 2f2d438..d6b84be 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -321,11 +321,11 @@ private[sql] class DefaultSource
     // Try to push down filters when filter push-down is enabled.
     val pushed = if 
(sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
       filters
-          // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
-          // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
-          // is used here.
-          .flatMap(ParquetFilters.createFilter(dataSchema, _))
-          .reduceOption(FilterApi.and)
+        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 6af403d..5cfc9e9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}

http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 0195466..8248a11 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -24,7 +24,6 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
@@ -92,7 +91,6 @@ private[orc] object OrcFileOperator extends Logging {
     // TODO: Check if the paths coming in are already qualified and simplify.
     val origPath = new Path(pathStr)
     val fs = origPath.getFileSystem(conf)
-    val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
     val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
       .filterNot(_.isDirectory)
       .map(_.getPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/b547de8a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index b5dc910..7c4a0a0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.net.URI
 import java.util.Properties
 
 import org.apache.hadoop.conf.Configuration
@@ -24,12 +25,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc._
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
-import 
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
+import 
org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, 
StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, 
OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
@@ -37,6 +39,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
@@ -44,7 +47,8 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection.BitSet
 
-private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
+private[sql] class DefaultSource
+  extends FileFormat with DataSourceRegister with Serializable {
 
   override def shortName(): String = "orc"
 
@@ -55,7 +59,9 @@ private[sql] class DefaultSource extends FileFormat with 
DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     OrcFileOperator.readSchema(
-      files.map(_.getPath.toUri.toString), 
Some(sqlContext.sparkContext.hadoopConfiguration))
+      files.map(_.getPath.toUri.toString),
+      Some(sqlContext.sparkContext.hadoopConfiguration)
+    )
   }
 
   override def prepareWrite(
@@ -80,8 +86,8 @@ private[sql] class DefaultSource extends FileFormat with 
DataSourceRegister {
       job.getConfiguration.set(
         OrcTableProperties.COMPRESSION.getPropName,
         OrcRelation
-            .shortOrcCompressionCodecNames
-            .getOrElse(codecName, CompressionKind.NONE).name())
+          .shortOrcCompressionCodecNames
+          .getOrElse(codecName, CompressionKind.NONE).name())
     }
 
     job.getConfiguration match {
@@ -117,6 +123,68 @@ private[sql] class DefaultSource extends FileFormat with 
DataSourceRegister {
     val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
     OrcTableScan(sqlContext, output, filters, inputFiles).execute()
   }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): (PartitionedFile) => 
Iterator[InternalRow] = {
+    val orcConf = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+    if (sqlContext.conf.orcFilterPushDown) {
+      // Sets pushed predicates
+      OrcFilters.createFilter(filters.toArray).foreach { f =>
+        orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+      }
+    }
+
+    val broadcastedConf = sqlContext.sparkContext.broadcast(new 
SerializableConfiguration(orcConf))
+
+    (file: PartitionedFile) => {
+      val conf = broadcastedConf.value.value
+
+      // SPARK-8501: Empty ORC files always have an empty schema stored in 
their footer.  In this
+      // case, `OrcFileOperator.readSchema` returns `None`, and we can simply 
return an empty
+      // iterator.
+      val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), 
Some(conf))
+      if (maybePhysicalSchema.isEmpty) {
+        Iterator.empty
+      } else {
+        val physicalSchema = maybePhysicalSchema.get
+        OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+
+        val orcRecordReader = {
+          val job = Job.getInstance(conf)
+          FileInputFormat.setInputPaths(job, file.filePath)
+
+          val inputFormat = new OrcNewInputFormat
+          val fileSplit = new FileSplit(
+            new Path(new URI(file.filePath)), file.start, file.length, 
Array.empty
+          )
+
+          val attemptId = new TaskAttemptID(new TaskID(new JobID(), 
TaskType.MAP, 0), 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(conf, 
attemptId)
+          inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
+        }
+
+        // Unwraps `OrcStruct`s to `UnsafeRow`s
+        val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
+          file.filePath, conf, dataSchema, new 
RecordReaderIterator[OrcStruct](orcRecordReader)
+        )
+
+        // Appends partition values
+        val fullOutput = dataSchema.toAttributes ++ 
partitionSchema.toAttributes
+        val joinedRow = new JoinedRow()
+        val appendPartitionColumns = 
GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+        unsafeRowIterator.map { dataRow =>
+          appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+        }
+      }
+    }
+  }
 }
 
 private[orc] class OrcOutputWriter(
@@ -225,55 +293,6 @@ private[orc] case class OrcTableScan(
   extends Logging
   with HiveInspectors {
 
-  private def addColumnIds(
-      dataSchema: StructType,
-      output: Seq[Attribute],
-      conf: Configuration): Unit = {
-    val ids = output.map(a => dataSchema.fieldIndex(a.name): Integer)
-    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
-  }
-
-  // Transform all given raw `Writable`s into `InternalRow`s.
-  private def fillObject(
-      path: String,
-      conf: Configuration,
-      iterator: Iterator[Writable],
-      nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
-    val deserializer = new OrcSerde
-    val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
-    val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
-    val unsafeProjection = 
UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
-
-    // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if 
the file contains zero
-    // rows, and thus couldn't give a proper ObjectInspector.  In this case we 
just return an empty
-    // partition since we know that this file is empty.
-    maybeStructOI.map { soi =>
-      val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
-        case (attr, ordinal) =>
-          soi.getStructFieldRef(attr.name) -> ordinal
-      }.unzip
-      val unwrappers = fieldRefs.map(unwrapperFor)
-      // Map each tuple to a row object
-      iterator.map { value =>
-        val raw = deserializer.deserialize(value)
-        var i = 0
-        while (i < fieldRefs.length) {
-          val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
-          if (fieldValue == null) {
-            mutableRow.setNullAt(fieldOrdinals(i))
-          } else {
-            unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
-          }
-          i += 1
-        }
-        unsafeProjection(mutableRow)
-      }
-    }.getOrElse {
-      Iterator.empty
-    }
-  }
-
   def execute(): RDD[InternalRow] = {
     val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
     val conf = job.getConfiguration
@@ -291,10 +310,10 @@ private[orc] case class OrcTableScan(
     val orcFormat = new DefaultSource
     val dataSchema =
       orcFormat
-          .inferSchema(sqlContext, Map.empty, inputPaths)
-          .getOrElse(sys.error("Failed to read schema from target ORC files."))
+        .inferSchema(sqlContext, Map.empty, inputPaths)
+        .getOrElse(sys.error("Failed to read schema from target ORC files."))
     // Sets requested columns
-    addColumnIds(dataSchema, attributes, conf)
+    OrcRelation.setRequiredColumns(conf, dataSchema, 
StructType.fromAttributes(attributes))
 
     if (inputPaths.isEmpty) {
       // the input path probably be pruned, return an empty RDD.
@@ -317,7 +336,12 @@ private[orc] case class OrcTableScan(
 
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val writableIterator = iterator.map(_._2)
-      fillObject(split.getPath.toString, wrappedConf.value, writableIterator, 
attributes)
+      OrcRelation.unwrapOrcStructs(
+        split.getPath.toString,
+        wrappedConf.value,
+        StructType.fromAttributes(attributes),
+        writableIterator
+      )
     }
   }
 }
@@ -327,7 +351,7 @@ private[orc] object OrcTableScan {
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 }
 
-private[orc] object OrcRelation {
+private[orc] object OrcRelation extends HiveInspectors {
   // The ORC compression short names
   val shortOrcCompressionCodecNames = Map(
     "none" -> CompressionKind.NONE,
@@ -343,5 +367,47 @@ private[orc] object OrcRelation {
     CompressionKind.ZLIB.name -> ".zlib",
     CompressionKind.LZO.name -> ".lzo"
   )
-}
 
+  def unwrapOrcStructs(
+      filePath: String,
+      conf: Configuration,
+      dataSchema: StructType,
+      iterator: Iterator[Writable]): Iterator[InternalRow] = {
+    val deserializer = new OrcSerde
+    val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, 
Some(conf))
+    val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(dataSchema)
+
+    def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
+      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
+        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+      }.unzip
+
+      val unwrappers = fieldRefs.map(unwrapperFor)
+
+      iterator.map { value =>
+        val raw = deserializer.deserialize(value)
+        var i = 0
+        while (i < fieldRefs.length) {
+          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+          if (fieldValue == null) {
+            mutableRow.setNullAt(fieldOrdinals(i))
+          } else {
+            unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+          }
+          i += 1
+        }
+        unsafeProjection(mutableRow)
+      }
+    }
+
+    maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
+  }
+
+  def setRequiredColumns(
+      conf: Configuration, physicalSchema: StructType, requestedSchema: 
StructType): Unit = {
+    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): 
Integer)
+    val (sortedIDs, sortedNames) = 
ids.zip(requestedSchema.fieldNames).sorted.unzip
+    HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to