https://issues.apache.org/jira/browse/SPARK-28945
On Sun, Sep 1, 2019 at 2:53 PM Koert Kuipers <ko...@tresata.com> wrote:

> hi,
> i am struggling to understand if concurrent writes to the same basedir but
> different partitions are safe with file sources such as parquet.
>
> i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent
> jobs on hdfs and it seemed to work fine. admittedly this was a rather
> limited test.
> as the jobs are running i see on hdfs:
>
> drwxr-xr-x - koert koert 0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
> Found 1 items
> drwxr-xr-x - koert koert 0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
> Found 1 items
> drwxr-xr-x - koert koert 0 2019-08-29 18:17 out/_temporary/0
>
> it seems each job has its own temporary directory it writes to, set to
> ".spark-staging-" + jobId.
> this is consistent with the stagingDir i found in
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can find
> it being used specifically for dynamic partition overwrite mode.
> so i thought with this i am all set.
> i don't really know what this _temporary/0 is for, but it did not seem to
> get in the way.
>
> but then i added some unit tests that also do concurrent writes to
> different partitions with dynamic partition overwrite mode (these tests are
> much more rigorous than my ad-hoc tests on hdfs), and now i see errors like
> this:
>
> java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does not exist
>   at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376)
>   at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
>   at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
>   at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570)
>   at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
>   at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
>   at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
>   at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
>   at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
>   at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
>   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>   at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
>   at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
>   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
>   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
>   at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl
>
> so this seems to hint that the issue is with that _temporary/0. why is it
> trying to do a listStatus on this _temporary/0?
> did i always have this issue and was just lucky enough to not run into it
> on hdfs, or is it specific to RawLocalFileSystem?
>
> thanks!
> koert
>
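For context, a minimal sketch of the kind of concurrent write described above: two writes to the same base directory but different partition values, with dynamic partition overwrite enabled. The base path, column names, local master, and Future-based threading here are illustrative assumptions, not the original test code.

  import org.apache.spark.sql.{SaveMode, SparkSession}
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  object ConcurrentPartitionWriteSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[*]")
        // overwrite only the partitions present in the data, not the whole basedir
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .getOrCreate()
      import spark.implicits._

      // hypothetical shared base directory
      val baseDir = "/tmp/out"

      // each write targets a different value of the "partition" column
      def writePartition(p: Int): Unit =
        Seq((1, p), (2, p))
          .toDF("id", "partition")
          .write
          .mode(SaveMode.Overwrite)
          .partitionBy("partition")
          .parquet(baseDir)

      // run the two writes concurrently, as the unit tests above do
      val futures = Seq(1, 2).map(p => Future(writePartition(p)))
      futures.foreach(f => Await.result(f, 10.minutes))

      spark.stop()
    }
  }

With dynamic partition overwrite each job stages its output under its own ".spark-staging-" + jobId directory, so in principle the two writes should not collide; the FileNotFoundException above comes from the shared _temporary/0 directory that the Hadoop FileOutputCommitter still lists at commit time.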