This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 0c45e9658392872c6f60fa6f4ba29ad06e26c2ff
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Sep 1 12:44:19 2022 -0700

    Core: Fix exception handling in BaseTaskWriter (#5683)
    
    * Core: Fix exception handling in BaseTaskWriter.
    
    * Fix state check.
---
 .../org/apache/iceberg/aws/s3/S3OutputStream.java  | 13 ---
 .../apache/iceberg/aws/s3/TestS3OutputStream.java  |  7 +-
 .../java/org/apache/iceberg/io/BaseTaskWriter.java | 96 ++++++++++++++--------
 .../apache/iceberg/io/SortedPosDeleteWriter.java   | 18 +++-
 4 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 80ed982367..ebb1ad82f6 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -103,7 +103,6 @@ class S3OutputStream extends PositionOutputStream {
 
   private long pos = 0;
   private boolean closed = false;
-  private Throwable closeFailureException;
 
   @SuppressWarnings("StaticAssignmentInConstructor")
   S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
@@ -241,15 +240,6 @@ class S3OutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
-
-    // A failed s3 close removes state that is required for a successful close.
-    // Any future close on this stream should fail.
-    if (closeFailureException != null) {
-      throw new IOException(
-          "Attempted to close an S3 output stream that failed to close earlier",
-          closeFailureException);
-    }
-
     if (closed) {
       return;
     }
@@ -260,9 +250,6 @@ class S3OutputStream extends PositionOutputStream {
     try {
       stream.close();
       completeUploads();
-    } catch (Exception e) {
-      closeFailureException = e;
-      throw e;
     } finally {
       cleanUpStagingFiles();
     }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
index 28d9c5da6c..465f8c50f7 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -162,7 +163,7 @@ public class TestS3OutputStream {
   }
 
   @Test
-  public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
+  public void testDoubleClose() throws IOException {
     IllegalStateException mockException =
         new IllegalStateException("mock failure to completeUploads on close");
     Mockito.doThrow(mockException)
@@ -174,9 +175,7 @@ public class TestS3OutputStream {
         .isInstanceOf(mockException.getClass())
         .hasMessageContaining(mockException.getMessage());
 
-    Assertions.assertThatThrownBy(stream::close)
-        .isInstanceOf(IOException.class)
-        .hasCause(mockException);
+    Assertions.assertThatNoException().isThrownBy(stream::close);
   }
 
   private void writeTest() {
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index c80084f3d3..fe96b2e994 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -53,6 +53,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   private final OutputFileFactory fileFactory;
   private final FileIO io;
   private final long targetFileSize;
+  private Throwable failure;
 
   protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                            OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
@@ -68,6 +69,12 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
     return spec;
   }
 
+  protected void setFailure(Throwable throwable) {
+    if (failure == null) {
+      this.failure = throwable;
+    }
+  }
+
   @Override
   public void abort() throws IOException {
     close();
@@ -84,6 +91,8 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   public WriteResult complete() throws IOException {
     close();
 
+    Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure);
+
     return WriteResult.builder()
         .addDataFiles(completedDataFiles)
         .addDeleteFiles(completedDeleteFiles)
@@ -181,28 +190,43 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
 
     @Override
     public void close() throws IOException {
-      // Close data writer and add completed data files.
-      if (dataWriter != null) {
-        dataWriter.close();
-        dataWriter = null;
-      }
+      try {
+        // Close data writer and add completed data files.
+        if (dataWriter != null) {
+          try {
+            dataWriter.close();
+          } finally {
+            dataWriter = null;
+          }
+        }
 
-      // Close eq-delete writer and add completed equality-delete files.
-      if (eqDeleteWriter != null) {
-        eqDeleteWriter.close();
-        eqDeleteWriter = null;
-      }
+        // Close eq-delete writer and add completed equality-delete files.
+        if (eqDeleteWriter != null) {
+          try {
+            eqDeleteWriter.close();
+          } finally {
+            eqDeleteWriter = null;
+          }
+        }
 
-      if (insertedRowMap != null) {
-        insertedRowMap.clear();
-        insertedRowMap = null;
-      }
+        if (insertedRowMap != null) {
+          insertedRowMap.clear();
+          insertedRowMap = null;
+        }
 
-      // Add the completed pos-delete files.
-      if (posDeleteWriter != null) {
-        completedDeleteFiles.addAll(posDeleteWriter.complete());
-        referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles());
-        posDeleteWriter = null;
+        // Add the completed pos-delete files.
+        if (posDeleteWriter != null) {
+          try {
+            // complete will call close
+            completedDeleteFiles.addAll(posDeleteWriter.complete());
+            referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles());
+          } finally {
+            posDeleteWriter = null;
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        setFailure(e);
+        throw e;
       }
     }
   }
@@ -287,21 +311,29 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
 
     private void closeCurrent() throws IOException {
       if (currentWriter != null) {
-        currentWriter.close();
-
-        if (currentRows == 0L) {
-          try {
-            io.deleteFile(currentFile.encryptingOutputFile());
-          } catch (UncheckedIOException e) {
-            // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting
+        try {
+          currentWriter.close();
+
+          if (currentRows == 0L) {
+            try {
+              io.deleteFile(currentFile.encryptingOutputFile());
+            } catch (UncheckedIOException e) {
+              // the file may not have been created, and it isn't worth failing the job to clean up,
+              // skip deleting
+            }
+          } else {
+            complete(currentWriter);
           }
-        } else {
-          complete(currentWriter);
-        }
 
-        this.currentFile = null;
-        this.currentWriter = null;
-        this.currentRows = 0;
+        } catch (IOException | RuntimeException e) {
+          setFailure(e);
+          throw e;
+
+        } finally {
+          this.currentFile = null;
+          this.currentWriter = null;
+          this.currentRows = 0;
+        }
       }
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
index 36a0313a4e..1fd5c00792 100644
--- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
@@ -53,6 +53,7 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
 
   private int records = 0;
   private boolean closed = false;
+  private Throwable failure;
 
   SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
                         OutputFileFactory fileFactory,
@@ -73,6 +74,12 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
     this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD);
   }
 
+  protected void setFailure(Throwable throwable) {
+    if (failure == null) {
+      this.failure = throwable;
+    }
+  }
+
   @Override
   public long length() {
     throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
@@ -106,6 +113,8 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
   public List<DeleteFile> complete() throws IOException {
     close();
 
+    Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure);
+
     return completedFiles;
   }
 
@@ -116,8 +125,8 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
   @Override
   public void close() throws IOException {
     if (!closed) {
-      flushDeletes();
       this.closed = true;
+      flushDeletes();
     }
   }
 
@@ -157,8 +166,11 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
         positions.forEach(posRow -> closeableWriter.delete(path, posRow.pos(), posRow.row()));
       }
     } catch (IOException e) {
-      throw new UncheckedIOException("Failed to write the sorted path/pos pairs to pos-delete file: " +
-          outputFile.encryptingOutputFile().location(), e);
+      setFailure(e);
+      throw new UncheckedIOException(
+          "Failed to write the sorted path/pos pairs to pos-delete file: " +
+              outputFile.encryptingOutputFile().location(),
+          e);
     }
 
     // Clear the buffered pos-deletions.

Reply via email to