This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 41df43f  [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop 
Job IDs.
41df43f is described below

commit 41df43fa263090b65620a986f4130aef0b32bc3c
Author: Ryan Blue <b...@apache.org>
AuthorDate: Tue Feb 19 10:06:10 2019 -0800

    [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
    
    ## What changes were proposed in this pull request?
    
    Backport SPARK-26873 (#23777) to branch-2.3.
    
    ## How was this patch tested?
    
    Existing tests to cover regressions.
    
    Closes #23832 from rdblue/SPARK-26873-branch-2.3.
    
    Authored-by: Ryan Blue <b...@apache.org>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../apache/spark/sql/execution/datasources/FileFormatWriter.scala    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 2f701ed..5be821d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -190,12 +190,14 @@ object FileFormatWriter extends Logging {
           global = false,
           child = plan).execute()
       }
+      val jobIdInstant = new Date().getTime
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(
         rdd,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
+            jobIdInstant = jobIdInstant,
             sparkStageId = taskContext.stageId(),
             sparkPartitionId = taskContext.partitionId(),
             sparkAttemptNumber = taskContext.taskAttemptId().toInt & 
Integer.MAX_VALUE,
@@ -228,13 +230,14 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
+      jobIdInstant: Long,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), 
sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to