This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bea3f8b58c Core: Broaden exception handling in writer clean up logic (#12863)
bea3f8b58c is described below
commit bea3f8b58cebe1458d8edb2172287cae0a04cb38
Author: Xiaoxuan <[email protected]>
AuthorDate: Thu May 1 06:59:46 2025 -0700
Core: Broaden exception handling in writer clean up logic (#12863)
---
.../java/org/apache/iceberg/io/BaseTaskWriter.java | 22 +++++++++++++++-------
.../org/apache/iceberg/io/RollingFileWriter.java | 21 +++++++++++++++------
2 files changed, 30 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 0834c7156a..0edf3662a1 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.io;
import java.io.Closeable;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
@@ -44,8 +43,12 @@ import org.apache.iceberg.util.StructLikeUtil;
import org.apache.iceberg.util.StructProjection;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class);
+
private final List<DataFile> completedDataFiles = Lists.newArrayList();
private final List<DeleteFile> completedDeleteFiles = Lists.newArrayList();
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
@@ -345,12 +348,17 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
currentWriter.close();
if (currentRows == 0L) {
- try {
- io.deleteFile(currentFile.encryptingOutputFile());
- } catch (UncheckedIOException e) {
- // the file may not have been created, and it isn't worth failing the job to clean up,
- // skip deleting
- }
+ // the file may not have been created or cannot be deleted, and it isn't worth failing
+ // the job to clean up, skip deleting
+ Tasks.foreach(currentFile.encryptingOutputFile())
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (file, exc) ->
+ LOG.warn(
+ "Failed to delete the uncommitted empty file during writer clean up: {}",
+ file,
+ exc))
+ .run(io::deleteFile);
} else {
complete(currentWriter);
}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
index 7232aded44..84b36e0a3a 100644
--- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -24,12 +24,16 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A rolling writer capable of splitting incoming data or deletes into multiple files within one
* spec/partition based on the target file size.
*/
abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+ private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
private static final int ROWS_DIVISOR = 1000;
private final OutputFileFactory fileFactory;
@@ -125,12 +129,17 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
}
if (currentFileRows == 0L) {
- try {
- io.deleteFile(currentFile.encryptingOutputFile());
- } catch (UncheckedIOException e) {
- // the file may not have been created, and it isn't worth failing the job to clean up,
- // skip deleting
- }
+ // the file may not have been created or cannot be deleted, and it isn't worth failing
+ // the job to clean up, skip deleting
+ Tasks.foreach(currentFile.encryptingOutputFile())
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (file, exc) ->
+ LOG.warn(
+ "Failed to delete the uncommitted empty file during writer clean up: {}",
+ file,
+ exc))
+ .run(io::deleteFile);
} else {
addResult(currentWriter.result());
}