kbendick commented on a change in pull request #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r748914125
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -20,33 +20,143 @@
package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CachingCatalog implements Catalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CachingCatalog.class);
+ private static final RemovalListener<TableIdentifier, Table>
keyLoggingRemovalListener =
+ (key, value, cause) -> LOG.info("Expired {} from the TableCache", key);
+
public static Catalog wrap(Catalog catalog) {
- return wrap(catalog, true);
+ return wrap(catalog, false, 0);
+ }
+
+ public static Catalog wrap(Catalog catalog, boolean expirationEnabled, long
expirationIntervalMilllis) {
+ return wrap(catalog, true, expirationEnabled, expirationIntervalMilllis);
+ }
+
+ public static Catalog wrap(Catalog catalog, boolean caseSensitive, boolean
expirationEnabled,
+ long expirationIntervalMillis) {
+ return new CachingCatalog(catalog, caseSensitive, expirationEnabled,
expirationIntervalMillis);
}
- public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
- return new CachingCatalog(catalog, caseSensitive);
+ @VisibleForTesting
+ static Catalog wrap(Catalog catalog, boolean expirationEnabled, long
expirationIntervalMillis, Ticker ticker) {
+ return new CachingCatalog(catalog, true, expirationEnabled,
expirationIntervalMillis, ticker);
}
- private final Cache<TableIdentifier, Table> tableCache =
Caffeine.newBuilder().softValues().build();
private final Catalog catalog;
private final boolean caseSensitive;
+ private final boolean expirationEnabled;
+ private final long expirationIntervalMillis;
+ private final Cache<TableIdentifier, Table> tableCache;
- private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+ private CachingCatalog(Catalog catalog, boolean caseSensitive, boolean
isExpirationEnabled,
+ long expirationIntervalInMillis) {
+ this(catalog, caseSensitive, isExpirationEnabled,
expirationIntervalInMillis, Ticker.systemTicker());
+ }
+
+ private CachingCatalog(Catalog catalog, boolean caseSensitive, boolean
isExpirationEnabled,
+ long expirationIntervalMillis, Ticker ticker) {
this.catalog = catalog;
this.caseSensitive = caseSensitive;
+ this.expirationEnabled = isExpirationEnabled;
+ this.expirationIntervalMillis = expirationIntervalMillis;
+
+ this.tableCache = createTableCache(ticker);
+ }
+
+ /**
+ * Return the age of an entry in the cache.
+ * <p>
+ * This method is only visible for testing the cache expiration policy, as
cache invalidation is handled
+ * by the catalog and not the cache itself.
+ * <p>
+ * Returns the age of the cache entry corresponding to the identifier, or
{@code Optional.empty} if the table
+ * is not present in the cache or if no expireAfterAccess policy is present
in this CachingCatalog.
+ */
+ @VisibleForTesting
+ Optional<Duration> cachedEntryAge(TableIdentifier identifier) {
+ return tableCache.policy()
+ .expireAfterAccess()
+ .flatMap(tableExpiration -> tableExpiration.ageOf(identifier));
+ }
+
+ // Returns the cached Table entry corresponding to the given identifier iff
+ // it's in the cache. Grabs the table in a way that doesn't count as an
access
+ // and thus won't affect the cached entry's ttl (if enabled).
+ @VisibleForTesting
+ Optional<Table> tableFromCacheQuietly(TableIdentifier identifier) {
+ // Ensure async cleanup actions have happened.
+ tableCache.cleanUp();
+ return
Optional.ofNullable(tableCache.policy().getIfPresentQuietly(identifier));
+ }
+
+ @VisibleForTesting
+ Cache<TableIdentifier, Table> cache() {
+ return tableCache;
+ }
+
+ @VisibleForTesting
+ Optional<Duration> getTimeToTTL(TableIdentifier identifier) {
+ return tableCache
+ .policy()
+ .expireAfterAccess() // Assumes expireAfterAccess, which is what we
set at cache level.
+ .flatMap(tableExpiration -> tableExpiration.ageOf(identifier)) // Get
the time the table has been cached.
+ .map(age -> Duration.ofMillis(expirationIntervalMillis).minus(age));
+ }
+
+ private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+ Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+ .newBuilder()
+ .softValues()
+ .removalListener(keyLoggingRemovalListener)
+ .writer(new CacheWriter<TableIdentifier, Table>() {
+ @Override
+ // TODO - Consider expiring and syncing any metadata tables that
have a different snapshotId
+ // upon write.
+ public void write(TableIdentifier tableIdentifier, Table table) {
+ LOG.info("Table {} was written to the catalog with snapshotId {}",
tableIdentifier,
+ table.currentSnapshot() == null ? null :
table.currentSnapshot().snapshotId());
+ }
+
+ @Override
+ public void delete(TableIdentifier tableIdentifier, Table table,
RemovalCause cause) {
+ // On expiration, remove any associated metadata tables so that
subsequent catalog loads won't
+ // return stale metadata tables w.r.t. the underlying data tables
they would return.
+ //
+ // TODO - Should we put metadata tables back into the catalog if
their associated table is still
+ // cached to keep tables and metadata tables on the same
snapshot?
+ if (expirationEnabled &&
!MetadataTableUtils.hasMetadataTableName(tableIdentifier)) {
Review comment:
I was thinking about that too. Checking expirationEnabled alone isn't
necessarily sufficient depending on the behavior we want.
The causes are as follows for those who don't know:
- `EXPLICIT` - Things like `invalidate`, `remove`, etc.
- `REPLACED` - New entry overwriting an old one - Doesn't seem possible
looking at the code.
- `COLLECTED` - for us, this would occur due to configuring `softValues`.
- `EXPIRED` - the cause that comes from `expireAfterAccess` etc. - what
`expirationEnabled` really means.
- `SIZE` - Not relevant to us now as we don't limit cache size.
How we react to each depends on how much we want to try to keep metadata
tables in sync with their main table (which it currently does when `drop`
etc. are called).
My thoughts on the ones I believe will come up:
- `EXPLICIT`: I think it's possible to ignore since we call `invalidateAll`
on metadata tables already on drop etc.
- `COLLECTED`: I'm very open to ideas on whether or not we expire metadata
tables when the origin table is GC'd.
- `EXPIRED`: Yes, we should call the function.
I can change it to react to each possible enum value in a switch statement
while we decide on this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]