Repository: spark
Updated Branches:
  refs/heads/master a884daad8 -> 1e8861598


[SPARK-14070][SQL] Use ORC data source for SQL queries on ORC tables

## What changes were proposed in this pull request?

This patch enables use of OrcRelation for SQL queries which read data from Hive 
tables. Changes in this patch:

- Added a new rule `OrcConversions` which would alter the plan to use 
`OrcRelation`. In this diff, the conversion is done only for reads.
- Added a new config `spark.sql.hive.convertMetastoreOrc` to control the 
conversion

BEFORE

```
scala>  hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None

== Analyzed Logical Plan ==
key: string, value: string
Project [key#171,value#172]
+- MetastoreRelation default, orc_table, None

== Optimized Logical Plan ==
MetastoreRelation default, orc_table, None

== Physical Plan ==
HiveTableScan [key#171,value#172], MetastoreRelation default, orc_table, None
```

AFTER

```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None

== Analyzed Logical Plan ==
key: string, value: string
Project [key#76,value#77]
+- SubqueryAlias orc_table
   +- Relation[key#76,value#77] ORC part: struct<>, data: 
struct<key:string,value:string>

== Optimized Logical Plan ==
Relation[key#76,value#77] ORC part: struct<>, data: 
struct<key:string,value:string>

== Physical Plan ==
WholeStageCodegen
:  +- Scan ORC part: struct<>, data: 
struct<key:string,value:string>[key#76,value#77] InputPaths: 
file:/user/hive/warehouse/orc_table
```

## How was this patch tested?

- Added a new unit test. Ran existing unit tests
- Ran with production like data

## Performance gains

Ran on a production table in Facebook (note that the data was in DWRF file 
format which is similar to ORC)

Best case : when there was no matching rows for the predicate in the query 
(everything is filtered out)

```
                      CPU time          Wall time     Total wall time across 
all tasks
================================================================
Without the change   541_515 sec    25.0 mins    165.8 hours
With change              407 sec       1.5 mins     15 mins
```

Average case: A subset of rows in the data match the query predicate

```
                        CPU time        Wall time     Total wall time across 
all tasks
================================================================
Without the change   624_630 sec     31.0 mins    199.0 h
With change           14_769 sec      5.3 mins      7.7 h
```

Author: Tejas Patil <[email protected]>

Closes #11891 from tejasapatil/orc_ppd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e886159
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e886159
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e886159

Branch: refs/heads/master
Commit: 1e886159849e3918445d3fdc3c4cef86c6c1a236
Parents: a884daa
Author: Tejas Patil <[email protected]>
Authored: Fri Apr 1 13:13:16 2016 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Fri Apr 1 13:13:16 2016 -0700

----------------------------------------------------------------------
 .../hive/execution/HiveCompatibilitySuite.scala |   8 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  12 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 234 +++++++++++++------
 .../spark/sql/hive/HiveSessionCatalog.scala     |   1 +
 .../spark/sql/hive/HiveSessionState.scala       |   1 +
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  39 ++++
 6 files changed, 220 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e886159/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 695b5ef..d8695bc 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 import java.util.{Locale, TimeZone}
 
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.internal.SQLConf
 
@@ -38,6 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
   private val originalLocale = Locale.getDefault
   private val originalColumnBatchSize = TestHive.conf.columnBatchSize
   private val originalInMemoryPartitionPruning = 
TestHive.conf.inMemoryPartitionPruning
+  private val originalConvertMetastoreOrc = TestHive.convertMetastoreOrc
 
   def testCases: Seq[(String, File)] = {
     hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -56,6 +58,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Use Hive hash expression instead of the native one
     TestHive.sessionState.functionRegistry.unregisterFunction("hash")
+    // Ensures that the plans generation use metastore relation and not 
OrcRelation
+    // Was done because SqlBuilder does not work with plans having logical 
relation
+    TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false)
     RuleExecutor.resetTime()
   }
 
@@ -66,6 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
       Locale.setDefault(originalLocale)
       TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
       TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, 
originalInMemoryPartitionPruning)
+      TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, 
originalConvertMetastoreOrc)
       TestHive.sessionState.functionRegistry.restore()
 
       // For debugging dump some statistics about how much time was spent in 
various optimizer rules.

http://git-wip-us.apache.org/repos/asf/spark/blob/1e886159/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c0b6d16..073b954 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -155,6 +155,13 @@ class HiveContext private[hive](
     getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
 
   /**
+   * When true, enables an experimental feature where metastore tables that 
use the Orc SerDe
+   * are automatically converted to use the Spark SQL ORC table scan, instead 
of the Hive
+   * SerDe.
+   */
+  protected[sql] def convertMetastoreOrc: Boolean = 
getConf(CONVERT_METASTORE_ORC)
+
+  /**
    * When true, a table created by a Hive CTAS statement (no USING clause) 
will be
    * converted to a data source table, using the data source set by 
spark.sql.sources.default.
    * The table in CTAS statement will be converted when it meets any of the 
following conditions:
@@ -442,6 +449,11 @@ private[hive] object HiveContext extends Logging {
       "different Parquet data files. This configuration is only effective " +
       "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
 
+  val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
+    defaultValue = Some(true),
+    doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables 
instead of " +
+      "the built in support.")
+
   val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
     defaultValue = Some(false),
     doc = "When true, a table created by a Hive CTAS statement (no USING 
clause) will be " +

http://git-wip-us.apache.org/repos/asf/spark/blob/1e886159/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cdc931..14f3319 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,12 +40,13 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{datasources, FileRelation}
+import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, 
ParquetRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => 
ParquetDefaultSource, ParquetRelation}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
+import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, 
HDFSFileCatalog}
 import org.apache.spark.sql.types._
 
 private[hive] case class HiveSerDe(
@@ -451,58 +452,72 @@ private[hive] class HiveMetastoreCatalog(val client: 
HiveClient, hive: HiveConte
     }
   }
 
-  private def convertToParquetRelation(metastoreRelation: MetastoreRelation): 
LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
-    val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-
-    val parquetOptions = Map(
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
-      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
-        metastoreRelation.tableName,
-        Some(metastoreRelation.databaseName)
-      ).unquotedString
-    )
-    val tableIdentifier =
-      QualifiedTableName(metastoreRelation.databaseName, 
metastoreRelation.tableName)
-
-    def getCached(
-        tableIdentifier: QualifiedTableName,
-        pathsInMetastore: Seq[String],
-        schemaInMetastore: StructType,
-        partitionSpecInMetastore: Option[PartitionSpec]): 
Option[LogicalRelation] = {
-      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
-        case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, 
_) =>
-          // If we have the same paths, same schema, and same partition spec,
-          // we will use the cached Parquet Relation.
-          val useCached =
-            parquetRelation.location.paths.map(_.toString).toSet == 
pathsInMetastore.toSet &&
-            logical.schema.sameType(metastoreSchema) &&
-            parquetRelation.partitionSpec == 
partitionSpecInMetastore.getOrElse {
-              PartitionSpec(StructType(Nil), 
Array.empty[datasources.PartitionDirectory])
+  private def getCached(
+      tableIdentifier: QualifiedTableName,
+      metastoreRelation: MetastoreRelation,
+      schemaInMetastore: StructType,
+      expectedFileFormat: Class[_ <: FileFormat],
+      expectedBucketSpec: Option[BucketSpec],
+      partitionSpecInMetastore: Option[PartitionSpec]): 
Option[LogicalRelation] = {
+
+    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+      case null => None // Cache miss
+      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
+        val pathsInMetastore = 
metastoreRelation.table.storage.locationUri.toSeq
+        val cachedRelationFileFormatClass = relation.fileFormat.getClass
+
+        expectedFileFormat match {
+          case `cachedRelationFileFormatClass` =>
+            // If we have the same paths, same schema, and same partition spec,
+            // we will use the cached relation.
+            val useCached =
+              relation.location.paths.map(_.toString).toSet == 
pathsInMetastore.toSet &&
+                logical.schema.sameType(schemaInMetastore) &&
+                relation.bucketSpec == expectedBucketSpec &&
+                relation.partitionSpec == partitionSpecInMetastore.getOrElse {
+                  PartitionSpec(StructType(Nil), 
Array.empty[PartitionDirectory])
+                }
+
+            if (useCached) {
+              Some(logical)
+            } else {
+              // If the cached relation is not updated, we invalidate it right 
away.
+              cachedDataSourceTables.invalidate(tableIdentifier)
+              None
             }
-
-          if (useCached) {
-            Some(logical)
-          } else {
-            // If the cached relation is not updated, we invalidate it right 
away.
+          case _ =>
+            logWarning(
+              
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
+                s"should be stored as $expectedFileFormat. However, we are 
getting " +
+                s"a ${relation.fileFormat} from the metastore cache. This 
cached " +
+                s"entry will be invalidated.")
             cachedDataSourceTables.invalidate(tableIdentifier)
             None
-          }
-        case other =>
-          logWarning(
-            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} 
should be stored " +
-              s"as Parquet. However, we are getting a $other from the 
metastore cache. " +
-              s"This cached entry will be invalidated.")
-          cachedDataSourceTables.invalidate(tableIdentifier)
-          None
-      }
+        }
+      case other =>
+        logWarning(
+          s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} 
should be stored " +
+            s"as $expectedFileFormat. However, we are getting a $other from 
the metastore cache. " +
+            s"This cached entry will be invalidated.")
+        cachedDataSourceTables.invalidate(tableIdentifier)
+        None
     }
+  }
+
+  private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
+                                       options: Map[String, String],
+                                       defaultSource: FileFormat,
+                                       fileFormatClass: Class[_ <: FileFormat],
+                                       fileType: String): LogicalRelation = {
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, 
metastoreRelation.tableName)
+    val bucketSpec = None  // We don't support hive bucketed tables, only ones 
we write out.
 
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = 
StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-      // We're converting the entire table into ParquetRelation, so predicates 
to Hive metastore
+      // We're converting the entire table into HadoopFsRelation, so 
predicates to Hive metastore
       // are empty.
       val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
         val location = p.getLocation
@@ -515,54 +530,65 @@ private[hive] class HiveMetastoreCatalog(val client: 
HiveClient, hive: HiveConte
 
       val cached = getCached(
         tableIdentifier,
-        metastoreRelation.table.storage.locationUri.toSeq,
+        metastoreRelation,
         metastoreSchema,
+        fileFormatClass,
+        bucketSpec,
         Some(partitionSpec))
 
-      val parquetRelation = cached.getOrElse {
+      val hadoopFsRelation = cached.getOrElse {
         val paths = new Path(metastoreRelation.table.storage.locationUri.get) 
:: Nil
         val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
-        val format = new DefaultSource()
-        val inferredSchema = format.inferSchema(hive, parquetOptions, 
fileCatalog.allFiles())
 
-        val mergedSchema = inferredSchema.map { inferred =>
-          ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, 
inferred)
-        }.getOrElse(metastoreSchema)
+        val inferredSchema = if (fileType.equals("parquet")) {
+          val inferredSchema = defaultSource.inferSchema(hive, options, 
fileCatalog.allFiles())
+          inferredSchema.map { inferred =>
+            ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, 
inferred)
+          }.getOrElse(metastoreSchema)
+        } else {
+          defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
+        }
 
         val relation = HadoopFsRelation(
           sqlContext = hive,
           location = fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = mergedSchema,
-          bucketSpec = None, // We don't support hive bucketed tables, only 
ones we write out.
-          fileFormat = new DefaultSource(),
-          options = parquetOptions)
+          dataSchema = inferredSchema,
+          bucketSpec = bucketSpec,
+          fileFormat = defaultSource,
+          options = options)
 
         val created = LogicalRelation(relation)
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
 
-      parquetRelation
+      hadoopFsRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
-      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
-      val parquetRelation = cached.getOrElse {
+      val cached = getCached(tableIdentifier,
+        metastoreRelation,
+        metastoreSchema,
+        fileFormatClass,
+        bucketSpec,
+        None)
+      val logicalRelation = cached.getOrElse {
         val created =
           LogicalRelation(
             DataSource(
               sqlContext = hive,
               paths = paths,
               userSpecifiedSchema = Some(metastoreRelation.schema),
-              options = parquetOptions,
-              className = "parquet").resolveRelation())
+              bucketSpec = bucketSpec,
+              options = options,
+              className = fileType).resolveRelation())
 
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
 
-      parquetRelation
+      logicalRelation
     }
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
@@ -572,6 +598,27 @@ private[hive] class HiveMetastoreCatalog(val client: 
HiveClient, hive: HiveConte
    * data source relations for better performance.
    */
   object ParquetConversions extends Rule[LogicalPlan] {
+    private def shouldConvertMetastoreParquet(relation: MetastoreRelation): 
Boolean = {
+      relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+        hive.convertMetastoreParquet
+    }
+
+    private def convertToParquetRelation(relation: MetastoreRelation): 
LogicalRelation = {
+      val defaultSource = new ParquetDefaultSource()
+      val fileFormatClass = classOf[ParquetDefaultSource]
+
+      val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+      val options = Map(
+        ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+        ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+          relation.tableName,
+          Some(relation.databaseName)
+        ).unquotedString
+      )
+
+      convertToLogicalRelation(relation, options, defaultSource, 
fileFormatClass, "parquet")
+    }
+
     override def apply(plan: LogicalPlan): LogicalPlan = {
       if (!plan.resolved || plan.analyzed) {
         return plan
@@ -581,22 +628,17 @@ private[hive] class HiveMetastoreCatalog(val client: 
HiveClient, hive: HiveConte
         // Write path
         case InsertIntoTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data 
source (yet).
-          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
-            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, 
ifNotExists)
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) 
=>
+          InsertIntoTable(convertToParquetRelation(r), partition, child, 
overwrite, ifNotExists)
 
         // Write path
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data 
source (yet).
-          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
-            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, 
ifNotExists)
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) 
=>
+          InsertIntoTable(convertToParquetRelation(r), partition, child, 
overwrite, ifNotExists)
 
         // Read path
-        case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") 
=>
+        case relation: MetastoreRelation if 
shouldConvertMetastoreParquet(relation) =>
           val parquetRelation = convertToParquetRelation(relation)
           SubqueryAlias(relation.alias.getOrElse(relation.tableName), 
parquetRelation)
       }
@@ -604,6 +646,50 @@ private[hive] class HiveMetastoreCatalog(val client: 
HiveClient, hive: HiveConte
   }
 
   /**
+   * When scanning Metastore ORC tables, convert them to ORC data source 
relations
+   * for better performance.
+   */
+  object OrcConversions extends Rule[LogicalPlan] {
+    private def shouldConvertMetastoreOrc(relation: MetastoreRelation): 
Boolean = {
+      relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+        hive.convertMetastoreOrc
+    }
+
+    private def convertToOrcRelation(relation: MetastoreRelation): 
LogicalRelation = {
+      val defaultSource = new OrcDefaultSource()
+      val fileFormatClass = classOf[OrcDefaultSource]
+      val options = Map[String, String]()
+
+      convertToLogicalRelation(relation, options, defaultSource, 
fileFormatClass, "orc")
+    }
+
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      if (!plan.resolved || plan.analyzed) {
+        return plan
+      }
+
+      plan transformUp {
+        // Write path
+        case InsertIntoTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data 
source (yet).
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, child, 
overwrite, ifNotExists)
+
+        // Write path
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data 
source (yet).
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, child, 
overwrite, ifNotExists)
+
+        // Read path
+        case relation: MetastoreRelation if 
shouldConvertMetastoreOrc(relation) =>
+          val orcRelation = convertToOrcRelation(relation)
+          SubqueryAlias(relation.alias.getOrElse(relation.tableName), 
orcRelation)
+      }
+    }
+  }
+
+  /**
    * Creates any tables required for query execution.
    * For example, because of a CREATE TABLE X AS statement.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/1e886159/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 1cd783e..dfbf22c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -74,6 +74,7 @@ class HiveSessionCatalog(
   private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
 
   val ParquetConversions: Rule[LogicalPlan] = 
metastoreCatalog.ParquetConversions
+  val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
   val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
   val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1e886159/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 11ef0fd..2bdb428 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -57,6 +57,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) 
extends SessionState(ctx)
     new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
+        catalog.OrcConversions ::
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         python.ExtractPythonUDFs ::

http://git-wip-us.apache.org/repos/asf/spark/blob/1e886159/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 92f424b..5ef8194 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,6 +26,8 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
@@ -400,4 +402,41 @@ class OrcQuerySuite extends QueryTest with 
BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("SPARK-14070 Use ORC data source for SQL queries on ORC tables") {
+    withTempPath { dir =>
+      withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
+        HiveContext.CONVERT_METASTORE_ORC.key -> "true") {
+        val path = dir.getCanonicalPath
+
+        withTable("dummy_orc") {
+          withTempTable("single") {
+            sqlContext.sql(
+              s"""CREATE TABLE dummy_orc(key INT, value STRING)
+                  |STORED AS ORC
+                  |LOCATION '$path'
+               """.stripMargin)
+
+            val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+            singleRowDF.registerTempTable("single")
+
+            sqlContext.sql(
+              s"""INSERT INTO TABLE dummy_orc
+                  |SELECT key, value FROM single
+               """.stripMargin)
+
+            val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key=0")
+            checkAnswer(df, singleRowDF)
+
+            val queryExecution = df.queryExecution
+            queryExecution.analyzed.collectFirst {
+              case _: LogicalRelation => ()
+            }.getOrElse {
+              fail(s"Expecting the query plan to have LogicalRelation, but 
got:\n$queryExecution")
+            }
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to