This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 06c70ba6c80c [MINOR][SQL] Move iterator.hasNext into try block in 
executeTask
06c70ba6c80c is described below

commit 06c70ba6c80c0b218dfb448bc76a3a812d7af047
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Oct 5 21:58:06 2024 -0700

    [MINOR][SQL] Move iterator.hasNext into try block in executeTask
    
    ### What changes were proposed in this pull request?
    
    This patch moves `iterator.hasNext` into the try block of 
`tryWithSafeFinallyAndFailureCallbacks` in `FileFormatWriter.executeTask`.
    
    ### Why are the changes needed?
    
    Not only `dataWriter.writeWithIterator(iterator)` causes error, 
`iterator.hasNext` could cause error like:
    
    ```
    org.apache.spark.shuffle.FetchFailedException: Block shuffle_1_106_21 is 
corrupted but checksum verification passed
    ```
    
    As it is not wrapped in the try block, `abort` won't be called on the 
committer. But as `setupTask` is called, it is safer to call `abort` in any 
case that error happens after it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48360 from viirya/try_block.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: huaxingao <[email protected]>
---
 .../execution/datasources/FileFormatWriter.scala   | 45 +++++++++++++---------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 91749ddd794f..5e6107c4f49c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -383,32 +383,41 @@ object FileFormatWriter extends Logging {
 
     committer.setupTask(taskAttemptContext)
 
-    val dataWriter =
-      if (sparkPartitionId != 0 && !iterator.hasNext) {
-        // In case of empty job, leave first partition to save meta for file 
format like parquet.
-        new EmptyDirectoryDataWriter(description, taskAttemptContext, 
committer)
-      } else if (description.partitionColumns.isEmpty && 
description.bucketSpec.isEmpty) {
-        new SingleDirectoryDataWriter(description, taskAttemptContext, 
committer)
-      } else {
-        concurrentOutputWriterSpec match {
-          case Some(spec) =>
-            new DynamicPartitionDataConcurrentWriter(
-              description, taskAttemptContext, committer, spec)
-          case _ =>
-            new DynamicPartitionDataSingleWriter(description, 
taskAttemptContext, committer)
-        }
-      }
+    var dataWriter: FileFormatDataWriter = null
 
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+      dataWriter =
+        if (sparkPartitionId != 0 && !iterator.hasNext) {
+          // In case of empty job, leave first partition to save meta for file 
format like parquet.
+          new EmptyDirectoryDataWriter(description, taskAttemptContext, 
committer)
+        } else if (description.partitionColumns.isEmpty && 
description.bucketSpec.isEmpty) {
+          new SingleDirectoryDataWriter(description, taskAttemptContext, 
committer)
+        } else {
+          concurrentOutputWriterSpec match {
+            case Some(spec) =>
+              new DynamicPartitionDataConcurrentWriter(
+                description, taskAttemptContext, committer, spec)
+            case _ =>
+              new DynamicPartitionDataSingleWriter(description, 
taskAttemptContext, committer)
+          }
+        }
+
       // Execute the task to write rows out and commit the task.
       dataWriter.writeWithIterator(iterator)
       dataWriter.commit()
     })(catchBlock = {
       // If there is an error, abort the task
-      dataWriter.abort()
-      logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
+      if (dataWriter != null) {
+        dataWriter.abort()
+      } else {
+        committer.abortTask(taskAttemptContext)
+      }
+      logError(log"Job: ${MDC(JOB_ID, jobId)}, Task: ${MDC(TASK_ID, taskId)}, 
" +
+        log"Task attempt ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} aborted.")
     }, finallyBlock = {
-      dataWriter.close()
+      if (dataWriter != null) {
+        dataWriter.close()
+      }
     })
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to