This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new 41df43f [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs. 41df43f is described below commit 41df43fa263090b65620a986f4130aef0b32bc3c Author: Ryan Blue <b...@apache.org> AuthorDate: Tue Feb 19 10:06:10 2019 -0800 [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs. ## What changes were proposed in this pull request? Backport SPARK-26873 (#23777) to branch-2.3. ## How was this patch tested? Existing tests to cover regressions. Closes #23832 from rdblue/SPARK-26873-branch-2.3. Authored-by: Ryan Blue <b...@apache.org> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../apache/spark/sql/execution/datasources/FileFormatWriter.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 2f701ed..5be821d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -190,12 +190,14 @@ object FileFormatWriter extends Logging { global = false, child = plan).execute() } + val jobIdInstant = new Date().getTime val ret = new Array[WriteTaskResult](rdd.partitions.length) sparkSession.sparkContext.runJob( rdd, (taskContext: TaskContext, iter: Iterator[InternalRow]) => { executeTask( description = description, + jobIdInstant = jobIdInstant, sparkStageId = taskContext.stageId(), sparkPartitionId = taskContext.partitionId(), sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, @@ -228,13 +230,14 @@ object FileFormatWriter extends Logging { /** Writes data out in a single Spark task. */ private def executeTask( description: WriteJobDescription, + jobIdInstant: Long, sparkStageId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, committer: FileCommitProtocol, iterator: Iterator[InternalRow]): WriteTaskResult = { - val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId) + val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org