kbendick commented on a change in pull request #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r748672693



##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -20,33 +20,143 @@
 package org.apache.iceberg;
 
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
keyLoggingRemovalListener =
+      (key, value, cause) -> LOG.info("Expired {} from the TableCache", key);

Review comment:
       I was using this mostly for debugging. I think it would make a good 
debug-level log, in case people notice some behavior and suspect it is 
caused by this cache. But I'm happy to remove it entirely, too.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -20,33 +20,143 @@
 package org.apache.iceberg;
 
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
keyLoggingRemovalListener =
+      (key, value, cause) -> LOG.info("Expired {} from the TableCache", key);
+
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, false, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, boolean expirationEnabled, long 
expirationIntervalMilllis) {
+    return wrap(catalog, true, expirationEnabled, expirationIntervalMilllis);
+  }
+
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, boolean 
expirationEnabled,
+      long expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, expirationEnabled, 
expirationIntervalMillis);
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  @VisibleForTesting
+  static Catalog wrap(Catalog catalog, boolean expirationEnabled, long 
expirationIntervalMillis, Ticker ticker) {
+    return new CachingCatalog(catalog, true, expirationEnabled, 
expirationIntervalMillis, ticker);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  private final long expirationIntervalMillis;
+  private final Cache<TableIdentifier, Table> tableCache;
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, boolean 
isExpirationEnabled,
+      long expirationIntervalInMillis) {
+    this(catalog, caseSensitive, isExpirationEnabled, 
expirationIntervalInMillis, Ticker.systemTicker());
+  }
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, boolean 
isExpirationEnabled,
+      long expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    this.expirationEnabled = isExpirationEnabled;
+    this.expirationIntervalMillis = expirationIntervalMillis;
+
+    this.tableCache = createTableCache(ticker);
+  }
+
+  /**
+   * Return the age of an entry in the cache.
+   * <p>
+   * This method is only visible for testing the cache expiration policy, as 
cache invalidation is handled
+   * by the catalog and not the cache itself.
+   * <p>
+   * Returns the age of the cache entry corresponding to the identifier,  or 
{@code Optional.empty} if the table
+   * is not present in the cache or if no expireAfterAccess policy is present 
in this CachingCatalog.
+   */
+  @VisibleForTesting
+  Optional<Duration> cachedEntryAge(TableIdentifier identifier) {
+    return tableCache.policy()
+        .expireAfterAccess()
+        .flatMap(tableExpiration -> tableExpiration.ageOf(identifier));
+  }
+
+  // Returns the cached Table entry corresponding to the given identifier iff
+  // it's in the cache. Grabs the table in a way that doesn't count as an 
access
+  // and thus won't affect the cached entry's ttl (if enabled).
+  @VisibleForTesting
+  Optional<Table> tableFromCacheQuietly(TableIdentifier identifier) {
+    // Ensure async cleanup actions have happened.
+    tableCache.cleanUp();
+    return 
Optional.ofNullable(tableCache.policy().getIfPresentQuietly(identifier));
+  }
+
+  @VisibleForTesting
+  Cache<TableIdentifier, Table> cache() {
+    return tableCache;
+  }
+
+  @VisibleForTesting
+  Optional<Duration> getTimeToTTL(TableIdentifier identifier) {
+    return tableCache
+        .policy()
+        .expireAfterAccess()  // Assumes expireAfterAccess, which is what we 
set at cache level.
+        .flatMap(tableExpiration -> tableExpiration.ageOf(identifier)) // Get 
the time the table has been cached.
+        .map(age -> Duration.ofMillis(expirationIntervalMillis).minus(age));
+  }
+
+  private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+    Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+        .newBuilder()
+        .softValues()
+        .removalListener(keyLoggingRemovalListener)
+        .writer(new CacheWriter<TableIdentifier, Table>() {
+          @Override
+          // TODO - Consider expiring and syncing any metadata tables that 
have a different snapshotId
+          //        upon write.
+          public void write(TableIdentifier tableIdentifier, Table table) {
+            LOG.info("Table {} was written to the catalog with snapshotId {}", 
tableIdentifier,
+                table.currentSnapshot() == null ? null : 
table.currentSnapshot().snapshotId());
+          }
+
+          @Override
+          public void delete(TableIdentifier tableIdentifier, Table table, 
RemovalCause cause) {
+            // On expiration, remove any associated metadata tables so that 
subsequent catalog loads won't
+            // return stale metadata tables w.r.t. the underlying data tables 
they would return.
+            //
+            // TODO - Should we put metadata tables back into the catalog if 
their associated table is still
+            //        cached to keep tables and metadata tables on the same 
snapshot?
+            if (expirationEnabled && 
!MetadataTableUtils.hasMetadataTableName(tableIdentifier)) {

Review comment:
       Good catch. This will be much simpler if it's all in the `onTableExpiration` 
function.

##########
File path: core/src/test/java/org/apache/iceberg/CachingCatalogTestHelper.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.Assert;
+
+/**
+ * Utility class for accessing package-private VisibleForTesting annotated 
methods
+ * outside the org.apache.iceberg package.
+ */
+public class CachingCatalogTestHelper {
+
+  // TODO - Consider making this a delegator?
+  private CachingCatalogTestHelper() {

Review comment:
       I personally didn't care for this either.
   
   The reason for it is that the style checker complains when the 
`@VisibleForTesting` annotation is used on members that aren't package-private. 
The methods can't simply be package-private, because `TestCachingCatalog` is in 
org.apache.iceberg.hadoop while `CachingCatalog` is in org.apache.iceberg.
   
   It was _much_ cleaner without this. I could try moving the tests instead — would that be better?

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -20,33 +20,143 @@
 package org.apache.iceberg;
 
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
keyLoggingRemovalListener =
+      (key, value, cause) -> LOG.info("Expired {} from the TableCache", key);
+
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, false, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, boolean expirationEnabled, long 
expirationIntervalMilllis) {
+    return wrap(catalog, true, expirationEnabled, expirationIntervalMilllis);
+  }
+
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, boolean 
expirationEnabled,
+      long expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, expirationEnabled, 
expirationIntervalMillis);
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  @VisibleForTesting
+  static Catalog wrap(Catalog catalog, boolean expirationEnabled, long 
expirationIntervalMillis, Ticker ticker) {
+    return new CachingCatalog(catalog, true, expirationEnabled, 
expirationIntervalMillis, ticker);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  private final long expirationIntervalMillis;
+  private final Cache<TableIdentifier, Table> tableCache;
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, boolean 
isExpirationEnabled,
+      long expirationIntervalInMillis) {
+    this(catalog, caseSensitive, isExpirationEnabled, 
expirationIntervalInMillis, Ticker.systemTicker());
+  }
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, boolean 
isExpirationEnabled,
+      long expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    this.expirationEnabled = isExpirationEnabled;
+    this.expirationIntervalMillis = expirationIntervalMillis;
+
+    this.tableCache = createTableCache(ticker);
+  }
+
+  /**
+   * Return the age of an entry in the cache.
+   * <p>
+   * This method is only visible for testing the cache expiration policy, as 
cache invalidation is handled
+   * by the catalog and not the cache itself.
+   * <p>
+   * Returns the age of the cache entry corresponding to the identifier,  or 
{@code Optional.empty} if the table
+   * is not present in the cache or if no expireAfterAccess policy is present 
in this CachingCatalog.
+   */
+  @VisibleForTesting
+  Optional<Duration> cachedEntryAge(TableIdentifier identifier) {
+    return tableCache.policy()
+        .expireAfterAccess()
+        .flatMap(tableExpiration -> tableExpiration.ageOf(identifier));
+  }
+
+  // Returns the cached Table entry corresponding to the given identifier iff
+  // it's in the cache. Grabs the table in a way that doesn't count as an 
access
+  // and thus won't affect the cached entry's ttl (if enabled).
+  @VisibleForTesting
+  Optional<Table> tableFromCacheQuietly(TableIdentifier identifier) {
+    // Ensure async cleanup actions have happened.
+    tableCache.cleanUp();
+    return 
Optional.ofNullable(tableCache.policy().getIfPresentQuietly(identifier));
+  }
+
+  @VisibleForTesting
+  Cache<TableIdentifier, Table> cache() {
+    return tableCache;
+  }
+
+  @VisibleForTesting
+  Optional<Duration> getTimeToTTL(TableIdentifier identifier) {
+    return tableCache
+        .policy()
+        .expireAfterAccess()  // Assumes expireAfterAccess, which is what we 
set at cache level.
+        .flatMap(tableExpiration -> tableExpiration.ageOf(identifier)) // Get 
the time the table has been cached.
+        .map(age -> Duration.ofMillis(expirationIntervalMillis).minus(age));
+  }
+
+  private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+    Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+        .newBuilder()
+        .softValues()
+        .removalListener(keyLoggingRemovalListener)
+        .writer(new CacheWriter<TableIdentifier, Table>() {
+          @Override
+          // TODO - Consider expiring and syncing any metadata tables that 
have a different snapshotId
+          //        upon write.
+          public void write(TableIdentifier tableIdentifier, Table table) {
+            LOG.info("Table {} was written to the catalog with snapshotId {}", 
tableIdentifier,

Review comment:
       Yeah, this is left over from debugging (and from testing with Spark). 
This one really isn't needed, and I'll remove it entirely.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogProperties.java
##########
@@ -30,6 +30,14 @@ private CatalogProperties() {
   public static final String FILE_IO_IMPL = "io-impl";
   public static final String WAREHOUSE_LOCATION = "warehouse";
 
+  public static final String TABLE_CACHE_ENABLED = "cache-enabled";
+  public static final boolean TABLE_CACHE_ENABLED_DEFAULT = true;
+  public static final String TABLE_CACHE_EXPIRATION_ENABLED = 
"cache.expiration-enabled";

Review comment:
       Sure, that makes sense to me. Since it currently has to be set in the 
catalog configs, I wasn't keen on adding too many additional catalog 
configuration requirements.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -20,33 +20,143 @@
 package org.apache.iceberg;
 
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
keyLoggingRemovalListener =

Review comment:
       Good call. I added this fairly late in debugging and forgot to update it.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -72,6 +182,8 @@ public Table loadTable(TableIdentifier ident) {
     TableIdentifier canonicalized = canonicalizeIdentifier(ident);
     Table cached = tableCache.getIfPresent(canonicalized);
     if (cached != null) {
+      // TODO - If this is a metadata table consider, checking that this is in 
sync
+      //        with the version that is presently cached. Could be confusing 
to user's though.

Review comment:
       I'll remove the comment, then. Thanks for the input.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to