This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f8075337814 Refresh Iceberg Table metadata every 2 minutes instead of before each intermediate file written (#37102)
f8075337814 is described below
commit f8075337814fd80fc33dbe5ebf741c82a71af6c6
Author: JayajP <[email protected]>
AuthorDate: Tue Dec 16 08:57:16 2025 -0800
Refresh Iceberg Table metadata every 2 minutes instead of before each intermediate file written (#37102)
* Initial implementation
* Swap back to vendored guava
* Update staleness threshold to 2 minutes to match pr title
---
.../beam/sdk/io/iceberg/RecordWriterManager.java | 69 +++++++++++++++++-----
.../sdk/io/iceberg/RecordWriterManagerTest.java | 64 +++++++++++++++++++-
2 files changed, 115 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 6ddd943eb19..da62fb65884 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -21,6 +21,8 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
@@ -135,7 +137,8 @@ class RecordWriterManager implements AutoCloseable {
RuntimeException rethrow =
new RuntimeException(
String.format(
- "Encountered an error when closing data
writer for table '%s', path: %s",
+ "Encountered an error when closing data
writer for table '%s',"
+ + " path: %s",
icebergDestination.getTableIdentifier(),
recordWriter.path()),
e);
exceptions.add(rethrow);
@@ -256,8 +259,40 @@ class RecordWriterManager implements AutoCloseable {
private final Map<WindowedValue<IcebergDestination>,
List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();
+  /**
+   * A cached {@link Table} together with the time its metadata was last loaded, used to bound
+   * how often {@link Table#refresh()} is invoked for a destination.
+   */
+  static final class LastRefreshedTable {
+    /** Age after which the cached table metadata is considered stale and is reloaded. */
+    static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2);
+
+    final Table table;
+    // volatile so the unsynchronized fast path in refreshIfStale() observes refreshes
+    // performed by other threads.
+    volatile Instant lastRefreshTime;
+
+    LastRefreshedTable(Table table, Instant lastRefreshTime) {
+      this.table = table;
+      this.lastRefreshTime = lastRefreshTime;
+    }
+
+    /**
+     * Returns whether metadata last refreshed at {@code refreshedAt} must be reloaded at {@code
+     * now}.
+     *
+     * <p>A refresh time more than {@link #STALENESS_THRESHOLD} in the past is stale. A refresh
+     * time in the future (e.g. after the wall clock was stepped backwards) is also treated as
+     * stale; otherwise the table would not be refreshed again until the clock caught up.
+     */
+    private static boolean isStale(Instant refreshedAt, Instant now) {
+      return refreshedAt.isBefore(now.minus(STALENESS_THRESHOLD)) || refreshedAt.isAfter(now);
+    }
+
+    /**
+     * Refreshes the table metadata if it is stale (see {@link #STALENESS_THRESHOLD}).
+     *
+     * <p>The staleness check is first made without locking, so the common case of fresh metadata
+     * does not contend on the monitor. If the metadata looks stale, the check is repeated while
+     * holding this object's lock, so only one of several concurrent callers performs the refresh.
+     *
+     * <p>If {@link Table#refresh()} throws, {@link #lastRefreshTime} is left unchanged and the
+     * next call will try again.
+     */
+    void refreshIfStale() {
+      // Read the timestamp before sampling the clock, so a refresh completed by another thread
+      // in between cannot appear to lie in the future.
+      Instant refreshedAt = lastRefreshTime;
+      if (!isStale(refreshedAt, Instant.now())) {
+        return;
+      }
+      synchronized (this) {
+        // Re-check under the lock: another thread may have refreshed while we were waiting.
+        refreshedAt = lastRefreshTime;
+        if (isStale(refreshedAt, Instant.now())) {
+          table.refresh();
+          lastRefreshTime = Instant.now();
+        }
+      }
+    }
+  }
+
@VisibleForTesting
- static final Cache<TableIdentifier, Table> TABLE_CACHE =
+ static final Cache<TableIdentifier, LastRefreshedTable>
LAST_REFRESHED_TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10,
TimeUnit.MINUTES).build();
private boolean isClosed = false;
@@ -272,22 +307,22 @@ class RecordWriterManager implements AutoCloseable {
/**
* Returns an Iceberg {@link Table}.
*
- * <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If
it's not there, we
- * attempt to load it using the Iceberg API. If the table doesn't exist at
all, we attempt to
- * create it, inferring the table schema from the record schema.
+ * <p>First attempts to fetch the table from the {@link
#LAST_REFRESHED_TABLE_CACHE}. If it's not
+ * there, we attempt to load it using the Iceberg API. If the table doesn't
exist at all, we
+ * attempt to create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link
Catalog}
* implementation. Although it is expected, some implementations may not
support creating a table
* using the Iceberg API.
*/
- private Table getOrCreateTable(IcebergDestination destination, Schema
dataSchema) {
+ @VisibleForTesting
+ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
TableIdentifier identifier = destination.getTableIdentifier();
- @Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
- if (table != null) {
- // If fetching from cache, refresh the table to avoid working with stale
metadata
- // (e.g. partition spec)
- table.refresh();
- return table;
+ @Nullable
+ LastRefreshedTable lastRefreshedTable =
LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
+ if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
+ lastRefreshedTable.refreshIfStale();
+ return lastRefreshedTable.table;
}
Namespace namespace = identifier.namespace();
@@ -299,7 +334,8 @@ class RecordWriterManager implements AutoCloseable {
? createConfig.getTableProperties()
: Maps.newHashMap();
- synchronized (TABLE_CACHE) {
+ @Nullable Table table = null;
+ synchronized (LAST_REFRESHED_TABLE_CACHE) {
// Create namespace if it does not exist yet
if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
@@ -323,7 +359,8 @@ class RecordWriterManager implements AutoCloseable {
try {
table = catalog.createTable(identifier, tableSchema, partitionSpec,
tableProperties);
LOG.info(
- "Created Iceberg table '{}' with schema: {}\n, partition spec:
{}, table properties: {}",
+ "Created Iceberg table '{}' with schema: {}\n"
+ + ", partition spec: {}, table properties: {}",
identifier,
tableSchema,
partitionSpec,
@@ -334,8 +371,8 @@ class RecordWriterManager implements AutoCloseable {
}
}
}
-
- TABLE_CACHE.put(identifier, table);
+ lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
+ LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
return table;
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 36b74967f0b..7bce0b16cb1 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -28,10 +28,16 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -102,7 +108,7 @@ public class RecordWriterManagerTest {
windowedDestination =
getWindowedDestination("table_" + testName.getMethodName(),
PARTITION_SPEC);
catalog = new HadoopCatalog(new Configuration(), warehouse.location);
- RecordWriterManager.TABLE_CACHE.invalidateAll();
+ RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
}
private WindowedValue<IcebergDestination> getWindowedDestination(
@@ -451,10 +457,15 @@ public class RecordWriterManagerTest {
assertThat(dataFile.path().toString(), containsString("bool=true"));
// table is cached
- assertEquals(1, RecordWriterManager.TABLE_CACHE.size());
+ assertEquals(1, RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.size());
// update spec
table.updateSpec().addField("id").removeField("bool").commit();
+ // Make the cached table stale to force reloading its metadata.
+ RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.getIfPresent(
+ windowedDestination.getValue().getTableIdentifier())
+ .lastRefreshTime =
+ Instant.EPOCH;
// write a second data file
// should refresh the table and use the new partition spec
@@ -938,4 +949,53 @@ public class RecordWriterManagerTest {
}
}
}
+
+  /**
+   * Checks that {@code getOrCreateTable} serves cached tables and calls {@link Table#refresh()}
+   * only when the cached entry is stale, resetting the entry's timestamp when it does.
+   */
+  @Test
+  public void testGetOrCreateTable_refreshLogic() {
+    Table mockTable = mock(Table.class);
+    TableIdentifier identifier = TableIdentifier.of("db", "table");
+    IcebergDestination destination =
+        IcebergDestination.builder()
+            .setTableIdentifier(identifier)
+            .setFileFormat(FileFormat.PARQUET)
+            .setTableCreateConfig(
+                IcebergTableCreateConfig.builder()
+                    .setPartitionFields(null)
+                    .setSchema(BEAM_SCHEMA)
+                    .build())
+            .build();
+    // The schema is only used if the table has to be created. Every lookup below is served
+    // from the cache, so null is fine here.
+    Schema beamSchema = null;
+
+    // A null catalog is fine for the same reason: cached lookups return before touching it.
+    RecordWriterManager writer = new RecordWriterManager(null, "p", 1L, 1);
+
+    // Start from an empty cache so only the entries planted below are visible.
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.invalidateAll();
+
+    // --- 1. Fast path: an entry refreshed within the threshold is returned as-is ---
+    Instant freshTimestamp = Instant.now().minus(Duration.ofMinutes(1));
+    RecordWriterManager.LastRefreshedTable freshEntry =
+        new RecordWriterManager.LastRefreshedTable(mockTable, freshTimestamp);
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, freshEntry);
+
+    assertEquals(mockTable, writer.getOrCreateTable(destination, beamSchema));
+    verify(mockTable, never()).refresh();
+    assertEquals(freshTimestamp, freshEntry.lastRefreshTime);
+
+    // --- 2. Stale path: an entry older than the threshold is refreshed exactly once ---
+    Instant staleTimestamp = Instant.now().minus(Duration.ofMinutes(5));
+    RecordWriterManager.LastRefreshedTable staleEntry =
+        new RecordWriterManager.LastRefreshedTable(mockTable, staleTimestamp);
+    RecordWriterManager.LAST_REFRESHED_TABLE_CACHE.put(identifier, staleEntry);
+
+    assertEquals(mockTable, writer.getOrCreateTable(destination, beamSchema));
+    verify(mockTable, times(1)).refresh();
+    assertTrue(staleEntry.lastRefreshTime.isAfter(staleTimestamp));
+
+    // --- 3. The refresh reset the timer, so an immediate re-access must not refresh again ---
+    assertEquals(mockTable, writer.getOrCreateTable(destination, beamSchema));
+    verify(mockTable, times(1)).refresh();
+  }
}