hi,
i am struggling to understand whether concurrent writes to the same basedir
but different partitions are safe with file sources such as parquet.
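
to be concrete, each job does a write roughly like this (a minimal sketch;
the column names, path, and session setup are made up, but the config and
write pattern are what i mean):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
import spark.implicits._

// each concurrent job runs this with a different partition value p, so the
// jobs share the basedir "out" but never touch the same partition
val p = 1
Seq((1, "a"), (2, "b")).toDF("partition", "value")
  .filter($"partition" === p)
  .write
  .mode("overwrite")
  .partitionBy("partition")
  .parquet("out")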

i tested this in spark 2.4 and spark 3.0.0-SNAPSHOT with real concurrent
jobs on hdfs and it seemed to work fine. admittedly this was a rather
limited test.
while the jobs were running i saw this on hdfs:

drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:17 out/_temporary/0

it seems each job has its own temporary directory that it writes to, named
".spark-staging-" + jobId.
this is consistent with the stagingDir i found in
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, and i can see it
being used specifically for dynamic partition overwrite mode.
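
the relevant bit, as i read it, looks roughly like this (paraphrased from my
reading of the spark source, not an exact quote):

// paraphrased from org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
private def stagingDir = new Path(path, ".spark-staging-" + jobId)

// and with dynamicPartitionOverwrite enabled, newTaskTempFile places task
// output under stagingDir/<partition path> instead of the default committer
// location, which matches the .spark-staging-<jobId>/partition=N dirs above
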
so i thought with this i am all set.
i don't really know what this _temporary/0 is for, but it did not seem to
get in the way.

but then i added some unit tests that also do concurrent writes to
different partitions with dynamic partition overwrite mode (these tests are
much more rigorous than my ad-hoc tests on hdfs; i sketch their shape below,
after the stack trace), and now i see errors like this:

java.io.FileNotFoundException: File file:/some/base/dir/_temporary/0 does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:376)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:570)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:172)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:123)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:168)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:192)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:164)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:105)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:105)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:718)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:718)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:330)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:314)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at com.tresata.spark.sql.source.ParquetSource.writeBatchImpl
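
for reference, the failing tests do roughly this (a simplified sketch; df
and the spark session are set up as in the first snippet, but with master
local[*] so the writes go through the local filesystem):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// two concurrent writes to the same basedir, each writing a different
// partition, from a single jvm against the local filesystem
val writes = Seq(1, 2).map { p =>
  Future {
    df.filter($"partition" === p)
      .write
      .mode("overwrite")
      .partitionBy("partition")
      .parquet("/some/base/dir")
  }
}
Await.result(Future.sequence(writes), 10.minutes)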

so this seems to hint that the issue is with that _temporary/0. why is the
commit trying to do a listStatus on this _temporary/0 at all?
did i always have this issue and was i just lucky enough not to run into it
on hdfs, or is it specific to RawLocalFileSystem?
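
from tracing the stack, the listStatus seems to come out of hadoop's
FileOutputCommitter.commitJob. paraphrased into scala from my reading of the
hadoop source (so possibly wrong in the details), it does roughly:

import org.apache.hadoop.fs.{FileSystem, Path}

// sketch of FileOutputCommitter.commitJob as i understand it; not a quote
def commitJobSketch(fs: FileSystem, outputPath: Path, appAttemptId: Int): Unit = {
  // getAllCommittedTaskPaths lists <outputPath>/_temporary/<appAttemptId>,
  // i.e. the _temporary/0 i see on hdfs
  val jobAttemptPath = new Path(outputPath, s"_temporary/$appAttemptId")
  val committedTaskPaths = fs.listStatus(jobAttemptPath) // <- the failing call
  // ... merge each committed task dir into outputPath ...
  // cleanup afterwards deletes <outputPath>/_temporary entirely; so if two
  // jobs share the same outputPath, the first one to finish could delete the
  // directory the second one still wants to list, which would explain what
  // i am seeing
  fs.delete(new Path(outputPath, "_temporary"), true)
}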

thanks!
koert
