ahmedabu98 commented on code in PR #37782:
URL: https://github.com/apache/beam/pull/37782#discussion_r2896013329


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java:
##########
@@ -74,11 +71,9 @@ class RecordWriter {
     }
     OutputFile outputFile;
     EncryptionKeyMetadata keyMetadata;
-    // Keep FileIO open for the lifetime of this writer to avoid
-    // premature shutdown of underlying client pools (e.g., S3),
-    // which manifests as "Connection pool shut down" (Issue #36438).
-    this.io = table.io();
-    OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+    // table.io() returns the catalog's shared FileIO instance.

Review Comment:
   Nit: say "table's" rather than "catalog's" here to avoid confusion.
   ```suggestion
       // table.io() returns the table's shared FileIO instance.
   ```



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java:
##########
@@ -403,33 +406,50 @@ public boolean write(WindowedValue<IcebergDestination> 
icebergDestination, Row r
    */
   @Override
   public void close() throws IOException {
-    for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
-        windowedDestinationAndState : destinations.entrySet()) {
-      DestinationState state = windowedDestinationAndState.getValue();
+    try {
+      for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
+          windowedDestinationAndState : destinations.entrySet()) {
+        DestinationState state = windowedDestinationAndState.getValue();
 
-      // removing writers from the state's cache will trigger the logic to 
collect each writer's
-      // data file.
-      state.writers.invalidateAll();
-      // first check for any exceptions swallowed by the cache
-      if (!state.exceptions.isEmpty()) {
-        IllegalStateException exception =
-            new IllegalStateException(
-                String.format("Encountered %s failed writer(s).", 
state.exceptions.size()));
-        for (Exception e : state.exceptions) {
-          exception.addSuppressed(e);
+        // removing writers from the state's cache will trigger the logic to 
collect each writer's
+        // data file.
+        state.writers.invalidateAll();
+        // first check for any exceptions swallowed by the cache
+        if (!state.exceptions.isEmpty()) {
+          IllegalStateException exception =
+              new IllegalStateException(
+                  String.format("Encountered %s failed writer(s).", 
state.exceptions.size()));
+          for (Exception e : state.exceptions) {
+            exception.addSuppressed(e);
+          }
+          throw exception;
         }
-        throw exception;
-      }
 
-      if (state.dataFiles.isEmpty()) {
-        continue;
-      }
+        if (state.dataFiles.isEmpty()) {
+          continue;
+        }
 
-      totalSerializableDataFiles.put(
-          windowedDestinationAndState.getKey(), new 
ArrayList<>(state.dataFiles));
-      state.dataFiles.clear();
+        totalSerializableDataFiles.put(
+            windowedDestinationAndState.getKey(), new 
ArrayList<>(state.dataFiles));
+        state.dataFiles.clear();
+      }
+    } finally {
+      // Close unique FileIO instances now that all writers are done.
+      // table.io() returns the catalog's shared FileIO; we deduplicate by 
identity

Review Comment:
   Nit: say "table's" rather than "catalog's" here to avoid confusion.
   ```suggestion
         // table.io() returns the table's shared FileIO; we deduplicate by 
identity
   ```



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -980,7 +981,86 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() 
throws IOException {
     writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
     writer.close();
 
-    assertTrue("FileIO should be closed after writer close", 
trackingFileIO.closed);
+    // RecordWriter must NOT close FileIO — it's the catalog's shared instance.
+    assertFalse("RecordWriter.close() must not close the shared FileIO", 
trackingFileIO.closed);
+  }
+
+  /**
+   * Verifies that when multiple writers share the same FileIO, closing any 
writer does not close
+   * the shared FileIO — that is the responsibility of 
RecordWriterManager.close().
+   */
+  @Test
+  public void testMultipleWritersSharingFileIOSurviveBatchClose() throws 
IOException {
+    // Create two tables that share the same FileIO (simulating dynamic 
destinations
+    // backed by the same catalog)
+    TableIdentifier tableId1 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_a_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6));
+    TableIdentifier tableId2 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_b_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6));
+    Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+    Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+    // Both tables share the same CloseTrackingFileIO — mirrors how a catalog 
returns
+    // the same shared FileIO instance for all tables
+    CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io());
+    Table spyTable1 = Mockito.spy(table1);
+    Table spyTable2 = Mockito.spy(table2);
+    Mockito.doReturn(sharedFileIO).when(spyTable1).io();
+    Mockito.doReturn(sharedFileIO).when(spyTable2).io();
+
+    PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema());
+    PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema());
+
+    RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET, 
"file1.parquet", pk1);
+    RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET, 
"file2.parquet", pk2);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row);
+
+    writer1.write(record);
+    writer2.write(record);
+
+    writer1.close();
+    assertFalse("FileIO must remain open between batch writer closes", 
sharedFileIO.closed);
+
+    writer2.close();
+    assertFalse("FileIO must remain open after all writers close", 
sharedFileIO.closed);
+
+    // Both writers produced valid data files
+    assertNotNull(writer1.getDataFile());
+    assertNotNull(writer2.getDataFile());
+  }
+
+  /**
+   * Verifies that RecordWriterManager.close() successfully flushes data files 
from multiple
+   * destinations.
+   */
+  @Test
+  public void testRecordWriterManagerCloseFlushesAllDestinations() throws 
IOException {
+    String tableName1 =
+        "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6);
+    String tableName2 =
+        "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6);
+    WindowedValue<IcebergDestination> dest1 = 
getWindowedDestination(tableName1, null);
+    WindowedValue<IcebergDestination> dest2 = 
getWindowedDestination(tableName2, null);
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    assertTrue(writerManager.write(dest1, row));
+    assertTrue(writerManager.write(dest2, row));
+    assertEquals(2, writerManager.openWriters);
+
+    writerManager.close();

Review Comment:
   Can we add a check that verifies each table's FileIO is closed successfully?



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -980,7 +981,86 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() 
throws IOException {
     writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
     writer.close();
 
-    assertTrue("FileIO should be closed after writer close", 
trackingFileIO.closed);
+    // RecordWriter must NOT close FileIO — it's the catalog's shared instance.
+    assertFalse("RecordWriter.close() must not close the shared FileIO", 
trackingFileIO.closed);
+  }
+
+  /**
+   * Verifies that when multiple writers share the same FileIO, closing any 
writer does not close
+   * the shared FileIO — that is the responsibility of 
RecordWriterManager.close().
+   */
+  @Test
+  public void testMultipleWritersSharingFileIOSurviveBatchClose() throws 
IOException {
+    // Create two tables that share the same FileIO (simulating dynamic 
destinations
+    // backed by the same catalog)
+    TableIdentifier tableId1 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_a_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6));
+    TableIdentifier tableId2 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_b_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6));
+    Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+    Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+    // Both tables share the same CloseTrackingFileIO — mirrors how a catalog 
returns
+    // the same shared FileIO instance for all tables

Review Comment:
   Actually, I think each table has its own FileIO instance.



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -980,7 +981,86 @@ public void testRecordWriterKeepsFileIOOpenUntilClose() 
throws IOException {
     writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
     writer.close();
 
-    assertTrue("FileIO should be closed after writer close", 
trackingFileIO.closed);
+    // RecordWriter must NOT close FileIO — it's the catalog's shared instance.

Review Comment:
   Another nit to avoid confusion: say "table's" rather than "catalog's" here.
   ```suggestion
       // RecordWriter must NOT close FileIO — it's the table's shared instance.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to