ahmedabu98 commented on code in PR #33504: URL: https://github.com/apache/beam/pull/33504#discussion_r2005256219
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java: ########## @@ -17,44 +17,62 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; -import org.checkerframework.checker.nullness.qual.Nullable; /** Utility to fetch and cache Iceberg {@link Table}s. */ class TableCache { - private static final Cache<TableIdentifier, Table> CACHE = - CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build(); + private static final Map<String, Catalog> CATALOG_CACHE = new ConcurrentHashMap<>(); + private static final LoadingCache<String, Table> INTERNAL_CACHE = + CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.HOURS) + .refreshAfterWrite(3, TimeUnit.MINUTES) + .build( + new CacheLoader<String, Table>() { + @Override + public Table load(String identifier) { + return checkStateNotNull(CATALOG_CACHE.get(identifier)) + .loadTable(TableIdentifier.parse(identifier)); + } + + @Override + public ListenableFuture<Table> reload(String unusedIdentifier, Table table) { + table.refresh(); + return Futures.immediateFuture(table); + } + });; - static Table get(TableIdentifier identifier, Catalog catalog) { + static Table get(String identifier) { + checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first."); try { - return CACHE.get(identifier, () -> catalog.loadTable(identifier)); + return INTERNAL_CACHE.get(identifier); } catch (ExecutionException e) { throw new RuntimeException( "Encountered a problem fetching table " + identifier + " from cache.", e); } } - static Table get(String identifier, Catalog catalog) { - return get(TableIdentifier.parse(identifier), catalog); - } - - static Table getRefreshed(TableIdentifier identifier, Catalog catalog) { - @Nullable Table table = CACHE.getIfPresent(identifier); - if (table == null) { - return get(identifier, catalog); - } - table.refresh(); - CACHE.put(identifier, table); - return table; + /** Forces a table refresh and returns. */ + static Table getRefreshed(String identifier) { + checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first."); + INTERNAL_CACHE.refresh(identifier); + return get(identifier); } - static Table getRefreshed(String identifier, Catalog catalog) { - return getRefreshed(TableIdentifier.parse(identifier), catalog); + static void setup(IcebergScanConfig scanConfig) { + CATALOG_CACHE.putIfAbsent( + scanConfig.getTableIdentifier(), scanConfig.getCatalogConfig().catalog()); Review Comment: Yep good idea will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org