[
https://issues.apache.org/jira/browse/SPARK-21162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-21162:
---------------------------------
Labels: bulk-closed (was: )
> Cannot count rows in an empty Hive table stored as parquet when
> spark.sql.parquet.cacheMetadata is set to false
> ---------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-21162
> URL: https://issues.apache.org/jira/browse/SPARK-21162
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.2, 1.6.3
> Reporter: Tom Ogle
> Priority: Major
> Labels: bulk-closed
>
> With spark.sql.parquet.cacheMetadata set to false, creating an empty Hive
> table stored as Parquet and then trying to count the rows using SparkSQL
> throws an IOException. The issue does not affect Spark 2. This issue is
> inconvenient in Spark 1.6.x environments where
> spark.sql.parquet.cacheMetadata is explicitly set to false, as is the case
> in Google Cloud Dataproc 1.0.
> Here is the stacktrace:
> {code}
> 17/06/21 15:30:10 INFO ParquetRelation: Reading Parquet file(s) from
> Exception in thread "main"
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
> output=[count#30L])
> +- TungstenExchange SinglePartition, None
> +- TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
> +- Scan ParquetRelation: my_test_db.test_table[] InputPaths:
> <snip>/my_test_db.db/test_table
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
> at
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
> at
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
> at
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
> at
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
> at
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
> at
> org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
> at
> org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
> at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
> at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
> at App$.main(App.scala:23)
> at App.main(App.scala)
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
> TungstenExchange SinglePartition, None
> +- TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#33L])
> +- Scan ParquetRelation: my_test_db.test_table[] InputPaths:
> <snip>/my_test_db.db/test_table
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> ... 19 more
> Caused by: java.io.IOException: No input paths specified in job
> at
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:339)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1$$anon$4.listStatus(ParquetRelation.scala:358)
> at
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:294)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1.getPartitions(ParquetRelation.scala:363)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
> at
> org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> ... 27 more
> {code}
> Here is some Scala code to reproduce the issue locally:
> App.scala:
> {code}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkConf, SparkContext}
> object App {
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("Testing
> Issue").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val hiveContext = new HiveContext(sc)
> hiveContext.setConf("spark.sql.parquet.cacheMetadata", "false")
> val databaseName = "my_test_db"
> val tableName = "test_table"
> val fullTableName = databaseName + "." + tableName
> hiveContext.sql("DROP TABLE IF EXISTS " + fullTableName)
> hiveContext.sql("DROP DATABASE IF EXISTS " + databaseName)
> hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + databaseName)
> hiveContext.sql(
> s"""CREATE TABLE IF NOT EXISTS $fullTableName
> | (x string) stored as parquet
> """.stripMargin)
> hiveContext.table(fullTableName).count()
> sc.stop()
> }
> }
> {code}
> build.sbt:
> {code}
> name := "test-issue"
> version := "1.0"
> scalaVersion := "2.10.5"
> val sparkVersion = "1.6.3"
> libraryDependencies ++= Seq(
> "org.apache.spark" %% "spark-core" % sparkVersion,
> "org.apache.spark" %% "spark-sql" % sparkVersion,
> "org.apache.spark" %% "spark-hive" % sparkVersion
> )
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]