ahmedabu98 commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r2005256219


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java:
##########
@@ -17,44 +17,62 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utility to fetch and cache Iceberg {@link Table}s. */
 class TableCache {
-  private static final Cache<TableIdentifier, Table> CACHE =
-      CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();
+  private static final Map<String, Catalog> CATALOG_CACHE = new ConcurrentHashMap<>();
+  private static final LoadingCache<String, Table> INTERNAL_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(1, TimeUnit.HOURS)
+          .refreshAfterWrite(3, TimeUnit.MINUTES)
+          .build(
+              new CacheLoader<String, Table>() {
+                @Override
+                public Table load(String identifier) {
+                  return checkStateNotNull(CATALOG_CACHE.get(identifier))
+                      .loadTable(TableIdentifier.parse(identifier));
+                }
+
+                @Override
+                public ListenableFuture<Table> reload(String unusedIdentifier, Table table) {
+                  table.refresh();
+                  return Futures.immediateFuture(table);
+                }
+              });;
 
-  static Table get(TableIdentifier identifier, Catalog catalog) {
+  static Table get(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");
     try {
-      return CACHE.get(identifier, () -> catalog.loadTable(identifier));
+      return INTERNAL_CACHE.get(identifier);
     } catch (ExecutionException e) {
       throw new RuntimeException(
           "Encountered a problem fetching table " + identifier + " from cache.", e);
     }
   }
 
-  static Table get(String identifier, Catalog catalog) {
-    return get(TableIdentifier.parse(identifier), catalog);
-  }
-
-  static Table getRefreshed(TableIdentifier identifier, Catalog catalog) {
-    @Nullable Table table = CACHE.getIfPresent(identifier);
-    if (table == null) {
-      return get(identifier, catalog);
-    }
-    table.refresh();
-    CACHE.put(identifier, table);
-    return table;
+  /** Forces a table refresh and returns. */
+  static Table getRefreshed(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");
+    INTERNAL_CACHE.refresh(identifier);
+    return get(identifier);
   }
 
-  static Table getRefreshed(String identifier, Catalog catalog) {
-    return getRefreshed(TableIdentifier.parse(identifier), catalog);
+  static void setup(IcebergScanConfig scanConfig) {
+    CATALOG_CACHE.putIfAbsent(
+        scanConfig.getTableIdentifier(), scanConfig.getCatalogConfig().catalog());

Review Comment:
   Yep, good idea — will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to