[
https://issues.apache.org/jira/browse/SPARK-27030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782943#comment-16782943
]
Shivu Sondur commented on SPARK-27030:
--------------------------------------
I am checking this issue
> DataFrameWriter.insertInto fails when writing in parallel to a hive table
> -------------------------------------------------------------------------
>
> Key: SPARK-27030
> URL: https://issues.apache.org/jira/browse/SPARK-27030
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Lev Katzav
> Priority: Major
>
> When writing to a hive table, the following temp directory is used:
> {code:java}
> /path/to/table/_temporary/0/{code}
> (The trailing 0 is the value of the config
> {code:java}
> "mapreduce.job.application.attempt.id"{code}
> Since that config is not set, it falls back to 0.)
> When two processes write to the same table concurrently, the following
> race condition can occur:
> # p1 creates the temp folder and uses it
> # p2 uses the same temp folder
> # p1 finishes and deletes the temp folder
> # p2 fails because the temp folder it is still using is now missing
>
> It is possible to reproduce this error locally with the following code
> (the code runs locally, but I experienced the same error when running on a
> cluster with 2 jobs writing to the same table):
> {code:java}
> import org.apache.spark.sql.functions._
> val df = spark
> .range(1000)
> .toDF("a")
> .withColumn("partition", lit(0))
> .cache()
> //create db
> sqlContext.sql("CREATE DATABASE IF NOT EXISTS db").count()
> //create table
> df
> .write
> .partitionBy("partition")
> .saveAsTable("db.table")
> val x = (1 to 100).par
> x.tasksupport = new ForkJoinTaskSupport( new ForkJoinPool(10))
> //insert to different partitions in parallel
> x.foreach { p =>
> val df2 = df
> .withColumn("partition",lit(p))
> df2
> .write
> .mode(SaveMode.Overwrite)
> .insertInto("db.table")
> }
> {code}
>
> The resulting error is:
> {code:java}
> java.io.FileNotFoundException: File
> file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
> at
> org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
> at
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
> at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
> at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
> at
> company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
> at
> scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
> at
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> at
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> at
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> at
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> at
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> at
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> at
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}
>
> A possible workaround that I found is to set the config
> "mapreduce.job.application.attempt.id"
> to a random integer for every job in SparkConf, thus making each job write
> to a different path. However, that won't work when the jobs share a single
> Spark context (since the config is then the same for all of them).
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]