On Sun, Sep 1, 2019 at 7:54 PM Koert Kuipers <ko...@tresata.com> wrote:
> hi, > i am struggling to understand if concurrent writes to same basedir but > different partitions are save with file sources such as parquet. > > i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent > jobs on hdfs and it seemed to work fine. admittedly this was a rather > limited test. > as the jobs are running i see on hdfs: > > drwxr-xr-x - koert koert 0 2019-08-29 18:18 > out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2 > Found 1 items > drwxr-xr-x - koert koert 0 2019-08-29 18:18 > out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1 > Found 1 items > drwxr-xr-x - koert koert 0 2019-08-29 18:17 out/_temporary/0 > > it seems each job has its own temporary directory it writes to, set to > ".spark-staging-" + jobId > this is consistent with the stagingDir i found in > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can find > it being used specifically for dynamic partition overwrite mode. > so it thought with this i am all set. > i dont really know what this _temporary/0 is for but it did not seem to > get in the way. > It's a core part of the hadoop MR commit protocols. I think the best (only!) docs of these other than the most confusing piece of co-recursive code I've ever had to step through taking notes of is : https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_draft_005 every MR app attempt has its own attempt ID; when the hadoop MR engine attempt N is restarted it looks for the temp dir of N-1 and can use this to recover from failure. Spark's solution to the app restart problem is "be faster and fix failures by restarting entirely", so the app attempt is always 0 If you have two app attempts writing to same destination path, their output is inevitably going to conflict and as the first job commit will delete the attempt dir then the second will fail. As your stack trace shows. 1. You need to (somehow) get a different attempt ID for each job to avoid that clash. No Idea how to do that; it may need code changes 2 and set "mapreduce.fileoutputcommitter.cleanup.skipped" to false to avoid a full cleanup of _temporary on job commit > > but then i added some unit tests that also do concurrent writes to > different partitions with dynamic partition overwrite mode (these test are > much more rigorous than my ad-hoc tests on hdfs), and now i see errors like > this: > > java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does > not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525) > at > org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485) > at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525) > at > org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269) > at > org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309) > at > org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105) > at > org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) > at > org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718) > at > org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235) > at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl > > so this seems to hint that the issue is with that _temporary/0. why is it > trying to do a listStatus on this _temporary/0? > did i always have this issue and was just lucky enough to not run into it > on hdfs, or is it specific to RawLocalFileSystem? > > thanks! > koert >