This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 5001349 [SPARK-33230][SQL] Hadoop committers to get unique job ID in
"spark.sql.sources.writeJobUUID"
5001349 is described below
commit 50013491918a83570843a6f9f53ceeed8e447ada
Author: Steve Loughran <[email protected]>
AuthorDate: Mon Oct 26 12:31:05 2020 -0700
[SPARK-33230][SQL] Hadoop committers to get unique job ID in
"spark.sql.sources.writeJobUUID"
### What changes were proposed in this pull request?
This reinstates the old option `spark.sql.sources.write.jobUUID` to set a
unique jobId in the jobconf so that hadoop MR committers have a unique ID which
is (a) consistent across tasks and workers and (b) not brittle compared to
generated-timestamp job IDs. The latter matches that of what JobID requires,
but as they are generated per-thread, may not always be unique within a cluster.
### Why are the changes needed?
If a committer (e.g s3a staging committer) uses job-attempt-ID as a unique
ID then any two jobs started within the same second have the same ID, so can
clash.
### Does this PR introduce _any_ user-facing change?
Good Q. It is "developer-facing" in the context of anyone writing a
committer. But it reinstates a property which was in Spark 1.x and "went away"
### How was this patch tested?
Testing: no test here. You'd have to create a new committer which extracted
the value in both job and task(s) and verified consistency. That is possible
(with a task output whose records contained the UUID), but it would be pretty
convoluted and a high maintenance cost.
Because it's trying to address a race condition, it's hard to regenerate
the problem downstream and so verify a fix in a test run...I'll just look at
the logs to see what temporary dir is being used in the cluster FS and verify
it's a UUID
Closes #30141 from steveloughran/SPARK-33230-jobId.
Authored-by: Steve Loughran <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 02fa19f102122f06e4358cf86c5e903fda28b289)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/execution/datasources/FileFormatWriter.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 538b294..b0b1d69 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -103,7 +103,7 @@ object FileFormatWriter extends Logging {
fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions,
dataSchema)
val description = new WriteJobDescription(
- uuid = UUID.randomUUID().toString,
+ uuid = UUID.randomUUID.toString,
serializableHadoopConf = new
SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = outputSpec.outputColumns,
@@ -134,6 +134,10 @@ object FileFormatWriter extends Logging {
SQLExecution.checkSQLExecutionId(sparkSession)
+ // propagate the decription UUID into the jobs, so that committers
+ // get an ID guaranteed to be unique.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID",
description.uuid)
+
// This call shouldn't be put into the `try` block below because it only
initializes and
// prepares the job, any exception thrown from here shouldn't cause
abortJob() to be called.
committer.setupJob(job)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]