This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bea3f8b58c Core: Broaden exception handling in writer clean up logic (#12863)
bea3f8b58c is described below

commit bea3f8b58cebe1458d8edb2172287cae0a04cb38
Author: Xiaoxuan <[email protected]>
AuthorDate: Thu May 1 06:59:46 2025 -0700

    Core: Broaden exception handling in writer clean up logic (#12863)
---
 .../java/org/apache/iceberg/io/BaseTaskWriter.java | 22 +++++++++++++++-------
 .../org/apache/iceberg/io/RollingFileWriter.java   | 21 +++++++++++++++------
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 0834c7156a..0edf3662a1 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.io;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DataFile;
@@ -44,8 +43,12 @@ import org.apache.iceberg.util.StructLikeUtil;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class);
+
   private final List<DataFile> completedDataFiles = Lists.newArrayList();
   private final List<DeleteFile> completedDeleteFiles = Lists.newArrayList();
   private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
@@ -345,12 +348,17 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
           currentWriter.close();
 
           if (currentRows == 0L) {
-            try {
-              io.deleteFile(currentFile.encryptingOutputFile());
-            } catch (UncheckedIOException e) {
-              // the file may not have been created, and it isn't worth failing the job to clean up,
-              // skip deleting
-            }
+            // the file may not have been created or cannot be deleted, and it isn't worth failing
+            // the job to clean up, skip deleting
+            Tasks.foreach(currentFile.encryptingOutputFile())
+                .suppressFailureWhenFinished()
+                .onFailure(
+                    (file, exc) ->
+                        LOG.warn(
+                            "Failed to delete the uncommitted empty file during writer clean up: {}",
+                            file,
+                            exc))
+                .run(io::deleteFile);
           } else {
             complete(currentWriter);
           }
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
index 7232aded44..84b36e0a3a 100644
--- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -24,12 +24,16 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A rolling writer capable of splitting incoming data or deletes into multiple files within one
  * spec/partition based on the target file size.
  */
 abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
   private static final int ROWS_DIVISOR = 1000;
 
   private final OutputFileFactory fileFactory;
@@ -125,12 +129,17 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
       }
 
       if (currentFileRows == 0L) {
-        try {
-          io.deleteFile(currentFile.encryptingOutputFile());
-        } catch (UncheckedIOException e) {
-          // the file may not have been created, and it isn't worth failing the job to clean up,
-          // skip deleting
-        }
+        // the file may not have been created or cannot be deleted, and it isn't worth failing
+        // the job to clean up, skip deleting
+        Tasks.foreach(currentFile.encryptingOutputFile())
+            .suppressFailureWhenFinished()
+            .onFailure(
+                (file, exc) ->
+                    LOG.warn(
+                        "Failed to delete the uncommitted empty file during writer clean up: {}",
+                        file,
+                        exc))
+            .run(io::deleteFile);
       } else {
         addResult(currentWriter.result());
       }

Reply via email to