This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ca27526d759 Iceberg fileio close (#37168)
ca27526d759 is described below
commit ca27526d759ea617166958940671019dea6547e2
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Dec 22 17:23:06 2025 -0600
Iceberg fileio close (#37168)
* fix(iceberg): prevent premature FileIO closure in RecordWriter
Keep FileIO open for writer lifetime to avoid connection pool shutdown
issues
Add test to verify FileIO remains open until writer close
* trigger ITs
* also apply in AppendFilesToTables
---------
Co-authored-by: liferoad <[email protected]>
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
.../beam/sdk/io/iceberg/AppendFilesToTables.java | 14 +--
.../apache/beam/sdk/io/iceberg/RecordWriter.java | 50 +++++++---
.../sdk/io/iceberg/RecordWriterManagerTest.java | 105 +++++++++++++++++++++
4 files changed, 151 insertions(+), 20 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 4
+ "modification": 1
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 12888b4e4e0..1789932d69a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -189,14 +189,14 @@ class AppendFilesToTables
ManifestWriter<DataFile> writer;
try (FileIO io = table.io()) {
writer = createManifestWriter(table.location(), uuid, spec, io);
+ for (DataFile file : files) {
+ writer.add(file);
+ committedDataFileByteSize.update(file.fileSizeInBytes());
+ committedDataFileRecordCount.update(file.recordCount());
+ }
+ writer.close();
+ update.appendManifest(writer.toManifestFile());
}
- for (DataFile file : files) {
- writer.add(file);
- committedDataFileByteSize.update(file.fileSizeInBytes());
- committedDataFileRecordCount.update(file.recordCount());
- }
- writer.close();
- update.appendManifest(writer.toManifestFile());
}
update.commit();
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index d4a61c6d3e1..d233b0ac05b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ class RecordWriter {
private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;
+ private @Nullable FileIO io;
RecordWriter(
Catalog catalog, IcebergDestination destination, String filename,
PartitionKey partitionKey)
@@ -72,12 +74,14 @@ class RecordWriter {
}
OutputFile outputFile;
EncryptionKeyMetadata keyMetadata;
- try (FileIO io = table.io()) {
- OutputFile tmpFile = io.newOutputFile(absoluteFilename);
- EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
- outputFile = encryptedOutputFile.encryptingOutputFile();
- keyMetadata = encryptedOutputFile.keyMetadata();
- }
+ // Keep FileIO open for the lifetime of this writer to avoid
+ // premature shutdown of underlying client pools (e.g., S3),
+ // which manifests as "Connection pool shut down" (Issue #36438).
+ this.io = table.io();
+ OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+ EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
+ outputFile = encryptedOutputFile.encryptingOutputFile();
+ keyMetadata = encryptedOutputFile.keyMetadata();
switch (fileFormat) {
case AVRO:
@@ -120,16 +124,38 @@ class RecordWriter {
}
public void close() throws IOException {
+ IOException closeError = null;
try {
icebergDataWriter.close();
} catch (IOException e) {
- throw new IOException(
- String.format(
- "Failed to close %s writer for table %s, path: %s",
- fileFormat, table.name(), absoluteFilename),
- e);
+ closeError =
+ new IOException(
+ String.format(
+ "Failed to close %s writer for table %s, path: %s",
+ fileFormat, table.name(), absoluteFilename),
+ e);
+ } finally {
+ // Always attempt to close FileIO and decrement metrics
+ if (io != null) {
+ try {
+ io.close();
+ } catch (Exception ioCloseError) {
+ if (closeError != null) {
+ closeError.addSuppressed(ioCloseError);
+ } else {
+ closeError = new IOException("Failed to close FileIO", ioCloseError);
+ }
+ } finally {
+ io = null;
+ }
+ }
+ activeIcebergWriters.dec();
+ }
+
+ if (closeError != null) {
+ throw closeError;
}
- activeIcebergWriters.dec();
+
DataFile dataFile = icebergDataWriter.toDataFile();
LOG.info(
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 7bce0b16cb1..375d9073711 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.values.Row;
@@ -65,6 +66,10 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -83,6 +88,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
/** Test class for {@link RecordWriterManager}. */
@RunWith(JUnit4.class)
@@ -950,6 +956,105 @@ public class RecordWriterManagerTest {
}
}
+ @Test
+ public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
+ TableIdentifier tableId =
+ TableIdentifier.of(
+ "default",
+ "table_"
+ + testName.getMethodName()
+ + "_"
+ + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
+ Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+ CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io());
+ Table spyTable = Mockito.spy(table);
+ Mockito.doReturn(trackingFileIO).when(spyTable).io();
+
+ PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema());
+ RecordWriter writer =
+ new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey);
+
+ Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+ writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
+ writer.close();
+
+ assertTrue("FileIO should be closed after writer close",
trackingFileIO.closed);
+ }
+
+ private static final class CloseTrackingFileIO implements FileIO {
+ private final FileIO delegate;
+ volatile boolean closed = false;
+
+ CloseTrackingFileIO(FileIO delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return delegate.newInputFile(path);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ OutputFile underlying = delegate.newOutputFile(path);
+ return new CloseAwareOutputFile(underlying, this);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ delegate.deleteFile(path);
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return delegate.properties();
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ delegate.close();
+ }
+ }
+
+ private static final class CloseAwareOutputFile implements OutputFile {
+ private final OutputFile delegate;
+ private final CloseTrackingFileIO io;
+
+ CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) {
+ this.delegate = delegate;
+ this.io = io;
+ }
+
+ @Override
+ public PositionOutputStream create() {
+ if (io.closed) {
+ throw new IllegalStateException("Connection pool shut down");
+ }
+ return delegate.create();
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite() {
+ if (io.closed) {
+ throw new IllegalStateException("Connection pool shut down");
+ }
+ return delegate.createOrOverwrite();
+ }
+
+ @Override
+ public String location() {
+ return delegate.location();
+ }
+
+ @Override
+ public InputFile toInputFile() {
+ return delegate.toInputFile();
+ }
+ }
+
@Test
public void testGetOrCreateTable_refreshLogic() {
Table mockTable = mock(Table.class);