Repository: spark
Updated Branches:
  refs/heads/branch-1.2 38cb2c3a3 -> 47931975e


[SPARK-4552][SQL] Avoid exception when reading empty parquet data through Hive

This is a very small fix that catches one specific exception and returns an 
empty table.  #3441 will address this in a more principled way.

Author: Michael Armbrust <[email protected]>

Closes #3586 from marmbrus/fixEmptyParquet and squashes the following commits:

2781d9f [Michael Armbrust] Handle empty lists for newParquet
04dd376 [Michael Armbrust] Avoid exception when reading empty parquet data 
through Hive

(cherry picked from commit 513ef82e85661552e596d0b483b645ac24e86d4d)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47931975
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47931975
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47931975

Branch: refs/heads/branch-1.2
Commit: 47931975eaffaf6f4c2a9b65d56a2f25806a2e12
Parents: 38cb2c3
Author: Michael Armbrust <[email protected]>
Authored: Wed Dec 3 14:13:35 2014 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Wed Dec 3 14:13:46 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   |  5 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 96 +++++++++++---------
 .../spark/sql/parquet/parquetSuites.scala       |  6 ++
 3 files changed, 62 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47931975/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9b89c3b..14f8659 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -191,7 +191,10 @@ case class ParquetRelation2(path: String)(@transient val 
sqlContext: SQLContext)
     val selectedPartitions = partitions.filter(p => 
partitionFilters.forall(_(p)))
     val fs = FileSystem.get(new java.net.URI(path), 
sparkContext.hadoopConfiguration)
     val selectedFiles = selectedPartitions.flatMap(_.files).map(f => 
fs.makeQualified(f.getPath))
-    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, 
selectedFiles:_*)
+    // FileInputFormat cannot handle empty lists.
+    if (selectedFiles.nonEmpty) {
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, 
selectedFiles: _*)
+    }
 
     // Push down filters when possible
     predicates

http://git-wip-us.apache.org/repos/asf/spark/blob/47931975/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 56fc852..edf291f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, 
SparkPlan, PhysicalRDD}
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
@@ -104,53 +104,61 @@ private[hive] trait HiveStrategies {
           case a: AttributeReference => UnresolvedAttribute(a.name)
         })
 
-        if (relation.hiveQlTable.isPartitioned) {
-          val rawPredicate = 
pruningPredicates.reduceOption(And).getOrElse(Literal(true))
-          // Translate the predicate so that it automatically casts the input 
values to the correct
-          // data types during evaluation
-          val castedPredicate = rawPredicate transform {
-            case a: AttributeReference =>
-              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
-              val key = relation.partitionKeys(idx)
-              Cast(BoundReference(idx, StringType, nullable = true), 
key.dataType)
-          }
-
-          val inputData = new GenericMutableRow(relation.partitionKeys.size)
-          val pruningCondition =
-            if(codegenEnabled) {
-              GeneratePredicate(castedPredicate)
-            } else {
-              InterpretedPredicate(castedPredicate)
+        try {
+          if (relation.hiveQlTable.isPartitioned) {
+            val rawPredicate = 
pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+            // Translate the predicate so that it automatically casts the 
input values to the
+            // correct data types during evaluation.
+            val castedPredicate = rawPredicate transform {
+              case a: AttributeReference =>
+                val idx = relation.partitionKeys.indexWhere(a.exprId == 
_.exprId)
+                val key = relation.partitionKeys(idx)
+                Cast(BoundReference(idx, StringType, nullable = true), 
key.dataType)
             }
 
-          val partitions = relation.hiveQlPartitions.filter { part =>
-            val partitionValues = part.getValues
-            var i = 0
-            while (i < partitionValues.size()) {
-              inputData(i) = partitionValues(i)
-              i += 1
+            val inputData = new GenericMutableRow(relation.partitionKeys.size)
+            val pruningCondition =
+              if (codegenEnabled) {
+                GeneratePredicate(castedPredicate)
+              } else {
+                InterpretedPredicate(castedPredicate)
+              }
+
+            val partitions = relation.hiveQlPartitions.filter { part =>
+              val partitionValues = part.getValues
+              var i = 0
+              while (i < partitionValues.size()) {
+                inputData(i) = partitionValues(i)
+                i += 1
+              }
+              pruningCondition(inputData)
             }
-            pruningCondition(inputData)
-          }
 
-          hiveContext
-            .parquetFile(partitions.map(_.getLocation).mkString(","))
-            .addPartitioningAttributes(relation.partitionKeys)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)):: Nil
-        } else {
-          hiveContext
-            .parquetFile(relation.hiveQlTable.getDataLocation.toString)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+            hiveContext
+              .parquetFile(partitions.map(_.getLocation).mkString(","))
+              .addPartitioningAttributes(relation.partitionKeys)
+              .lowerCase
+              .where(unresolvedOtherPredicates)
+              .select(unresolvedProjection: _*)
+              .queryExecution
+              .executedPlan
+              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+          } else {
+            hiveContext
+              .parquetFile(relation.hiveQlTable.getDataLocation.toString)
+              .lowerCase
+              .where(unresolvedOtherPredicates)
+              .select(unresolvedProjection: _*)
+              .queryExecution
+              .executedPlan
+              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+          }
+        } catch {
+          // parquetFile will throw an exception when there is no data.
+          // TODO: Remove this hack for Spark 1.3.
+          case iae: java.lang.IllegalArgumentException
+              if iae.getMessage.contains("Can not create a Path from an empty 
string") =>
+            PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
         }
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/47931975/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 7159ebd..488ebba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -218,6 +218,12 @@ abstract class ParquetTest extends QueryTest with 
BeforeAndAfterAll {
         10)
     }
 
+    test(s"non-existant partition $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+        0)
+    }
+
     test(s"multi-partition pruned count $table") {
       checkAnswer(
         sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to