[ https://issues.apache.org/jira/browse/SPARK-27030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719158#comment-17719158 ]
Shrikant Prasad commented on SPARK-27030:
-----------------------------------------

[~lev] [~shivuson...@gmail.com] Did you find any resolution for this issue? I am encountering the same error.

> DataFrameWriter.insertInto fails when writing in parallel to a hive table
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27030
>                 URL: https://issues.apache.org/jira/browse/SPARK-27030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Lev Katzav
>            Priority: Major
>              Labels: bulk-closed
>
> When writing to a Hive table, the following temp directory is used:
> {code:java}
> /path/to/table/_temporary/0/{code}
> (the 0 at the end comes from the config
> {code:java}
> "mapreduce.job.application.attempt.id"{code}
> since that config is missing, it falls back to 0)
> When 2 processes write to the same table, the following race condition
> can occur:
> # p1 creates temp folder and uses it
> # p2 uses temp folder
> # p1 finishes and deletes temp folder
> # p2 fails since temp folder is missing
>
> It is possible to recreate this error locally with the following code:
> (the code runs locally, but I experienced the same error when running on a
> cluster with 2 jobs writing to the same table)
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions._
> // Scala 2.11 fork-join classes, matching the stack trace below
> import scala.collection.parallel.ForkJoinTaskSupport
> import scala.concurrent.forkjoin.ForkJoinPool
>
> val df = spark
>   .range(1000)
>   .toDF("a")
>   .withColumn("partition", lit(0))
>   .cache()
> //create db
> sqlContext.sql("CREATE DATABASE IF NOT EXISTS db").count()
> //create table
> df
>   .write
>   .partitionBy("partition")
>   .saveAsTable("db.table")
> val x = (1 to 100).par
> x.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
> //insert to different partitions in parallel
> x.foreach { p =>
>   val df2 = df
>     .withColumn("partition", lit(p))
>   df2
>     .write
>     .mode(SaveMode.Overwrite)
>     .insertInto("db.table")
> }
> {code}
>
> The error would be:
> {code:java}
> java.io.FileNotFoundException: File file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
> at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
> at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
> at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
> at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
> at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
> at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
> at company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
> at scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
> at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}
>
> A possible workaround that I found is to set the config
> "mapreduce.job.application.attempt.id"
> to a random integer in every job's SparkConf, so that each job writes
> to a different path. This won't work when the jobs share a single Spark
> context (since the config is the same for all of them).
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org