[ 
https://issues.apache.org/jira/browse/SPARK-21162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-21162:
---------------------------------
    Labels: bulk-closed  (was: )

> Cannot count rows in an empty Hive table stored as parquet when 
> spark.sql.parquet.cacheMetadata is set to false
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21162
>                 URL: https://issues.apache.org/jira/browse/SPARK-21162
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 1.6.3
>            Reporter: Tom Ogle
>            Priority: Major
>              Labels: bulk-closed
>
> With spark.sql.parquet.cacheMetadata set to false, creating an empty Hive 
> table stored as Parquet and then trying to count the rows using SparkSQL 
> throws an IOException. The issue does not affect Spark 2. This issue is 
> inconvenient in environments using Spark 1.6.x where 
> spark.sql.parquet.cacheMetadata is explicitly set to false for some reason, 
> such as in Google DataProc 1.0.
> Here is the stacktrace:
> {code}
> 17/06/21 15:30:10 INFO ParquetRelation: Reading Parquet file(s) from 
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#30L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
>       +- Scan ParquetRelation: my_test_db.test_table[] InputPaths: 
> <snip>/my_test_db.db/test_table
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>       at 
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
>       at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
>       at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
>       at 
> org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
>       at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
>       at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
>       at App$.main(App.scala:23)
>       at App.main(App.scala)
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
> execute, tree:
> TungstenExchange SinglePartition, None
> +- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
>    +- Scan ParquetRelation: my_test_db.test_table[] InputPaths: 
> <snip>/my_test_db.db/test_table
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>       at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>       ... 19 more
> Caused by: java.io.IOException: No input paths specified in job
>       at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
>       at 
> org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:339)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1$$anon$4.listStatus(ParquetRelation.scala:358)
>       at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
>       at 
> org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:294)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1.getPartitions(ParquetRelation.scala:363)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>       at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
>       at 
> org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
>       at 
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
>       at 
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>       ... 27 more
> {code}
> Here is some Scala code to reproduce the issue locally:
> App.scala:
> {code}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkConf, SparkContext}
> object App {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("Testing Issue").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     val hiveContext = new HiveContext(sc)
>     hiveContext.setConf("spark.sql.parquet.cacheMetadata", "false")
>     val databaseName = "my_test_db"
>     val tableName = "test_table"
>     val fullTableName = databaseName + "." + tableName
>     hiveContext.sql("DROP TABLE IF EXISTS " + fullTableName)
>     hiveContext.sql("DROP DATABASE IF EXISTS " + databaseName)
>     hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + databaseName)
>     hiveContext.sql(
>       s"""CREATE TABLE IF NOT EXISTS $fullTableName
>          | (x string) stored as parquet
>        """.stripMargin)
>     hiveContext.table(fullTableName).count()
>     sc.stop()
>   }
> }
> {code}
> build.sbt:
> {code}
> name := "test-issue"
> version := "1.0"
> scalaVersion := "2.10.5"
> val sparkVersion = "1.6.3"
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVersion,
>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>   "org.apache.spark" %% "spark-hive" % sparkVersion
> )
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to