This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 2bd20a96790 [SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
2bd20a96790 is described below
commit 2bd20a9679003743c82753f1152ea3e7da2aa96a
Author: Yikf <[email protected]>
AuthorDate: Mon Mar 6 14:06:12 2023 -0800
[SPARK-42478][SQL][3.3] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
This is a backport of https://github.com/apache/spark/pull/40064 for
branch-3.3
### What changes were proposed in this pull request?
Store a serializable jobTrackerId in FileWriterFactory instead of a non-serializable JobID, and rebuild the JobID lazily from that tracker ID.
### Why are the changes needed?
[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) made the MR job IDs consistent between FileBatchWriter and FileFormatWriter, but it introduced a serialization issue: JobID is not serializable, so FileWriterFactory can no longer be shipped to executors.
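For illustration, here is a minimal, hypothetical sketch of the failure mode (`BrokenFactory` and the sample tracker-ID string are made up, not Spark's code): Hadoop's `JobID` implements `Writable` but not `java.io.Serializable`, so any serializable class holding one as a plain field fails when Java serialization ships it to executors:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import org.apache.hadoop.mapreduce.JobID

// Hypothetical stand-in for a factory that keeps a JobID as a plain field.
case class BrokenFactory(jobId: JobID)

val broken = BrokenFactory(new JobID("20230306140612", 0)) // example tracker-ID string
try {
  // Java serialization recursively serializes fields; JobID is not
  // java.io.Serializable, so this throws.
  new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(broken)
} catch {
  case e: NotSerializableException => println(s"fails as expected: $e")
}
```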
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA (GitHub Actions CI)
Closes #40290 from Yikf/backport-SPARK-42478-3.3.
Authored-by: Yikf <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/datasources/v2/FileWriterFactory.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index ea13e2deac8..4b1a099d3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -30,7 +30,12 @@ case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
-  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: jobId across tasks should be consistent to meet the contract
+  // expected by Hadoop committers, but `JobId` cannot be serialized.
+  // thus, persist the serializable jobTrackerID in the class and make jobId a
+  // transient lazy val which recreates it each time to ensure jobId is unique.
+  private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)

   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
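A standalone sketch of the pattern the patch applies (the names and the sample tracker-ID string below are illustrative assumptions, not Spark's API): only the serializable tracker-ID string travels with the object, and the `JobID` is rebuilt lazily in each deserialized copy. Every copy derives the same ID because it is a pure function of `(jobTrackerID, 0)`:

```scala
import java.io._
import org.apache.hadoop.mapreduce.JobID

// Hypothetical stand-in for FileWriterFactory after the fix.
case class FixedFactory(jobTrackerID: String) {
  // The String field is serializable; the JobID is marked @transient and is
  // recomputed on first access in each deserialized copy, from the same inputs.
  @transient private lazy val jobId: JobID = new JobID(jobTrackerID, 0)
  def currentJobId: JobID = jobId
}

// Java-serialization round trip, standing in for Spark shipping the factory
// from the driver to an executor.
def roundTrip[T](v: T): T = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(v)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray)).readObject().asInstanceOf[T]
}

val driverSide   = FixedFactory("20230306140612") // createJobTrackerID-style timestamp
val executorSide = roundTrip(driverSide)          // succeeds: no JobID field to serialize
// Both copies derive an equal JobID, preserving the consistency Hadoop
// committers expect across tasks.
assert(driverSide.currentJobId == executorSide.currentJobId)
```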
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]