This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f8075337814 Refresh Iceberg Table metadata every 2 minutes instead of before each intermediate file written (#37102)
f8075337814 is described below

commit f8075337814fd80fc33dbe5ebf741c82a71af6c6
Author: JayajP <[email protected]>
AuthorDate: Tue Dec 16 08:57:16 2025 -0800

    Refresh Iceberg Table metadata every 2 minutes instead of before each intermediate file written (#37102)
    
    * Initial implementation
    
    * Swap back to vendored guava
    
    * Update staleness threshold to 2 minutes to match pr title
---
 .../beam/sdk/io/iceberg/RecordWriterManager.java   | 69 +++++++++++++++++-----
 .../sdk/io/iceberg/RecordWriterManagerTest.java    | 64 +++++++++++++++++++-
 2 files changed, 115 insertions(+), 18 deletions(-)

diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 6ddd943eb19..da62fb65884 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -21,6 +21,8 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.YearMonth;
 import java.time.ZoneOffset;
@@ -135,7 +137,8 @@ class RecordWriterManager implements AutoCloseable {
                       RuntimeException rethrow =
                           new RuntimeException(
                               String.format(
-                                  "Encountered an error when closing data 
writer for table '%s', path: %s",
+                                  "Encountered an error when closing data 
writer for table '%s',"
+                                      + " path: %s",
                                   icebergDestination.getTableIdentifier(), 
recordWriter.path()),
                               e);
                       exceptions.add(rethrow);
@@ -256,8 +259,40 @@ class RecordWriterManager implements AutoCloseable {
   private final Map<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>>
       totalSerializableDataFiles = Maps.newHashMap();
 
+  static final class LastRefreshedTable {
+    final Table table;
+    volatile Instant lastRefreshTime;
+    static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2);
+
+    LastRefreshedTable(Table table, Instant lastRefreshTime) {
+      this.table = table;
+      this.lastRefreshTime = lastRefreshTime;
+    }
+
+    /**
+     * Refreshes the table metadata if it is considered stale (older than 2 
minutes).
+     *
+     * <p>This method first performs a non-synchronized check on the table's 
freshness. This
+     * provides a lock-free fast path that avoids synchronization overhead in 
the common case where
+     * the table does not need to be refreshed. If the table might be stale, 
it then enters a
+     * synchronized block to ensure that only one thread performs the refresh 
operation.
+     */
+    void refreshIfStale() {
+      // Fast path: Avoid entering the synchronized block if the table is not 
stale.
+      if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) {
+        return;
+      }
+      synchronized (this) {
+        if 
(lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) {
+          table.refresh();
+          lastRefreshTime = Instant.now();
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
-  static final Cache<TableIdentifier, Table> TABLE_CACHE =
+  static final Cache<TableIdentifier, LastRefreshedTable> 
LAST_REFRESHED_TABLE_CACHE =
       CacheBuilder.newBuilder().expireAfterAccess(10, 
TimeUnit.MINUTES).build();
 
   private boolean isClosed = false;
@@ -272,22 +307,22 @@ class RecordWriterManager implements AutoCloseable {
   /**
    * Returns an Iceberg {@link Table}.
    *
-   * <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If 
it's not there, we
-   * attempt to load it using the Iceberg API. If the table doesn't exist at 
all, we attempt to
-   * create it, inferring the table schema from the record schema.
+   * <p>First attempts to fetch the table from the {@link 
#LAST_REFRESHED_TABLE_CACHE}. If it's not
+   * there, we attempt to load it using the Iceberg API. If the table doesn't 
exist at all, we
+   * attempt to create it, inferring the table schema from the record schema.
    *
    * <p>Note that this is a best-effort operation that depends on the {@link 
Catalog}
    * implementation. Although it is expected, some implementations may not 
support creating a table
    * using the Iceberg API.
    */
-  private Table getOrCreateTable(IcebergDestination destination, Schema 
dataSchema) {
+  @VisibleForTesting
+  Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
     TableIdentifier identifier = destination.getTableIdentifier();
-    @Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
-    if (table != null) {
-      // If fetching from cache, refresh the table to avoid working with stale 
metadata
-      // (e.g. partition spec)
-      table.refresh();
-      return table;
+    @Nullable
+    LastRefreshedTable lastRefreshedTable = 
LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
+    if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
+      lastRefreshedTable.refreshIfStale();
+      return lastRefreshedTable.table;
     }
 
     Namespace namespace = identifier.namespace();
@@ -299,7 +334,8 @@ class RecordWriterManager implements AutoCloseable {
             ? createConfig.getTableProperties()
             : Maps.newHashMap();
 
-    synchronized (TABLE_CACHE) {
+    @Nullable Table table = null;
+    synchronized (LAST_REFRESHED_TABLE_CACHE) {
       // Create namespace if it does not exist yet
       if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
         SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
@@ -323,7 +359,8 @@ class RecordWriterManager implements AutoCloseable {
         try {
           table = catalog.createTable(identifier, tableSchema, partitionSpec, 
tableProperties);
           LOG.info(
-              "Created Iceberg table '{}' with schema: {}\n, partition spec: 
{}, table properties: {}",
+              "Created Iceberg table '{}' with schema: {}\n"
+                  + ", partition spec: {}, table properties: {}",
               identifier,
               tableSchema,
               partitionSpec,
@@ -334,8 +371,8 @@ class RecordWriterManager implements AutoCloseable {
         }
       }
     }
-
-    TABLE_CACHE.put(identifier, table);
+    lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
+    LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
     return table;
   }
 
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 36b74967f0b..7bce0b16cb1 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -28,10 +28,16 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -102,7 +108,7 @@ public class RecordWriterManagerTest {
     windowedDestination =
         getWindowedDestination("table_" + testName.getMethodName(), 
PARTITION_SPEC);
     catalog = new HadoopCatalog(new Configuration(), warehouse.location);
-    RecordWriterManager.TABLE_CACHE.invalidateAll();
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
   }
 
   private WindowedValue<IcebergDestination> getWindowedDestination(
@@ -451,10 +457,15 @@ public class RecordWriterManagerTest {
     assertThat(dataFile.path().toString(), containsString("bool=true"));
 
     // table is cached
-    assertEquals(1, RecordWriterManager.TABLE_CACHE.size());
+    assertEquals(1, RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.size());
 
     // update spec
     table.updateSpec().addField("id").removeField("bool").commit();
+    // Make the cached table stale to force reloading its metadata.
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.getIfPresent(
+                windowedDestination.getValue().getTableIdentifier())
+            .lastRefreshTime =
+        Instant.EPOCH;
 
     // write a second data file
     // should refresh the table and use the new partition spec
@@ -938,4 +949,53 @@ public class RecordWriterManagerTest {
       }
     }
   }
+
+  @Test
+  public void testGetOrCreateTable_refreshLogic() {
+    Table mockTable = mock(Table.class);
+    TableIdentifier identifier = TableIdentifier.of("db", "table");
+    IcebergDestination destination =
+        IcebergDestination.builder()
+            .setTableIdentifier(identifier)
+            .setFileFormat(FileFormat.PARQUET)
+            .setTableCreateConfig(
+                IcebergTableCreateConfig.builder()
+                    .setPartitionFields(null)
+                    .setSchema(BEAM_SCHEMA)
+                    .build())
+            .build();
+    // The schema is only used if the table is created, so a null is fine for 
this
+    // test.
+    Schema beamSchema = null;
+
+    // Instantiate a RecordWriterManager with a dummy catalog.
+    RecordWriterManager writer = new RecordWriterManager(null, "p", 1L, 1);
+
+    // Clean up cache before test
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
+
+    // --- 1. Test the fast path (entry is not stale) ---
+    Instant freshTimestamp = Instant.now().minus(Duration.ofMinutes(1));
+    RecordWriterManager.LastRefreshedTable freshEntry =
+        new RecordWriterManager.LastRefreshedTable(mockTable, freshTimestamp);
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, freshEntry);
+
+    // Access the table
+    writer.getOrCreateTable(destination, beamSchema);
+
+    // Verify that refresh() was NOT called because the entry is fresh.
+    verify(mockTable, never()).refresh();
+
+    // --- 2. Test the stale path (entry is stale) ---
+    Instant staleTimestamp = Instant.now().minus(Duration.ofMinutes(5));
+    RecordWriterManager.LastRefreshedTable staleEntry =
+        new RecordWriterManager.LastRefreshedTable(mockTable, staleTimestamp);
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, staleEntry);
+
+    // Access the table again
+    writer.getOrCreateTable(destination, beamSchema);
+
+    // Verify that refresh() WAS called exactly once because the entry was 
stale.
+    verify(mockTable, times(1)).refresh();
+  }
 }

Reply via email to