This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new fc981b499a Flink: Log on cache refresh in dynamic sink (#14792)
fc981b499a is described below

commit fc981b499a09cef352f123daa70668d765dfe282
Author: aiborodin <[email protected]>
AuthorDate: Fri Dec 12 05:33:05 2025 +1100

    Flink: Log on cache refresh in dynamic sink (#14792)
---
 .../flink/sink/dynamic/TableMetadataCache.java     | 30 ++++++++++++++++------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index e790d9a929..3be8bbcd91 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -79,7 +79,7 @@ class TableMetadataCache {
     CacheItem cached = tableCache.get(identifier);
     if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
       return EXISTS;
-    } else if (needsRefresh(cached, true)) {
+    } else if (needsRefresh(identifier, cached, true)) {
       return refreshTable(identifier);
     } else {
       return NOT_EXISTS;
@@ -116,7 +116,7 @@ class TableMetadataCache {
       return branch;
     }
 
-    if (needsRefresh(cached, allowRefresh)) {
+    if (needsRefresh(identifier, cached, allowRefresh)) {
       refreshTable(identifier);
       return branch(identifier, branch, false);
     } else {
@@ -156,7 +156,7 @@ class TableMetadataCache {
       }
     }
 
-    if (needsRefresh(cached, allowRefresh)) {
+    if (needsRefresh(identifier, cached, allowRefresh)) {
       refreshTable(identifier);
       return schema(identifier, input, false, dropUnusedColumns);
     } else if (compatible != null) {
@@ -186,7 +186,7 @@ class TableMetadataCache {
       }
     }
 
-    if (needsRefresh(cached, allowRefresh)) {
+    if (needsRefresh(identifier, cached, allowRefresh)) {
       refreshTable(identifier);
       return spec(identifier, spec, false);
     } else {
@@ -207,10 +207,24 @@ class TableMetadataCache {
     }
   }
 
-  private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
-    return allowRefresh
-        && (cacheItem == null
-            || cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
+  private boolean needsRefresh(
+      TableIdentifier identifier, CacheItem cacheItem, boolean allowRefresh) {
+    if (!allowRefresh) {
+      return false;
+    }
+
+    if (cacheItem == null) {
+      return true;
+    }
+
+    long nowMillis = cacheRefreshClock.millis();
+    long timeElapsedMillis = nowMillis - cacheItem.createdTimestampMillis;
+    if (timeElapsedMillis > refreshMs) {
+      LOG.info("Refreshing table metadata for {} after {} millis", identifier, timeElapsedMillis);
+      return true;
+    }
+
+    return false;
   }
 
   public void invalidate(TableIdentifier identifier) {

Reply via email to