This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 0c45e9658392872c6f60fa6f4ba29ad06e26c2ff Author: Ryan Blue <[email protected]> AuthorDate: Thu Sep 1 12:44:19 2022 -0700 Core: Fix exception handling in BaseTaskWriter (#5683) * Core: Fix exception handling in BaseTaskWriter. * Fix state check. --- .../org/apache/iceberg/aws/s3/S3OutputStream.java | 13 --- .../apache/iceberg/aws/s3/TestS3OutputStream.java | 7 +- .../java/org/apache/iceberg/io/BaseTaskWriter.java | 96 ++++++++++++++-------- .../apache/iceberg/io/SortedPosDeleteWriter.java | 18 +++- 4 files changed, 82 insertions(+), 52 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 80ed982367..ebb1ad82f6 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -103,7 +103,6 @@ class S3OutputStream extends PositionOutputStream { private long pos = 0; private boolean closed = false; - private Throwable closeFailureException; @SuppressWarnings("StaticAssignmentInConstructor") S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics) @@ -241,15 +240,6 @@ class S3OutputStream extends PositionOutputStream { @Override public void close() throws IOException { - - // A failed s3 close removes state that is required for a successful close. - // Any future close on this stream should fail. 
- if (closeFailureException != null) { - throw new IOException( - "Attempted to close an S3 output stream that failed to close earlier", - closeFailureException); - } - if (closed) { return; } @@ -260,9 +250,6 @@ class S3OutputStream extends PositionOutputStream { try { stream.close(); completeUploads(); - } catch (Exception e) { - closeFailureException = e; - throw e; } finally { cleanUpStagingFiles(); } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java index 28d9c5da6c..465f8c50f7 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -162,7 +163,7 @@ public class TestS3OutputStream { } @Test - public void testCloseFailureShouldPersistOnFutureClose() throws IOException { + public void testDoubleClose() throws IOException { IllegalStateException mockException = new IllegalStateException("mock failure to completeUploads on close"); Mockito.doThrow(mockException) @@ -174,9 +175,7 @@ public class TestS3OutputStream { .isInstanceOf(mockException.getClass()) .hasMessageContaining(mockException.getMessage()); - Assertions.assertThatThrownBy(stream::close) - .isInstanceOf(IOException.class) - .hasCause(mockException); + Assertions.assertThatNoException().isThrownBy(stream::close); } private void writeTest() { diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index c80084f3d3..fe96b2e994 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ 
b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -53,6 +53,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> { private final OutputFileFactory fileFactory; private final FileIO io; private final long targetFileSize; + private Throwable failure; protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize) { @@ -68,6 +69,12 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> { return spec; } + protected void setFailure(Throwable throwable) { + if (failure == null) { + this.failure = throwable; + } + } + @Override public void abort() throws IOException { close(); @@ -84,6 +91,8 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> { public WriteResult complete() throws IOException { close(); + Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure); + return WriteResult.builder() .addDataFiles(completedDataFiles) .addDeleteFiles(completedDeleteFiles) @@ -181,28 +190,43 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> { @Override public void close() throws IOException { - // Close data writer and add completed data files. - if (dataWriter != null) { - dataWriter.close(); - dataWriter = null; - } + try { + // Close data writer and add completed data files. + if (dataWriter != null) { + try { + dataWriter.close(); + } finally { + dataWriter = null; + } + } - // Close eq-delete writer and add completed equality-delete files. - if (eqDeleteWriter != null) { - eqDeleteWriter.close(); - eqDeleteWriter = null; - } + // Close eq-delete writer and add completed equality-delete files. 
+ if (eqDeleteWriter != null) { + try { + eqDeleteWriter.close(); + } finally { + eqDeleteWriter = null; + } + } - if (insertedRowMap != null) { - insertedRowMap.clear(); - insertedRowMap = null; - } + if (insertedRowMap != null) { + insertedRowMap.clear(); + insertedRowMap = null; + } - // Add the completed pos-delete files. - if (posDeleteWriter != null) { - completedDeleteFiles.addAll(posDeleteWriter.complete()); - referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles()); - posDeleteWriter = null; + // Add the completed pos-delete files. + if (posDeleteWriter != null) { + try { + // complete will call close + completedDeleteFiles.addAll(posDeleteWriter.complete()); + referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles()); + } finally { + posDeleteWriter = null; + } + } + } catch (IOException | RuntimeException e) { + setFailure(e); + throw e; } } } @@ -287,21 +311,29 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> { private void closeCurrent() throws IOException { if (currentWriter != null) { - currentWriter.close(); - - if (currentRows == 0L) { - try { - io.deleteFile(currentFile.encryptingOutputFile()); - } catch (UncheckedIOException e) { - // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting + try { + currentWriter.close(); + + if (currentRows == 0L) { + try { + io.deleteFile(currentFile.encryptingOutputFile()); + } catch (UncheckedIOException e) { + // the file may not have been created, and it isn't worth failing the job to clean up, + // skip deleting + } + } else { + complete(currentWriter); } - } else { - complete(currentWriter); - } - this.currentFile = null; - this.currentWriter = null; - this.currentRows = 0; + } catch (IOException | RuntimeException e) { + setFailure(e); + throw e; + + } finally { + this.currentFile = null; + this.currentWriter = null; + this.currentRows = 0; + } } } diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java 
b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 36a0313a4e..1fd5c00792 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -53,6 +53,7 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr private int records = 0; private boolean closed = false; + private Throwable failure; SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory, OutputFileFactory fileFactory, @@ -73,6 +74,12 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD); } + protected void setFailure(Throwable throwable) { + if (failure == null) { + this.failure = throwable; + } + } + @Override public long length() { throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length"); @@ -106,6 +113,8 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr public List<DeleteFile> complete() throws IOException { close(); + Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure); + return completedFiles; } @@ -116,8 +125,8 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr @Override public void close() throws IOException { if (!closed) { - flushDeletes(); this.closed = true; + flushDeletes(); } } @@ -157,8 +166,11 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr positions.forEach(posRow -> closeableWriter.delete(path, posRow.pos(), posRow.row())); } } catch (IOException e) { - throw new UncheckedIOException("Failed to write the sorted path/pos pairs to pos-delete file: " + - outputFile.encryptingOutputFile().location(), e); + setFailure(e); + throw new UncheckedIOException( + "Failed to write the sorted path/pos pairs to pos-delete file: " + + 
outputFile.encryptingOutputFile().location(), + e); } // Clear the buffered pos-deletions.
