jackye1995 commented on a change in pull request #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r767229011



##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -29,24 +34,99 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps an Iceberg Catalog to cache tables.
+ * <p>
+ * When {@code expirationIntervalMillis} is positive, the cache will expire 
tables after

Review comment:
       we can link to the explanation using `{@link 
CatalogProperties#TABLE_CACHE_EXPIRATION_INTERVAL_MS}` instead of duplicating 
the explanation in 2 places.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogProperties.java
##########
@@ -30,6 +30,15 @@ private CatalogProperties() {
   public static final String FILE_IO_IMPL = "io-impl";
   public static final String WAREHOUSE_LOCATION = "warehouse";
 
+  public static final String TABLE_CACHE_ENABLED = "cache-enabled";
+  public static final boolean TABLE_CACHE_ENABLED_DEFAULT = true;
+  // If cache expiration interval is 0, we disable caching.

Review comment:
       nit: add a blank line after the previous variable, and use a `/**` block 
comment instead of `//`

##########
File path: core/src/test/java/org/apache/iceberg/util/FakeTicker.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@code Ticker} whose value can be advanced programmatically in tests
+ */
+public class FakeTicker implements Ticker {
+
+  public FakeTicker() {
+  }
+
+  private final AtomicLong nanos = new AtomicLong();
+
+  public FakeTicker advance(Duration duration) {
+    nanos.addAndGet(duration.toNanos());
+    return this;
+  }
+
+  @Override public long read() {

Review comment:
       nit: add a newline after `@Override`

##########
File path: core/src/main/java/org/apache/iceberg/CatalogProperties.java
##########
@@ -30,6 +30,15 @@ private CatalogProperties() {
   public static final String FILE_IO_IMPL = "io-impl";
   public static final String WAREHOUSE_LOCATION = "warehouse";
 
+  public static final String TABLE_CACHE_ENABLED = "cache-enabled";

Review comment:
       why `TABLE_CACHE_` instead of just `CACHE_`?

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       > We settled on -1 for turning "cache always" (i.e. expiration is off).
   
   Could you explain a bit about the reason for the settlement? Or maybe I missed 
the conversation when reading; if so, could you link to it? I actually really 
like the idea of having a max value and treating everything above that as forever. 
The reasons are:
   
   1. it preserves the meaning of "interval" as non-negative, user does not 
need to check what -1 means for an interval, like what we are trying to do here.
   2. it does not really make sense to keep a cache for a year anyway, a max 
value does make sense in this case.
   
   And we can just log a warning if the interval value is too large when 
initializing catalog in engine.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -73,6 +74,13 @@
  *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
  *   <li><code>default-namespace</code> - a namespace to use as the 
default</li>
  *   <li><code>cache-enabled</code> - whether to enable catalog cache</li>
+ *   <li><code>cache.expiration-interval-ms</code> - interval in millis before 
expiring tables from catalog cache

Review comment:
       I think we don't need these explanations if we link to the javadoc in 
`CatalogProperties`

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -114,6 +194,12 @@ private void invalidate(TableIdentifier ident) {
     tableCache.invalidateAll(metadataTableIdentifiers(ident));
   }
 
+  private void onTableExpiration(TableIdentifier ident) {

Review comment:
       the name of this method does not seem to be related to what it is doing

##########
File path: core/src/main/java/org/apache/iceberg/CatalogProperties.java
##########
@@ -30,6 +30,15 @@ private CatalogProperties() {
   public static final String FILE_IO_IMPL = "io-impl";
   public static final String WAREHOUSE_LOCATION = "warehouse";
 
+  public static final String TABLE_CACHE_ENABLED = "cache-enabled";

Review comment:
       I originally thought the interval would work in combination with 
`cache-enabled`, but now we can fully determine whether caching is enabled 
based on the expiration interval, so this is really just a shortcut. This is 
definitely a much cleaner solution.
   
   This reminds me of how we decided to make `type` just a shortcut for 
`catalog-impl`, and I think we should just follow what we did there.
   
   So we don't need `cache-enabled` as a catalog property; instead it just 
translates to an interval value on the engine side to initialize the catalog. I 
think this will simplify your Spark tests a lot, as there is no need for you to 
test all those combinations of `cache-enabled` and interval settings. You just 
need 1 test for the translation, and to ensure the interval works correctly at 
the core level.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -29,24 +34,99 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps an Iceberg Catalog to cache tables.
+ * <p>
+ * When {@code expirationIntervalMillis} is positive, the cache will expire 
tables after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ * <p>
+ * If the duration is negative / -1, cache-entry expiration is disabled.
+ * <p>
+ * If the duration is zero, caching should be turned off. If a value of zero 
gets passed to
+ * this {@link Catalog} wrapper, that is a bug.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("Evicted {} from the table cache ({})", 
key, cause);
+
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
+    Preconditions.checkArgument(expirationIntervalMillis != 0,
+        "When %s is set to 0, the catalog cache should be disabled. This 
indicates a bug.",
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    this.expirationIntervalMillis = expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;
+    this.tableCache = createTableCache(ticker);
+  }
+
+  // Class called when an item is removed from or written to the cache, to 
allow for callbacks.
+  // Only used when cache expiration is enabled.
+  class MetadataTableInvalidatingCacheWriter implements 
CacheWriter<TableIdentifier, Table> {

Review comment:
       why is this not a static class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to