This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 06c70ba6c80c [MINOR][SQL] Move iterator.hasNext into try block in
executeTask
06c70ba6c80c is described below
commit 06c70ba6c80c0b218dfb448bc76a3a812d7af047
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Oct 5 21:58:06 2024 -0700
[MINOR][SQL] Move iterator.hasNext into try block in executeTask
### What changes were proposed in this pull request?
This patch moves `iterator.hasNext` into the try block of
`tryWithSafeFinallyAndFailureCallbacks` in `FileFormatWriter.executeTask`.
### Why are the changes needed?
Not only can `dataWriter.writeWithIterator(iterator)` cause an error, but
`iterator.hasNext` could also cause an error like:
```
org.apache.spark.shuffle.FetchFailedException: Block shuffle_1_106_21 is
corrupted but checksum verification passed
```
As `iterator.hasNext` is not wrapped in the try block, `abort` won't be called
on the committer when it fails. But since `setupTask` has already been called,
it is safer to call `abort` whenever an error happens after it.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48360 from viirya/try_block.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: huaxingao <[email protected]>
---
.../execution/datasources/FileFormatWriter.scala | 45 +++++++++++++---------
1 file changed, 27 insertions(+), 18 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 91749ddd794f..5e6107c4f49c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -383,32 +383,41 @@ object FileFormatWriter extends Logging {
committer.setupTask(taskAttemptContext)
- val dataWriter =
- if (sparkPartitionId != 0 && !iterator.hasNext) {
- // In case of empty job, leave first partition to save meta for file
format like parquet.
- new EmptyDirectoryDataWriter(description, taskAttemptContext,
committer)
- } else if (description.partitionColumns.isEmpty &&
description.bucketSpec.isEmpty) {
- new SingleDirectoryDataWriter(description, taskAttemptContext,
committer)
- } else {
- concurrentOutputWriterSpec match {
- case Some(spec) =>
- new DynamicPartitionDataConcurrentWriter(
- description, taskAttemptContext, committer, spec)
- case _ =>
- new DynamicPartitionDataSingleWriter(description,
taskAttemptContext, committer)
- }
- }
+ var dataWriter: FileFormatDataWriter = null
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+ dataWriter =
+ if (sparkPartitionId != 0 && !iterator.hasNext) {
+ // In case of empty job, leave first partition to save meta for file
format like parquet.
+ new EmptyDirectoryDataWriter(description, taskAttemptContext,
committer)
+ } else if (description.partitionColumns.isEmpty &&
description.bucketSpec.isEmpty) {
+ new SingleDirectoryDataWriter(description, taskAttemptContext,
committer)
+ } else {
+ concurrentOutputWriterSpec match {
+ case Some(spec) =>
+ new DynamicPartitionDataConcurrentWriter(
+ description, taskAttemptContext, committer, spec)
+ case _ =>
+ new DynamicPartitionDataSingleWriter(description,
taskAttemptContext, committer)
+ }
+ }
+
// Execute the task to write rows out and commit the task.
dataWriter.writeWithIterator(iterator)
dataWriter.commit()
})(catchBlock = {
// If there is an error, abort the task
- dataWriter.abort()
- logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
+ if (dataWriter != null) {
+ dataWriter.abort()
+ } else {
+ committer.abortTask(taskAttemptContext)
+ }
+ logError(log"Job: ${MDC(JOB_ID, jobId)}, Task: ${MDC(TASK_ID, taskId)},
" +
+ log"Task attempt ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} aborted.")
}, finallyBlock = {
- dataWriter.close()
+ if (dataWriter != null) {
+ dataWriter.close()
+ }
})
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]