This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 5001349  [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
5001349 is described below

commit 50013491918a83570843a6f9f53ceeed8e447ada
Author: Steve Loughran <[email protected]>
AuthorDate: Mon Oct 26 12:31:05 2020 -0700

    [SPARK-33230][SQL] Hadoop committers to get unique job ID in "spark.sql.sources.writeJobUUID"
    
    ### What changes were proposed in this pull request?
    
    This reinstates the old option `spark.sql.sources.writeJobUUID` to set a
    unique job ID in the jobconf, so that Hadoop MR committers have a unique ID
    which is (a) consistent across tasks and workers and (b) not brittle in the
    way generated-timestamp job IDs are. The latter match the format that JobID
    requires, but as they are generated per-thread, they may not always be
    unique within a cluster.
    
    ### Why are the changes needed?
    
    If a committer (e.g. the S3A staging committer) uses the job attempt ID as
    a unique ID, then any two jobs started within the same second get the same
    ID and so can clash.
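
    A rough sketch of why that happens (this illustrates the second-granularity
    timestamp scheme, not the exact Spark code path; `JobIdClashDemo` is made
    up):

    ```scala
    object JobIdClashDemo {
      import java.text.SimpleDateFormat
      import java.util.{Date, Locale}
      import org.apache.hadoop.mapreduce.JobID

      // Timestamp-derived job IDs have second granularity, so two jobs created
      // within the same second end up with identical IDs.
      def timestampJobId(time: Date, id: Int): JobID =
        new JobID(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time), id)

      def main(args: Array[String]): Unit = {
        val now = new Date()
        assert(timestampJobId(now, 0) == timestampJobId(now, 0)) // same second => same ID
      }
    }
    ```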
    
    ### Does this PR introduce _any_ user-facing change?
    
    Good question. It is "developer-facing" in the context of anyone writing a
    committer, but it reinstates a property which was in Spark 1.x and "went
    away".
    
    ### How was this patch tested?
    
    Testing: no test here. You'd have to create a new committer which extracted
    the value in both the job and the task(s) and verified consistency. That is
    possible (with task output whose records contained the UUID), but it would
    be pretty convoluted and carry a high maintenance cost.
    
    Because it's trying to address a race condition, it's hard to reproduce the
    problem downstream and so verify the fix in a test run. Instead, I'll look
    at the logs to see what temporary directory is being used in the cluster
    filesystem and verify that it's a UUID.
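
    (A minimal sketch of that manual check, assuming the directory name has
    been copied out of the committer logs by hand; the object and helper names
    are made up:)

    ```scala
    object TempDirUuidCheck {
      import java.util.UUID

      // Hypothetical helper: does the staging directory name seen in the logs
      // parse as a UUID?
      def looksLikeUuid(dirName: String): Boolean =
        try { UUID.fromString(dirName); true }
        catch { case _: IllegalArgumentException => false }

      def main(args: Array[String]): Unit = {
        assert(looksLikeUuid(UUID.randomUUID().toString))
        assert(!looksLikeUuid("20201026123105")) // a timestamp-style ID is not a UUID
      }
    }
    ```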
    
    Closes #30141 from steveloughran/SPARK-33230-jobId.
    
    Authored-by: Steve Loughran <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 02fa19f102122f06e4358cf86c5e903fda28b289)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/execution/datasources/FileFormatWriter.scala   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 538b294..b0b1d69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -103,7 +103,7 @@ object FileFormatWriter extends Logging {
       fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema)
 
     val description = new WriteJobDescription(
-      uuid = UUID.randomUUID().toString,
+      uuid = UUID.randomUUID.toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
       allColumns = outputSpec.outputColumns,
@@ -134,6 +134,10 @@ object FileFormatWriter extends Logging {
 
     SQLExecution.checkSQLExecutionId(sparkSession)
 
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
     // This call shouldn't be put into the `try` block below because it only initializes and
     // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
     committer.setupJob(job)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
