This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 782bf654ee4c fix(common): Close log writer output stream on append failure (#18909)
782bf654ee4c is described below
commit 782bf654ee4cac9da7e1244da03d83b298bbbe3a
Author: fhan <[email protected]>
AuthorDate: Tue Jun 9 12:05:09 2026 +0800
fix(common): Close log writer output stream on append failure (#18909)
* fix(common): Close log writer output stream on append failure
* simplify the exception handling
---------
Co-authored-by: fhan <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
.../org/apache/hudi/exception/ExceptionUtil.java | 18 +-
.../apache/hudi/exception/TestExceptionUtil.java | 20 +++
.../common/table/log/HoodieLogFormatWriter.java | 184 +++++++++++++--------
.../table/log/TestHoodieLogFormatWriter.java | 180 ++++++++++++++++++++
4 files changed, 333 insertions(+), 69 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java b/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java
index 92e2e3cc5356..40a39542c7a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.util.StringUtils;
import javax.annotation.Nonnull;
+import java.io.IOException;
+
/**
* Util class for exception analysis.
*/
@@ -31,7 +33,7 @@ public final class ExceptionUtil {
}
/**
- * Returns true if error message is contained in any nested exception of provided {@link Throwable}.
+ * Returns true if the error message is contained in any nested exception of the provided {@link Throwable}.
*/
public static boolean validateErrorMsg(@Nonnull Throwable t, String
errorMsg) {
if (StringUtils.isNullOrEmpty(errorMsg)) {
@@ -48,4 +50,18 @@ public final class ExceptionUtil {
return false;
}
+
+ /**
+ * Throws the provided exception as-is when it is an {@link IOException} or
+ * {@link RuntimeException}, otherwise wraps it in an {@link IOException}.
+ */
+ public static void throwAsIOExceptionOrRuntimeException(Throwable exception)
throws IOException {
+ if (exception instanceof IOException) {
+ throw (IOException) exception;
+ }
+ if (exception instanceof RuntimeException) {
+ throw (RuntimeException) exception;
+ }
+ throw new IOException(exception);
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java b/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java
index dba850b1b907..c2244896d855 100644
--- a/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java
@@ -23,8 +23,11 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
+import static
org.apache.hudi.exception.ExceptionUtil.throwAsIOExceptionOrRuntimeException;
import static org.apache.hudi.exception.ExceptionUtil.validateErrorMsg;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TestExceptionUtil {
@@ -54,4 +57,21 @@ class TestExceptionUtil {
// Empty string should not be found in any message (including null)
assertFalse(validateErrorMsg(exceptionWithoutMessage, ""));
}
+
+ @Test
+ void testThrowAsIOExceptionOrRuntimeException() {
+ IOException ioException = new IOException("io");
+ IOException thrownIOException = assertThrows(IOException.class, () ->
throwAsIOExceptionOrRuntimeException(ioException));
+ assertSame(ioException, thrownIOException);
+
+ RuntimeException runtimeException = new RuntimeException("runtime");
+ RuntimeException thrownRuntimeException =
+ assertThrows(RuntimeException.class, () ->
throwAsIOExceptionOrRuntimeException(runtimeException));
+ assertSame(runtimeException, thrownRuntimeException);
+
+ Exception checkedException = new Exception("checked");
+ IOException wrappedException =
+ assertThrows(IOException.class, () ->
throwAsIOExceptionOrRuntimeException(checkedException));
+ assertSame(checkedException, wrappedException.getCause());
+ }
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 2aed1d7dd87a..fe24bd600f1e 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.exception.ExceptionUtil;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -125,68 +126,73 @@ public class HoodieLogFormatWriter extends HoodieLogFormat.Writer {
@Override
public AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws
IOException {
- // Find current version
- HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
- new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
-
- FSDataOutputStream originalOutputStream = getOutputStream();
- long startPos = originalOutputStream.getPos();
- long sizeWritten = 0;
- // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can
be correctly written
- FSDataOutputStream outputStream = new
FSDataOutputStream(originalOutputStream, new
FileSystem.Statistics(storage.getScheme()), startPos);
- for (HoodieLogBlock block: blocks) {
- long startSize = outputStream.size();
-
- // 1. Write the magic header for the start of the block
- outputStream.write(HoodieLogFormat.MAGIC);
-
- // bytes for header
- byte[] headerBytes =
HoodieLogBlock.getHeaderMetadataBytes(block.getLogBlockHeader());
- // content bytes
- ByteArrayOutputStream content = block.getContentBytes(storage);
- // bytes for footer
- byte[] footerBytes =
HoodieLogBlock.getFooterMetadataBytes(block.getLogBlockFooter());
-
- // 2. Write the total size of the block (excluding Magic)
- outputStream.writeLong(getLogBlockLength(content.size(),
headerBytes.length, footerBytes.length));
-
- // 3. Write the version of this log block
- outputStream.writeInt(currentLogFormatVersion.getVersion());
- // 4. Write the block type
- outputStream.writeInt(block.getBlockType().ordinal());
-
- // 5. Write the headers for the log block
- outputStream.write(headerBytes);
- // 6. Write the size of the content block
- outputStream.writeLong(content.size());
- // 7. Write the contents of the data block
- content.writeTo(outputStream);
- // 8. Write the footers for the log block
- outputStream.write(footerBytes);
- // 9. Write the total size of the log block (including magic) which is
everything written
- // until now (for reverse pointer)
- // Update: this information is now used in determining if a block is
corrupt by comparing to the
- // block size in header. This change assumes that the block size will
be the last data written
- // to a block. Read will break if any data is written past this point
for a block.
- outputStream.writeLong(outputStream.size() - startSize);
-
- // Fetch the size again, so it accounts also (9).
-
- // HUDI-2655. Check the size written to avoid log blocks whose size
overflow.
- if (outputStream.size() == Integer.MAX_VALUE) {
- throw new HoodieIOException("Blocks appended may overflow. Please
decrease log block size or log block amount");
+ try {
+ // Find current version
+ HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
+ new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
+
+ FSDataOutputStream originalOutputStream = getOutputStream();
+ long startPos = originalOutputStream.getPos();
+ long sizeWritten = 0;
+ // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks
can be correctly written
+ FSDataOutputStream outputStream = new
FSDataOutputStream(originalOutputStream, new
FileSystem.Statistics(storage.getScheme()), startPos);
+ for (HoodieLogBlock block: blocks) {
+ long startSize = outputStream.size();
+
+ // 1. Write the magic header for the start of the block
+ outputStream.write(HoodieLogFormat.MAGIC);
+
+ // bytes for header
+ byte[] headerBytes =
HoodieLogBlock.getHeaderMetadataBytes(block.getLogBlockHeader());
+ // content bytes
+ ByteArrayOutputStream content = block.getContentBytes(storage);
+ // bytes for footer
+ byte[] footerBytes =
HoodieLogBlock.getFooterMetadataBytes(block.getLogBlockFooter());
+
+ // 2. Write the total size of the block (excluding Magic)
+ outputStream.writeLong(getLogBlockLength(content.size(),
headerBytes.length, footerBytes.length));
+
+ // 3. Write the version of this log block
+ outputStream.writeInt(currentLogFormatVersion.getVersion());
+ // 4. Write the block type
+ outputStream.writeInt(block.getBlockType().ordinal());
+
+ // 5. Write the headers for the log block
+ outputStream.write(headerBytes);
+ // 6. Write the size of the content block
+ outputStream.writeLong(content.size());
+ // 7. Write the contents of the data block
+ content.writeTo(outputStream);
+ // 8. Write the footers for the log block
+ outputStream.write(footerBytes);
+ // 9. Write the total size of the log block (including magic) which is everything written
+ // until now (for reverse pointer)
+ // Update: this information is now used in determining if a block is corrupt by comparing to the
+ // block size in header. This change assumes that the block size will be the last data written
+ // to a block. Read will break if any data is written past this point for a block.
+ outputStream.writeLong(outputStream.size() - startSize);
+
+ // Fetch the size again, so it accounts also (9).
+
+ // HUDI-2655. Check the size written to avoid log blocks whose size
overflow.
+ if (outputStream.size() == Integer.MAX_VALUE) {
+ throw new HoodieIOException("Blocks appended may overflow. Please
decrease log block size or log block amount");
+ }
+ sizeWritten += outputStream.size() - startSize;
}
- sizeWritten += outputStream.size() - startSize;
+ // No flush/hsync here: append-time visibility is not part of the contract.
+ // Downstream readers only need commit-level visibility, which is provided
+ // when the writer is closed (see closeStream) or when callers explicitly
+ // invoke sync().
+
+ AppendResult result = new AppendResult(logFile, startPos, sizeWritten);
+ // roll over if size is past the threshold
+ rolloverIfNeeded();
+ return result;
+ } catch (IOException | RuntimeException e) {
+ closeOutputStreamOnAppendFailure(e);
+ throw e;
}
- // No flush/hsync here: append-time visibility is not part of the contract.
- // Downstream readers only need commit-level visibility, which is provided
- // when the writer is closed (see closeStream) or when callers explicitly
- // invoke sync().
-
- AppendResult result = new AppendResult(logFile, startPos, sizeWritten);
- // roll over if size is past the threshold
- rolloverIfNeeded();
- return result;
}
/**
@@ -237,21 +243,63 @@ public class HoodieLogFormatWriter extends HoodieLogFormat.Writer {
@Override
public void close() throws IOException {
- closeStream();
- // remove the shutdown hook after closing the stream to avoid memory leaks
- if (null != shutdownThread) {
- Runtime.getRuntime().removeShutdownHook(shutdownThread);
+ try {
+ closeStream();
+ } finally {
+ // remove the shutdown hook after closing the stream to avoid memory leaks
+ if (null != shutdownThread) {
+ Runtime.getRuntime().removeShutdownHook(shutdownThread);
+ shutdownThread = null;
+ }
}
}
private void closeStream() throws IOException {
- if (outputStream != null) {
+ if (outputStream == null) {
+ return;
+ }
+
+ Throwable failure = null;
+ try {
// Persist all buffered data to DataNodes before closing so downstream
// readers can observe a fully-written log file at commit-level visibility.
sync();
- outputStream.close();
- outputStream = null;
- closed = true;
+ } catch (IOException | RuntimeException e) {
+ failure = e;
+ }
+
+ try {
+ closeOutputStream();
+ } catch (IOException | RuntimeException closeException) {
+ if (failure != null) {
+ failure.addSuppressed(closeException);
+ } else {
+ failure = closeException;
+ }
+ }
+
+ if (failure != null) {
+ ExceptionUtil.throwAsIOExceptionOrRuntimeException(failure);
+ }
+ }
+
+ private void closeOutputStreamOnAppendFailure(Throwable failure) {
+ try {
+ closeOutputStream();
+ } catch (IOException | RuntimeException closeException) {
+ failure.addSuppressed(closeException);
+ log.warn("Failed to close output stream after append failure for log
file {}", logFile, closeException);
+ }
+ }
+
+ private void closeOutputStream() throws IOException {
+ if (outputStream != null) {
+ try {
+ outputStream.close();
+ } finally {
+ outputStream = null;
+ closed = true;
+ }
}
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormatWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormatWriter.java
new file mode 100644
index 000000000000..15316012a97a
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormatWriter.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieLogFormatWriter {
+
+ private static final String WRITE_FAIL = "write-fail";
+ private static final String CLOSE_FAIL = "close-fail";
+ private static final String SYNC_FAIL = "sync-fail";
+
+ @TempDir
+ java.nio.file.Path tempDir;
+
+ @Test
+ void testCloseOutputOnAppendWriteException() throws IOException {
+ HoodieStorage storage = HoodieTestUtils.getStorage(tempDir.toString());
+ HoodieLogFormatWriter writer = newWriter(storage);
+ try {
+ CloseTrackingOutputStream outputStream = new
CloseTrackingOutputStream(true, false);
+ writer.withOutputStream(newFSDataOutputStream(outputStream, storage));
+
+ IOException exception = assertThrows(IOException.class, () ->
writer.appendBlock(commandBlock()));
+
+ assertEquals(WRITE_FAIL, exception.getMessage());
+ assertTrue(outputStream.isClosed());
+ assertThrows(IllegalStateException.class, writer::getCurrentSize);
+ } finally {
+ writer.close();
+ }
+ }
+
+ @Test
+ void testPreserveAppendExceptionWhenCloseFails() throws IOException {
+ HoodieStorage storage = HoodieTestUtils.getStorage(tempDir.toString());
+ HoodieLogFormatWriter writer = newWriter(storage);
+ try {
+ CloseTrackingOutputStream outputStream = new
CloseTrackingOutputStream(true, true);
+ writer.withOutputStream(newFSDataOutputStream(outputStream, storage));
+
+ IOException exception = assertThrows(IOException.class, () ->
writer.appendBlock(commandBlock()));
+
+ assertEquals(WRITE_FAIL, exception.getMessage());
+ assertTrue(outputStream.isClosed());
+ assertEquals(1, exception.getSuppressed().length);
+ assertEquals(CLOSE_FAIL, exception.getSuppressed()[0].getMessage());
+ assertThrows(IllegalStateException.class, writer::getCurrentSize);
+ } finally {
+ writer.close();
+ }
+ }
+
+ @Test
+ void testCloseOutputWhenSyncFailsOnClose() throws IOException {
+ HoodieStorage storage = HoodieTestUtils.getStorage(tempDir.toString());
+ HoodieLogFormatWriter writer = newWriter(storage);
+ try {
+ CloseTrackingOutputStream outputStream = new
CloseTrackingOutputStream(false, false);
+ writer.withOutputStream(new SyncFailingFSDataOutputStream(outputStream,
storage));
+
+ IOException exception = assertThrows(IOException.class, writer::close);
+
+ assertEquals(SYNC_FAIL, exception.getMessage());
+ assertTrue(outputStream.isClosed());
+ assertThrows(IllegalStateException.class, writer::getCurrentSize);
+ } finally {
+ writer.close();
+ }
+ }
+
+ private HoodieLogFormatWriter newWriter(HoodieStorage storage) throws
IOException {
+ return HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(tempDir.toString()))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-fileid")
+ .withInstantTime("100")
+ .withLogVersion(1)
+ .withStorage(storage)
+ .build();
+ }
+
+ private HoodieCommandBlock commandBlock() {
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
+ return new HoodieCommandBlock(header);
+ }
+
+ private FSDataOutputStream newFSDataOutputStream(CloseTrackingOutputStream
outputStream, HoodieStorage storage)
+ throws IOException {
+ return new FSDataOutputStream(outputStream, new
FileSystem.Statistics(storage.getScheme()));
+ }
+
+ private static class CloseTrackingOutputStream extends OutputStream {
+
+ private final boolean failOnWrite;
+ private final boolean failOnClose;
+ private boolean closed;
+
+ private CloseTrackingOutputStream(boolean failOnWrite, boolean
failOnClose) {
+ this.failOnWrite = failOnWrite;
+ this.failOnClose = failOnClose;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (failOnWrite) {
+ throw new IOException(WRITE_FAIL);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (failOnWrite) {
+ throw new IOException(WRITE_FAIL);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ if (failOnClose) {
+ throw new IOException(CLOSE_FAIL);
+ }
+ }
+
+ private boolean isClosed() {
+ return closed;
+ }
+ }
+
+ private static class SyncFailingFSDataOutputStream extends
FSDataOutputStream {
+
+ private SyncFailingFSDataOutputStream(CloseTrackingOutputStream
outputStream, HoodieStorage storage)
+ throws IOException {
+ super(outputStream, new FileSystem.Statistics(storage.getScheme()));
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ throw new IOException(SYNC_FAIL);
+ }
+ }
+}