kbendick commented on a change in pull request #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r766947324



##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       Oh, I understand. Yes, I will move the logic higher. It would need to be 
in the specific catalog where we instantiate it (e.g. in `SparkCatalog`).
   
   I will update.

##########
File path: core/src/test/java/org/apache/iceberg/util/FakeTicker.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A Ticker whose value can be advanced programmatically in test.
+ *
+ * This is modified from the Guava package 
com.google.common.testing.FakeTicker,

Review comment:
       I have actually removed all of the Guava code from this.
   
   This is an implementation of the interface from `caffeine`, and there are no 
methods from the Guava class in it anymore.
   
   I can remove that attribution at the top, as there are many `FakeTicker` 
implementations on the internet. This one only uses nanos because that's what 
the interface uses, and I did not run across one that takes a `Duration` as 
input.
   
   So this is arguably not from Guava anymore (plus we do credit Guava already).
   
   Up to you how to proceed.
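
   For illustration, a minimal sketch of a ticker that is advanced with a 
`Duration` might look like the following. The `NanoTicker` interface is a 
local stand-in (an assumption made so the snippet compiles without the 
library) for the single nanosecond-reading method of Caffeine's `Ticker`, and 
`SketchFakeTicker` is a hypothetical name, not the class from this PR.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

/** Local stand-in for a nanosecond ticker interface; declared here only so the sketch is self-contained. */
interface NanoTicker {
  long read();
}

/** Hypothetical test ticker whose time only moves when advance(...) is called. */
final class SketchFakeTicker implements NanoTicker {
  // Nanoseconds elapsed since this ticker was created; starts at zero.
  private final AtomicLong nanos = new AtomicLong();

  /** Moves the ticker forward by the given duration and returns this ticker for chaining. */
  SketchFakeTicker advance(Duration duration) {
    nanos.addAndGet(duration.toNanos());
    return this;
  }

  @Override
  public long read() {
    return nanos.get();
  }
}
```

   A test would construct the cache with such a ticker and call 
`advance(Duration.ofMillis(...))` instead of sleeping.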

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;
+    this.tableCache = createTableCache(ticker);
+  }
+
+  private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+    Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+        .newBuilder()
+        .softValues()
+        .removalListener(identLoggingRemovalListener);
+
+    // Only setup ticker and background cleanup tasks from expiration if table 
expiration is enabled.
+    if (expirationEnabled) {
+      return cacheBuilder
+          .writer(new CacheWriter<TableIdentifier, Table>() {
+            @Override
+            public void write(TableIdentifier tableIdentifier, Table table) {
+            }

Review comment:
       The preference is really to inline them. The compiler has trouble 
figuring out the types otherwise.
   
   Almost all examples I've seen are inlined like this, but I'll see if I can 
get it to work. Right now it's complaining because the `Caffeine` object 
generated is really `Caffeine<Object, Object>` for a long time, and they have 
some weird tricks that infer the types after a bit. Hence the preference 
towards inlining and a fluent builder where possible.
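
   The inference behaviour described above can be sketched without the 
library. `SketchBuilder` below is a hypothetical stand-in that only mimics the 
generic shape of such a builder (it starts as `<Object, Object>` and narrows 
its type parameters when a typed listener is supplied); it is not Caffeine's 
actual code, and a `BiConsumer` takes the place of a removal listener.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical builder that mimics how a fluent cache builder can narrow its generic types. */
final class SketchBuilder<K, V> {
  private BiConsumer<?, ?> listener;

  private SketchBuilder() {}

  // A fresh builder knows nothing about key or value types yet.
  static SketchBuilder<Object, Object> newBuilder() {
    return new SketchBuilder<>();
  }

  // A setter that keeps whatever type parameters the builder currently has.
  SketchBuilder<K, V> softValues() {
    return this;
  }

  // A setter that narrows the builder's type parameters to those of its argument.
  @SuppressWarnings("unchecked")
  <K1 extends K, V1 extends V> SketchBuilder<K1, V1> removalListener(
      BiConsumer<? super K1, ? super V1> newListener) {
    SketchBuilder<K1, V1> self = (SketchBuilder<K1, V1>) this;
    self.listener = newListener;
    return self;
  }

  // The final types are inferred from the builder's parameters and the assignment target.
  <K1 extends K, V1 extends V> Map<K1, V1> build() {
    return new HashMap<>();
  }
}

final class SketchBuilderDemo {
  static final BiConsumer<String, Integer> LOGGING_LISTENER = (key, value) -> { };

  // Fully fluent: the types are narrowed by the listener and fixed at build().
  static Map<String, Integer> fluent() {
    return SketchBuilder.newBuilder().softValues().removalListener(LOGGING_LISTENER).build();
  }

  // Split across statements: the variable can only be typed after the narrowing call.
  static Map<String, Integer> split() {
    // This line would not compile, because the builder is still <Object, Object> here:
    // SketchBuilder<String, Integer> early = SketchBuilder.newBuilder().softValues();
    SketchBuilder<String, Integer> narrowed =
        SketchBuilder.newBuilder().softValues().removalListener(LOGGING_LISTENER);
    return narrowed.build();
  }
}
```

   In this sketch, pulling the builder into a local variable only works once a 
type-narrowing call has been made, which is one reason a single fluent chain 
tends to be less fragile.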

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       What happens if `cache-enabled=true` and 
`cache.expiration-interval-ms=0`? Should that not cache then?
   
   That's the part that I'm hung up on.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogProperties.java
##########
@@ -30,6 +30,14 @@ private CatalogProperties() {
   public static final String FILE_IO_IMPL = "io-impl";
   public static final String WAREHOUSE_LOCATION = "warehouse";
 
+  public static final String TABLE_CACHE_ENABLED = "cache-enabled";
+  public static final boolean TABLE_CACHE_ENABLED_DEFAULT = true;
+  public static final String TABLE_CACHE_EXPIRATION_ENABLED = 
"cache.expiration-enabled";

Review comment:
       Sure, that makes sense to me. Since it's a new catalog-level config, the 
fewer options users have to set to get the same behavior, the better in my view.
   
   One concern is that users will then have to specifically choose a timeout 
value. Feedback I've received offline so far has mentioned that people aren't 
too sure what value would be best to use. Removing the `ENABLED` option makes 
it harder to provide a default that removes the need to make that choice.
   
   But I guess if we always set up the examples with the "default" value, users 
will likely copy that.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after

Review comment:
       Fixed.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *

Review comment:
       Fixed. We have a number of places that don't do this, but I'm guessing 
we don't want to cause that large a diff just for `<p>` (in a separate PR, 
that is).

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;
+    this.tableCache = createTableCache(ticker);
+  }
+
+  private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+    Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+        .newBuilder()
+        .softValues()
+        .removalListener(identLoggingRemovalListener);
+
+    // Only setup ticker and background cleanup tasks from expiration if table 
expiration is enabled.
+    if (expirationEnabled) {
+      return cacheBuilder
+          .writer(new CacheWriter<TableIdentifier, Table>() {
+            @Override
+            public void write(TableIdentifier tableIdentifier, Table table) {
+            }

Review comment:
       I managed to get it working. Upgrading my IntelliJ caused a lot of 
headaches. I downgraded and it seems fine now.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;
+    this.tableCache = createTableCache(ticker);
+  }
+
+  private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+    Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+        .newBuilder()
+        .softValues()
+        .removalListener(identLoggingRemovalListener);
+
+    // Only setup ticker and background cleanup tasks from expiration if table 
expiration is enabled.
+    if (expirationEnabled) {
+      return cacheBuilder
+          .writer(new CacheWriter<TableIdentifier, Table>() {
+            @Override
+            public void write(TableIdentifier tableIdentifier, Table table) {
+            }

Review comment:
       I managed to get it working. Upgrading my IntelliJ caused a lot of 
headaches. I downgraded and it seems fine now, so it's a separate class.

##########
File path: core/src/test/java/org/apache/iceberg/util/FakeTicker.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A Ticker whose value can be advanced programmatically in test.
+ *
+ * This is modified from the Guava package 
com.google.common.testing.FakeTicker,

Review comment:
       Removed.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,52 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
-    Catalog catalog = buildIcebergCatalog(name, options);
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+    // If the user disabled caching and did not set the 
cache.expiration-interval-ms, we'll set it to zero for
+    // them on their behalf. If they disabled caching but explicitly set a 
non-zero cache expiration
+    // interval, we will fail initialization as that's an invalid 
configuration.
+    long defaultCacheExpirationInterval =
+        cacheEnabled ? 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT : 0L;
+
+    this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+        defaultCacheExpirationInterval);
+
+    // Normalize usage of -1 to 0 as we'll call Duration.ofMillis on this 
value.
+    if (cacheExpirationIntervalMs < 0) {
+      this.cacheExpirationIntervalMs = 0;
+    }
+
+    Preconditions.checkArgument(cacheEnabled || cacheExpirationIntervalMs <= 
0L,
+        "The catalog's table cache expiration interval must be set to zero via 
the property %s if caching is disabled",
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
+
+    // If the user didn't specify TABLE_CACHE_EXPIRATION_INTERVAL_MS, put it 
into the SparkConf and the
+    // options map in case it's assumed to be elsewhere (such as cloning a 
spark session and
+    // re-instantiating the catalog).

Review comment:
       Fair enough. Looking at it more closely, I don't think it would be a 
huge issue.
   
   My worry was that these options get pushed down, and are the options used to 
instantiate custom catalogs here: 
https://github.com/apache/iceberg/blob/58ceafb6c9e87aee7445141c283d0d2e06e38c5e/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L201-L235
   
   We also modify the options map here: 
https://github.com/apache/iceberg/blob/58ceafb6c9e87aee7445141c283d0d2e06e38c5e/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java#L92-L106
   
   But at least for Spark, it should be fine. If the user provided them in the 
config, they're in the options map; otherwise they're not, but that shouldn't 
be an issue.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,52 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
-    Catalog catalog = buildIcebergCatalog(name, options);
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+    // If the user disabled caching and did not set the 
cache.expiration-interval-ms, we'll set it to zero for
+    // them on their behalf. If they disabled caching but explicitly set a 
non-zero cache expiration
+    // interval, we will fail initialization as that's an invalid 
configuration.
+    long defaultCacheExpirationInterval =
+        cacheEnabled ? 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT : 0L;
+
+    this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+        defaultCacheExpirationInterval);
+
+    // Normalize usage of -1 to 0 as we'll call Duration.ofMillis on this 
value.
+    if (cacheExpirationIntervalMs < 0) {
+      this.cacheExpirationIntervalMs = 0;
+    }
+
+    Preconditions.checkArgument(cacheEnabled || cacheExpirationIntervalMs <= 
0L,
+        "The catalog's table cache expiration interval must be set to zero via 
the property %s if caching is disabled",
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
+
+    // If the user didn't specify TABLE_CACHE_EXPIRATION_INTERVAL_MS, put it 
into the SparkConf and the
+    // options map in case it's assumed to be elsewhere (such as cloning a 
spark session and
+    // re-instantiating the catalog).

Review comment:
       I just checked and it shouldn't be an issue for Flink either. Will 
remove.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -384,14 +386,23 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+    this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+
+    Preconditions.checkArgument(cacheEnabled || cacheExpirationIntervalMs > 0L,
+        "The catalog's table cache expiration interval must be set to zero via 
the property {} if caching is disabled",
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
+

Review comment:
       Since some of the tests where the cache is simply disabled are now 
failing, I think that instead of always throwing, I'm going to throw only if 
the cache is disabled and the user explicitly set a cache expiration interval 
greater than zero.
   
   If the user simply disabled caching and didn't set that property (or set it 
to zero), we won't throw in that situation and will default it to zero 
instead.
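
   As a rough sketch of that rule, using a plain `Map` rather than the Spark 
options type: the class and method names and the default interval below are 
placeholders chosen for illustration (the real default is not shown in this 
thread), while the two property keys are the ones discussed above.

```java
import java.util.Map;

/** Hypothetical helper illustrating the validation rule; not the code in this PR. */
final class CacheConfigSketch {
  static final String CACHE_ENABLED = "cache-enabled";
  static final String CACHE_EXPIRATION_INTERVAL_MS = "cache.expiration-interval-ms";
  // Placeholder default for illustration only.
  static final long HYPOTHETICAL_DEFAULT_INTERVAL_MS = 30_000L;

  static long resolveExpirationIntervalMs(Map<String, String> options) {
    boolean cacheEnabled = Boolean.parseBoolean(options.getOrDefault(CACHE_ENABLED, "true"));
    String explicit = options.get(CACHE_EXPIRATION_INTERVAL_MS);

    if (explicit == null) {
      // The user did not set the interval: default it to zero when caching is disabled.
      return cacheEnabled ? HYPOTHETICAL_DEFAULT_INTERVAL_MS : 0L;
    }

    // Negative values are normalized to zero.
    long intervalMs = Math.max(0L, Long.parseLong(explicit));

    // Only an explicit, positive interval combined with disabled caching is rejected.
    if (!cacheEnabled && intervalMs > 0L) {
      throw new IllegalArgumentException(String.format(
          "The catalog's table cache expiration interval must be set to zero via the property %s "
              + "if caching is disabled",
          CACHE_EXPIRATION_INTERVAL_MS));
    }
    return intervalMs;
  }
}
```

   With this shape, a configuration containing only `cache-enabled=false` 
resolves to zero without throwing, which is the case the failing tests 
exercise.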

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       As a user, if I had `cache-enabled=true`, I'd be surprised if anything 
other than caching was happening.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       Ok, I will do that. I will have 1 month mean "don't expire", 1+ mean 
"cache with expiration", and 0 mean "don't cache anymore".

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       I still feel like `-1` would be a better choice for "cache always", with 
zero turning it off. But I've updated it to 30 days (a relatively large value).

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       We settled on -1 for "cache always" (i.e. expiration is off).
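
   As a rough sketch of how the three cases read once -1 means "cache always": 
the class `ExpirationSemanticsSketch` and its `Mode` enum are hypothetical names 
for illustration only, and the real wiring into the cache builder is left out.

```java
import java.time.Duration;
import java.util.Optional;

final class ExpirationSemanticsSketch {
  enum Mode { CACHING_OFF, CACHE_WITHOUT_EXPIRATION, CACHE_WITH_EXPIRATION }

  private ExpirationSemanticsSketch() {
  }

  /** Maps the configured interval to one of the three behaviors discussed in this thread. */
  static Mode modeFor(long expirationIntervalMs) {
    if (expirationIntervalMs == 0L) {
      return Mode.CACHING_OFF;
    }
    // Any negative value (conventionally -1) keeps caching on but never expires entries.
    return expirationIntervalMs < 0L ? Mode.CACHE_WITHOUT_EXPIRATION : Mode.CACHE_WITH_EXPIRATION;
  }

  /** Only a positive interval yields a Duration, so Duration.ofMillis never sees a negative value. */
  static Optional<Duration> expireAfterAccess(long expirationIntervalMs) {
    return expirationIntervalMs > 0L
        ? Optional.of(Duration.ofMillis(expirationIntervalMs))
        : Optional.empty();
  }
}
```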

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,52 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
-    Catalog catalog = buildIcebergCatalog(name, options);
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+    // If the user disabled caching and did not set the 
cache.expiration-interval-ms, we'll set it to zero for
+    // them on their behalf. If they disabled caching but explicitly set a 
non-zero cache expiration
+    // interval, we will fail initialization as that's an invalid 
configuration.
+    long defaultCacheExpirationInterval =
+        cacheEnabled ? 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT : 0L;
+
+    this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+        defaultCacheExpirationInterval);
+
+    // Normalize usage of -1 to 0 as we'll call Duration.ofMillis on this 
value.
+    if (cacheExpirationIntervalMs < 0) {

Review comment:
       No, negative values would leave caching on and just disable cache 
expiration.

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables 
after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("{} was evicted from the TableCache 
with {} cause", key, cause);
+
+  // TODO - Update this to use the default expiration interval millis once 
things are more complete.
+  //        Don't want to break current tests.
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 0);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
+  }
+
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+    return wrap(catalog, true, expirationInterval.toMillis());
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    return new CachingCatalog(catalog, caseSensitive, 
expirationIntervalMillis);
   }
 
-  private final Cache<TableIdentifier, Table> tableCache = 
Caffeine.newBuilder().softValues().build();
   private final Catalog catalog;
   private final boolean caseSensitive;
+  private final boolean expirationEnabled;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final long expirationIntervalMillis;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final Cache<TableIdentifier, Table> tableCache;
+
+  private CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis) {
+    this(catalog, caseSensitive, expirationIntervalMillis, 
Ticker.systemTicker());
+  }
 
-  private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected CachingCatalog(Catalog catalog, boolean caseSensitive, long 
expirationIntervalMillis, Ticker ticker) {
     this.catalog = catalog;
     this.caseSensitive = caseSensitive;
+    // Set negative values to zero to avoid creating a Duration with a 
negative value
+    this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : 
expirationIntervalMillis;
+    this.expirationEnabled = expirationIntervalMillis > 0;

Review comment:
       Oh, and the Precondition check ensures that we haven't entered 
`CachingCatalog` with a value of `0` for the expiration interval (as that should 
turn off caching, which is controlled by not using this wrapper).
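
   To illustrate the split of responsibilities, here is a small stand-in that 
uses a `Function<String, String>` in place of a catalog. Everything in it 
(`CachingLoaderSketch`, `maybeWrap`, the `HashMap` memoization) is a made-up 
simplification rather than the real `CachingCatalog`, and expiration itself is 
not implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class CachingLoaderSketch implements Function<String, String> {
  private final Function<String, String> delegate;
  private final Map<String, String> cache = new HashMap<>();
  final boolean expirationEnabled;

  private CachingLoaderSketch(Function<String, String> delegate, long expirationIntervalMillis) {
    // Zero means "no caching", which is expressed by not building this wrapper at all.
    if (expirationIntervalMillis == 0L) {
      throw new IllegalArgumentException(
          "An expiration interval of zero disables caching and should not reach the wrapper");
    }
    this.delegate = delegate;
    this.expirationEnabled = expirationIntervalMillis > 0L;
  }

  static Function<String, String> wrap(Function<String, String> delegate, long expirationIntervalMillis) {
    return new CachingLoaderSketch(delegate, expirationIntervalMillis);
  }

  /** The caller decides whether to cache; the wrapper is skipped when caching is off. */
  static Function<String, String> maybeWrap(
      Function<String, String> delegate, boolean cacheEnabled, long expirationIntervalMillis) {
    if (!cacheEnabled || expirationIntervalMillis == 0L) {
      return delegate;
    }
    return wrap(delegate, expirationIntervalMillis);
  }

  @Override
  public String apply(String name) {
    // Entries are simply memoized here; time-based eviction is omitted from this sketch.
    return cache.computeIfAbsent(name, delegate);
  }
}
```

   The guard in the constructor only catches a caller that forgot to make that 
decision, which is why a zero arriving there is treated as a bug.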

##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -29,24 +34,103 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps an Iceberg Catalog to cache tables.
+ * <p>
+ * When {@code expirationIntervalMillis} is positive, the cache will expire 
tables after
+ * the given interval provided they have not been accessed via the catalog in 
that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ * <p>
+ * If the duration is negative / -1, cache-entry expiration is disabled.
+ * <p>
+ * If the duration is zero, caching should be turned off. If a value of zero 
gets passed to
+ * this {@link Catalog} wrapper, that is a bug.
+ */
 public class CachingCatalog implements Catalog {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
+  private static final RemovalListener<TableIdentifier, Table> 
identLoggingRemovalListener =
+      (key, value, cause) -> LOG.debug("Evicted {} from the table cache ({})", 
key, cause);
+
   public static Catalog wrap(Catalog catalog) {
-    return wrap(catalog, true);
+    return wrap(catalog, 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF);
+  }
+
+  public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+    return wrap(catalog, true, expirationIntervalMillis);
   }
 
-  public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
-    return new CachingCatalog(catalog, caseSensitive);
+  public static Catalog wrap(Catalog catalog, Duration expirationInterval) {

Review comment:
       Yeah I'll drop it.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -83,7 +89,8 @@
 
   private String catalogName = null;
   private Catalog icebergCatalog = null;
-  private boolean cacheEnabled = true;
+  private boolean cacheEnabled = CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT;
+  private long cacheExpirationIntervalMs = 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;

Review comment:
       I use them in the Spark test, but I can assert on properties of the 
CachingCatalog underneath instead.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,31 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+    // If the user disabled caching, we turn off the cache-expiration for them
+    long defaultCacheExpirationInterval = cacheEnabled ?
+            CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT :
+            CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF;
+
+    this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+        defaultCacheExpirationInterval);

Review comment:
       Sure. That makes sense.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,31 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+    // If the user disabled caching, we turn off the cache-expiration for them
+    long defaultCacheExpirationInterval = cacheEnabled ?
+            CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT :
+            CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF;
+
+    this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+        defaultCacheExpirationInterval);

Review comment:
       Done.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,31 @@ public boolean dropNamespace(String[] namespace) throws 
NoSuchNamespaceException
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    this.cacheEnabled = 
Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+    this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+        CatalogProperties.TABLE_CACHE_ENABLED, 
CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+    // If the user disabled caching, we turn off the cache-expiration for them
+    long defaultCacheExpirationInterval = cacheEnabled ?
+            CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT :
+            CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF;

Review comment:
       Removed.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -83,7 +89,8 @@
 
   private String catalogName = null;
   private Catalog icebergCatalog = null;
-  private boolean cacheEnabled = true;
+  private boolean cacheEnabled = CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT;
+  private long cacheExpirationIntervalMs = 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;

Review comment:
       Actually, `cacheEnabled` is used in several other places, so it needs to 
be a field. But I can remove `cacheExpirationIntervalMs`.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -83,7 +89,8 @@
 
   private String catalogName = null;
   private Catalog icebergCatalog = null;
-  private boolean cacheEnabled = true;
+  private boolean cacheEnabled = CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT;
+  private long cacheExpirationIntervalMs = 
CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;

Review comment:
       I removed `cacheExpirationIntervalMs` and made it a local variable 
inside `initialize`. The rest is left as it was before this PR.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {

Review comment:
       I actually override it with 4 catalogs to test different combinations of 
behaviors.
   
   I can change the base, but the outcome will still be the same.
   
   Let me re-evaluate the test cases now that some changes have been made.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {

Review comment:
       I guess I can, but each of these cases tests something different. Also, 
`SparkTestBaseWithCatalog` errors out when I try to pass it any config (which I 
would need here).

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {

Review comment:
       Ok. So I figured out how to make `SparkTestBaseWithCatalog` work. And I 
used it to test the SparkSessionCatalog.
   
   But I'm not sure which test case I should get rid of here:
   1) cache-enabled = false
   2) cache-enabled = true, expiration interval is above zero
   3) cache-enabled = true, expiration interval is zero so caching shouldn't 
happen.
   
   I feel I can get rid of test case number one just fine, but I'd like to keep 
the other two.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {

Review comment:
       Ok. So in the interest of speed / being able to parallelize test suites, 
as well as making the code cleaner, I've left this with only two test cases.
   
   This file has two test cases, one for cache enabled / expiration enabled and 
one for cache enabled / expiration disabled.
   
   I added a separate test using `SparkTestBaseWithCatalog` that tests 
implicitly turning off caching by using a value of 0 for the expiration 
interval.
   
   I also added a separate test using `SparkTestBaseWithCatalog` that tests one 
example of cache expiration enabled on the SparkSessionCatalog.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {

Review comment:
       So there are now 4 tests. There are a lot of possible combinations to 
test, let me know which ones you think I should remove.
   
   1. TestSparkCatalogCacheExpirationDisabled
     1a. Checks that CachingCatalog is not used when 
cache.expiration-interval-millis=0 (this one seems important).
   2. TestSparkCatalogCacheExpirationEnabled
     2a. Check with cache-enabled and expiration enabled via 30s (in millis)
     2b. Check with cache-enabled and expiration disabled via `-1`
   3. TestSparkSessionCatalogCacheExpiration
     3a. Same as 2a, but using the SparkSessionCatalog
   
   I think it's important to have a test that verifies the 1a behavior, and 
the same goes for 2b. If we wanted to get rid of a test, I would vote for 
dropping 2a and just using 3a instead (since that tests the same behavior but 
ALSO ensures that the SparkSessionCatalog works with the new configs).
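
To make the combinations above concrete, here is a minimal sketch of the catalog properties each check would run with. It is plain Java with no Iceberg or Spark dependencies. The class and helper names are hypothetical, the property keys are the ones written in this thread, and having `cache-enabled=true` in 1a is an assumption on my part.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of the PR.
public class CacheExpirationScenarios {
  // Property keys as written in this thread; the final names may differ.
  public static final String CACHE_ENABLED = "cache-enabled";
  public static final String EXPIRATION_INTERVAL_MILLIS = "cache.expiration-interval-millis";

  // Builds the catalog properties for a single scenario.
  public static Map<String, String> scenario(boolean cacheEnabled, long expirationIntervalMillis) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
    props.put(EXPIRATION_INTERVAL_MILLIS, String.valueOf(expirationIntervalMillis));
    return props;
  }

  // One entry per check listed above; 3a reuses the 2a properties on the SparkSessionCatalog.
  public static Map<String, Map<String, String>> allScenarios() {
    Map<String, Map<String, String>> scenarios = new LinkedHashMap<>();
    scenarios.put("1a", scenario(true, 0L));       // interval of zero: CachingCatalog should not be used
    scenarios.put("2a", scenario(true, 30_000L));  // entries expire after 30s
    scenarios.put("2b", scenario(true, -1L));      // expiration disabled
    return scenarios;
  }

  public static void main(String[] args) {
    allScenarios().forEach((name, props) -> System.out.println(name + " -> " + props));
  }
}
```

Because each scenario needs its own property map, each one also needs its own catalog instance, which is what drives the number of test classes.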

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {

Review comment:
       The reason we need to set up different Spark catalogs to test this is 
that the configuration gets passed into `initialize`, so a second behavior 
can't be tested over the same catalog.
   
   Again, let me know which tests to remove. When I backport this, I'll only 
backport a subset of the tests if we leave multiple in this one. But I think at 
least 2 of these are necessary checks (`cache.expiration-interval-millis=0` 
disabling caching entirely and `cache.expiration-interval-millis=-1` for cache 
forever). Ideally we'd leave 3, but I understand these Spark tests are expensive.
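
The two checks called out above pin down how the interval value is meant to be read. A minimal sketch of that reading, in plain Java with no Iceberg dependencies: the class, enum, and method names are hypothetical, and treating every negative value (not just `-1`) as "cache forever" is an assumption rather than something this thread confirms.

```java
// Hypothetical sketch, for illustration only; not the CachingCatalog implementation.
public class ExpirationIntervalSketch {
  public enum CacheMode { DISABLED, EXPIRING, NEVER_EXPIRES }

  // Maps the cache settings to the behavior the tests are expected to verify.
  public static CacheMode modeFor(boolean cacheEnabled, long expirationIntervalMillis) {
    if (!cacheEnabled || expirationIntervalMillis == 0) {
      // An interval of zero turns caching off even when cache-enabled is true.
      return CacheMode.DISABLED;
    }
    if (expirationIntervalMillis < 0) {
      // -1 means entries are cached forever; other negatives are handled the same way here (assumption).
      return CacheMode.NEVER_EXPIRES;
    }
    // Any positive value is the expiration interval in milliseconds.
    return CacheMode.EXPIRING;
  }

  public static void main(String[] args) {
    System.out.println(modeFor(true, 0L));
    System.out.println(modeFor(true, -1L));
    System.out.println(modeFor(true, 30_000L));
  }
}
```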




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


