https://issues.apache.org/jira/browse/SPARK-28945

On Sun, Sep 1, 2019 at 2:53 PM Koert Kuipers <ko...@tresata.com> wrote:

> hi,
> i am struggling to understand if concurrent writes to same basedir but
> different partitions are safe with file sources such as parquet.
>
> i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent
> jobs on hdfs and it seemed to work fine. admittedly this was a rather
> limited test.
> as the jobs are running i see on hdfs:
>
> drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
> Found 1 items
> drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
> Found 1 items
> Found 1 items
> drwxr-xr-x   - koert koert          0 2019-08-29 18:17 out/_temporary/0
>
> it seems each job has its own temporary directory it writes to, set to
> ".spark-staging-" + jobId
> this is consistent with the stagingDir i found in
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can find
> it being used specifically for dynamic partition overwrite mode.
> so i thought with this i am all set.
> i don't really know what this _temporary/0 is for but it did not seem to
> get in the way.
>
> but then i added some unit tests that also do concurrent writes to
> different partitions with dynamic partition overwrite mode (these tests are
> much more rigorous than my ad-hoc tests on hdfs), and now i see errors like
> this:
>
> java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does not exist
> at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
> at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
> at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
> at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
> at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl
>
> so this seems to hint that the issue is with that _temporary/0. why is it
> trying to do a listStatus on this _temporary/0?
> did i always have this issue and was just lucky enough to not run into it
> on hdfs, or is it specific to RawLocalFileSystem?
>
> thanks!
> koert
>
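One possible reading of the trace above, sketched as a toy model: the frames show commitJob listing basedir/_temporary/0 via getAllCommittedTaskPaths, and the hdfs listing shows a single _temporary/0 under the shared output directory. Assuming that every job writing to the same basedir resolves the same _temporary/0 path, and that a finishing job removes the whole _temporary tree during cleanup, the second job to commit would find the directory gone. The class and method names below (SharedTemporaryDirSketch, Job, secondCommitFails) are hypothetical, and the code uses only java.nio, not Hadoop or Spark classes, so it illustrates the suspected race rather than the actual committer implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Toy model only: plain java.nio, no Hadoop or Spark classes involved.
final class SharedTemporaryDirSketch {

    // Hypothetical stand-in for the job-level committer of one write job.
    static final class Job {
        private final Path attemptDir;

        Job(Path baseDir) {
            // Assumption: every job uses attempt id 0, so all jobs writing
            // to the same base directory resolve the same path.
            this.attemptDir = baseDir.resolve("_temporary").resolve("0");
        }

        void setup() throws IOException {
            Files.createDirectories(attemptDir);
        }

        void commit() throws IOException {
            // Step 1: list the job attempt directory (fails if it is gone).
            try (Stream<Path> entries = Files.list(attemptDir)) {
                entries.forEach(p -> { });
            }
            // Step 2 (assumed cleanup): delete the whole _temporary tree.
            deleteRecursively(attemptDir.getParent());
        }
    }

    // Deletes children before parents by walking in reverse path order.
    static void deleteRecursively(Path root) throws IOException {
        try (Stream<Path> walk = Files.walk(root)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    // Runs two jobs against one base directory; returns true when the
    // second commit fails because the first one removed _temporary.
    static boolean secondCommitFails() {
        try {
            Path base = Files.createTempDirectory("basedir");
            try {
                Job first = new Job(base);
                Job second = new Job(base);
                first.setup();
                second.setup();
                first.commit();
                try {
                    second.commit();
                    return false;
                } catch (IOException e) {
                    return true;
                }
            } finally {
                deleteRecursively(base);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second commit failed: " + secondCommitFails());
    }
}
```

The sketch runs the two commits back to back to make the ordering deterministic. With truly concurrent jobs the outcome would depend on timing, which may be why the ad-hoc hdfs runs did not hit the error while the more rigorous unit tests did.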
