kbendick commented on a change in pull request #3543:
URL: https://github.com/apache/iceberg/pull/3543#discussion_r766947324
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables after
+ * the given interval provided they have not been accessed via the catalog in that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
public class CachingCatalog implements Catalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);
+ private static final RemovalListener<TableIdentifier, Table> identLoggingRemovalListener =
+     (key, value, cause) -> LOG.debug("{} was evicted from the TableCache with {} cause", key, cause);
+
+ // TODO - Update this to use the default expiration interval millis once things are more complete.
+ // Don't want to break current tests.
public static Catalog wrap(Catalog catalog) {
- return wrap(catalog, true);
+ return wrap(catalog, 0);
+ }
+
+ public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+ return wrap(catalog, true, expirationIntervalMillis);
+ }
+
+ public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+ return wrap(catalog, true, expirationInterval.toMillis());
}
- public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
- return new CachingCatalog(catalog, caseSensitive);
+ public static Catalog wrap(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) {
+   return new CachingCatalog(catalog, caseSensitive, expirationIntervalMillis);
  }
- private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder().softValues().build();
private final Catalog catalog;
private final boolean caseSensitive;
+ private final boolean expirationEnabled;
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ protected final long expirationIntervalMillis;
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ protected final Cache<TableIdentifier, Table> tableCache;
+
+ private CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) {
+   this(catalog, caseSensitive, expirationIntervalMillis, Ticker.systemTicker());
+ }
- private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ protected CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis, Ticker ticker) {
this.catalog = catalog;
this.caseSensitive = caseSensitive;
+   // Set negative values to zero to avoid creating a Duration with a negative value
+   this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : expirationIntervalMillis;
+ this.expirationEnabled = expirationIntervalMillis > 0;
Review comment:
Oh I understand. Yeah I will move the logic higher. It would need to be
in the specific catalog when we instantiate it (e.g. in SparkCatalog).
I will update.
##########
File path: core/src/test/java/org/apache/iceberg/util/FakeTicker.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A Ticker whose value can be advanced programmatically in test.
+ *
+ * This is modified from the Guava package com.google.common.testing.FakeTicker,
Review comment:
I have actually removed all of the Guava code from this.
This is an implementation of the interface from `caffeine`, and there are no
methods from the Guava class in it anymore.
I can remove that part at the top, as there are many `FakeTicker`s abounding
on the internet. This one only uses nanos because that's what the interface
uses. But I did not run across one that used a `Duration` as input.
So this is arguably not from Guava anymore (plus we do credit Guava already).
Up to you how to proceed.
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables after
+ * the given interval provided they have not been accessed via the catalog in that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ *
+ * If the duration is zero, the cache will retain entries forever, or until
+ * an action is taken on the cache to refresh the cache's table entry.
+ */
public class CachingCatalog implements Catalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);
+ private static final RemovalListener<TableIdentifier, Table> identLoggingRemovalListener =
+     (key, value, cause) -> LOG.debug("{} was evicted from the TableCache with {} cause", key, cause);
+
+ // TODO - Update this to use the default expiration interval millis once things are more complete.
+ // Don't want to break current tests.
public static Catalog wrap(Catalog catalog) {
- return wrap(catalog, true);
+ return wrap(catalog, 0);
+ }
+
+ public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+ return wrap(catalog, true, expirationIntervalMillis);
+ }
+
+ public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
+ return wrap(catalog, true, expirationInterval.toMillis());
}
- public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
- return new CachingCatalog(catalog, caseSensitive);
+ public static Catalog wrap(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) {
+   return new CachingCatalog(catalog, caseSensitive, expirationIntervalMillis);
  }
- private final Cache<TableIdentifier, Table> tableCache = Caffeine.newBuilder().softValues().build();
private final Catalog catalog;
private final boolean caseSensitive;
+ private final boolean expirationEnabled;
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ protected final long expirationIntervalMillis;
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ protected final Cache<TableIdentifier, Table> tableCache;
+
+ private CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) {
+   this(catalog, caseSensitive, expirationIntervalMillis, Ticker.systemTicker());
+ }
- private CachingCatalog(Catalog catalog, boolean caseSensitive) {
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ protected CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis, Ticker ticker) {
this.catalog = catalog;
this.caseSensitive = caseSensitive;
+   // Set negative values to zero to avoid creating a Duration with a negative value
+   this.expirationIntervalMillis = expirationIntervalMillis <= 0 ? 0 : expirationIntervalMillis;
+ this.expirationEnabled = expirationIntervalMillis > 0;
+ this.tableCache = createTableCache(ticker);
+ }
+
+ private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
+ Caffeine<TableIdentifier, Table> cacheBuilder = Caffeine
+ .newBuilder()
+ .softValues()
+ .removalListener(identLoggingRemovalListener);
+
+   // Only setup ticker and background cleanup tasks from expiration if table expiration is enabled.
+ if (expirationEnabled) {
+ return cacheBuilder
+ .writer(new CacheWriter<TableIdentifier, Table>() {
+ @Override
+ public void write(TableIdentifier tableIdentifier, Table table) {
+ }
Review comment:
   The preference is really to inline them. The compiler has trouble figuring out the types otherwise.
   Almost all examples I've seen inline it like this, but I'll see if I can get it to work. Right now it's
   complaining because the `Caffeine` object stays `Caffeine<Object, Object>` for a long time, and they use
   some tricks that do the type inference later in the chain. Hence the preference for inlining and a
   fluent builder where possible.
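   For context, the inference issue looks roughly like this: `Caffeine.newBuilder()` starts out as
   `Caffeine<Object, Object>`, and the key/value types only get narrowed once a typed argument appears in
   the fluent chain. A minimal sketch (assuming Caffeine 2.x generics; the class and method names here are
   illustrative, not from this PR):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

class TypeInferenceSketch {
  Cache<TableIdentifier, Table> newCache() {
    // newBuilder() returns Caffeine<Object, Object>; the explicitly typed
    // lambda narrows the builder to Caffeine<TableIdentifier, Table>,
    // so build() yields a Cache with the intended key/value types.
    return Caffeine.newBuilder()
        .softValues()
        .removalListener((TableIdentifier key, Table value, RemovalCause cause) ->
            System.out.printf("%s evicted (%s)%n", key, cause))
        .build();
  }
}
```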
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
What happens if:
- `cache-enabled=true` and `cache.expiration-interval-ms=0`? Should that
not cache then?
That's the part that I'm hung up on.
##########
File path: core/src/main/java/org/apache/iceberg/CatalogProperties.java
##########
@@ -30,6 +30,14 @@ private CatalogProperties() {
public static final String FILE_IO_IMPL = "io-impl";
public static final String WAREHOUSE_LOCATION = "warehouse";
+ public static final String TABLE_CACHE_ENABLED = "cache-enabled";
+ public static final boolean TABLE_CACHE_ENABLED_DEFAULT = true;
+ public static final String TABLE_CACHE_EXPIRATION_ENABLED = "cache.expiration-enabled";
Review comment:
   Sure, that makes sense to me. Since it's a new catalog-level config, the fewer options users have to
   set to get the same behavior, the better in my view.
   One concern is that users will then have to specifically choose a timeout value. Feedback I've
   received offline so far has been that people aren't too sure what value would be best to use. Removing
   the `ENABLED` option makes it harder to provide a default that removes the need to make that choice.
   But I guess if we always set up the examples with the "default" value, users will likely copy that.
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
+ * When `expirationIntervalMillis` is non-zero, the cache will expire tables after
Review comment:
Fixed.
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -30,23 +35,98 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Class that wraps an Iceberg Catalog to cache table's.
+ *
Review comment:
   Fixed. We have a number of places that don't do this, but I'm guessing we don't want to create that
   large a diff just for `<p>` (that would be a separate PR).
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
I managed to get it. Upgrading my IntelliJ caused a lot of headaches. I
downgraded and it seems fine now.
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
   I managed to get it to work. Upgrading my IntelliJ caused a lot of headaches; I downgraded and it
   seems fine now, so it's a separate class.
##########
File path: core/src/test/java/org/apache/iceberg/util/FakeTicker.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A Ticker whose value can be advanced programmatically in test.
+ *
+ * This is modified from the Guava package
com.google.common.testing.FakeTicker,
Review comment:
Removed.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,52 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
 @Override
 public final void initialize(String name, CaseInsensitiveStringMap options) {
-   this.cacheEnabled = Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
-   Catalog catalog = buildIcebergCatalog(name, options);
+   this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+       CatalogProperties.TABLE_CACHE_ENABLED, CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+   // If the user disabled caching and did not set the cache.expiration-interval-ms, we'll set it to zero for
+   // them on their behalf. If they disabled caching but explicitly set a non-zero cache expiration
+   // interval, we will fail initialization as that's an invalid configuration.
+   long defaultCacheExpirationInterval =
+       cacheEnabled ? CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT : 0L;
+
+   this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+       CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+       defaultCacheExpirationInterval);
+
+   // Normalize usage of -1 to 0 as we'll call Duration.ofMillis on this value.
+   if (cacheExpirationIntervalMs < 0) {
+     this.cacheExpirationIntervalMs = 0;
+   }
+
+   Preconditions.checkArgument(cacheEnabled || cacheExpirationIntervalMs <= 0L,
+       "The catalog's table cache expiration interval must be set to zero via the property %s if caching is disabled",
+       CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
+
+   // If the user didn't specify TABLE_CACHE_EXPIRATION_INTERVAL_MS, put it into the SparkConf and the
+   // options map in case it's assumed to be elsewhere (such as cloning a spark session and
+   // re-instantiating the catalog).
Review comment:
Fair enough. Looking at it closer, I don't think it would be a huge
issue.
My worry was that these options get pushed down, and are the options used to
instantiate custom catalogs here:
https://github.com/apache/iceberg/blob/58ceafb6c9e87aee7445141c283d0d2e06e38c5e/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L201-L235
We also modify the options map here:
https://github.com/apache/iceberg/blob/58ceafb6c9e87aee7445141c283d0d2e06e38c5e/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java#L92-L106
   But at least for Spark, it should be fine. If they're in the options map, they came from user-provided
   config; otherwise they're simply absent, which shouldn't be an issue.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
Review comment:
I just checked and it shouldn't be an issue for Flink either. Will
remove.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -384,14 +386,23 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
 @Override
 public final void initialize(String name, CaseInsensitiveStringMap options) {
-   this.cacheEnabled = Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+   this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+       CatalogProperties.TABLE_CACHE_ENABLED, CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+   this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+       CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+       CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+
+   Preconditions.checkArgument(cacheEnabled || cacheExpirationIntervalMs > 0L,
+       "The catalog's table cache expiration interval must be set to zero via the property {} if caching is disabled",
+       CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
+
Review comment:
   I think that instead of throwing, since some of the tests are failing now where the cache is simply
   disabled, I'm going to check whether the cache is disabled and the user explicitly set a value greater
   than zero for the cache expiration interval ms.
   If the user simply disabled caching and didn't change that field (or set it to zero), we won't throw
   in that situation and we will default it to zero instead.
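   Sketching that behavior (using the constants and `PropertyUtil` helpers from this PR; the final code
   may differ slightly):

```java
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;

class CacheConfigSketch {
  static long resolveExpirationMs(Map<String, String> options) {
    boolean cacheEnabled = PropertyUtil.propertyAsBoolean(options,
        CatalogProperties.TABLE_CACHE_ENABLED, CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);

    // When caching is disabled and the user didn't set an interval, default it
    // to zero on their behalf instead of failing initialization.
    long defaultExpirationMs = cacheEnabled
        ? CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT
        : 0L;
    long expirationMs = PropertyUtil.propertyAsLong(options,
        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS, defaultExpirationMs);

    // Only fail when caching is disabled but a positive interval was set explicitly.
    Preconditions.checkArgument(cacheEnabled || expirationMs <= 0,
        "The cache expiration interval (%s) must not be positive when caching is disabled",
        CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
    return expirationMs;
  }
}
```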
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
As a user, if I had `cache-enabled=true`, I'd be surprised if anything
other than caching was happening.
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
   Ok. I will do that. I will have 1 month mean "don't expire", 1+ mean "cache with expiration", and 0
   mean "don't cache" anymore.
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
I still feel like `-1` would be a better choice for "cache always", with
zero turning it off. But I've updated it to 30 days (a relatively large value).
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
   We settled on -1 for "cache always" (i.e. expiration is off).
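   Under those settled semantics, catalog setup would look roughly like this hypothetical helper
   (assuming `TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF` is -1; names here are illustrative):

```java
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.catalog.Catalog;

class CacheSemanticsSketch {
  static Catalog createOrWrap(Catalog catalog, long expirationIntervalMs) {
    if (expirationIntervalMs == 0) {
      return catalog;  // 0: caching is off entirely, so skip the wrapper
    }
    // -1 ("off"): cache forever; positive: expire entries after the interval
    return CachingCatalog.wrap(catalog, expirationIntervalMs);
  }
}
```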
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
Review comment:
   No, negative values would leave caching on and just disable cache expiration.
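   Concretely, only a positive interval would configure expiration on the cache builder; a sketch
   (assuming Caffeine's `expireAfterAccess(Duration)` overload; the method name is illustrative):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

class ExpirationSetupSketch {
  static Cache<TableIdentifier, Table> newCache(long expirationIntervalMs) {
    Caffeine<Object, Object> builder = Caffeine.newBuilder().softValues();
    if (expirationIntervalMs > 0) {
      // positive: entries expire after going unaccessed for the interval
      builder.expireAfterAccess(Duration.ofMillis(expirationIntervalMs));
    }
    // negative (-1): no expiration is configured, so entries stay cached
    return builder.build();
  }
}
```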
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
Review comment:
   Oh, and the Precondition check ensures that we haven't entered `CachingCatalog` with a value of `0`
   for the expiration interval (as that should turn off caching, which is controlled by not using this
   wrapper at all).
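   As a sketch of that constructor-level guard (hypothetical message wording, not the PR's exact text):

```java
// In CachingCatalog's constructor: a zero interval means "don't cache", which
// callers express by not wrapping the catalog at all, so reject it here.
Preconditions.checkArgument(expirationIntervalMillis != 0,
    "When %s is zero, the catalog should not be wrapped in a CachingCatalog",
    CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS);
```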
##########
File path: core/src/main/java/org/apache/iceberg/CachingCatalog.java
##########
@@ -29,24 +34,103 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps an Iceberg Catalog to cache tables.
+ * <p>
+ * When {@code expirationIntervalMillis} is positive, the cache will expire tables after
+ * the given interval provided they have not been accessed via the catalog in that time.
+ * Each lookup of the table via the catalog will restart the expiration time.
+ * <p>
+ * If the duration is negative / -1, cache-entry expiration is disabled.
+ * <p>
+ * If the duration is zero, caching should be turned off. If a value of zero gets passed to
+ * this {@link Catalog} wrapper, that is a bug.
+ */
public class CachingCatalog implements Catalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);
+ private static final RemovalListener<TableIdentifier, Table> identLoggingRemovalListener =
+     (key, value, cause) -> LOG.debug("Evicted {} from the table cache ({})", key, cause);
+
public static Catalog wrap(Catalog catalog) {
- return wrap(catalog, true);
+   return wrap(catalog, CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF);
+ }
+
+ public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
+ return wrap(catalog, true, expirationIntervalMillis);
}
- public static Catalog wrap(Catalog catalog, boolean caseSensitive) {
- return new CachingCatalog(catalog, caseSensitive);
+ public static Catalog wrap(Catalog catalog, Duration expirationInterval) {
Review comment:
Yeah I'll drop it.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -83,7 +89,8 @@
private String catalogName = null;
private Catalog icebergCatalog = null;
- private boolean cacheEnabled = true;
+ private boolean cacheEnabled = CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT;
+ private long cacheExpirationIntervalMs = CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
Review comment:
I use them in the Spark test, but I can assert on properties of the
CachingCatalog underneath instead.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -383,14 +390,31 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
 @Override
 public final void initialize(String name, CaseInsensitiveStringMap options) {
-   this.cacheEnabled = Boolean.parseBoolean(options.getOrDefault("cache-enabled", "true"));
+   this.cacheEnabled = PropertyUtil.propertyAsBoolean(options,
+       CatalogProperties.TABLE_CACHE_ENABLED, CatalogProperties.TABLE_CACHE_ENABLED_DEFAULT);
+
+ // If the user disabled caching, we turn off the cache-expiration for them
+ long defaultCacheExpirationInterval = cacheEnabled ?
+ CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT :
+ CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS_OFF;
+
+ this.cacheExpirationIntervalMs = PropertyUtil.propertyAsLong(options,
+ CatalogProperties.TABLE_CACHE_EXPIRATION_INTERVAL_MS,
+ defaultCacheExpirationInterval);
Review comment:
Sure. That makes sense.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
Review comment:
Done.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
Review comment:
Removed.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
Review comment:
   Actually, `cacheEnabled` is used in several other places, so it needs to stay a field. But I can
   remove `cacheExpirationIntervalMs`.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
Review comment:
I removed `cacheExpirationIntervalMs` and made it a local variable
inside `initialize`. The rest is left as it was before this PR.
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {
Review comment:
I actually override it with 4 catalogs to test different combinations of
behaviors.
I can change the base, but the outcome will still be the same.
Let me re-evaluate the test cases now that some changes have been made.
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
Review comment:
I guess I can, but each of these tests covers a different behavior. Also,
`SparkTestBaseWithCatalog` errors out when I try to pass it any config (which
I would need here).
##########
File path:
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {
Review comment:
Ok. So I figured out how to make `SparkTestBaseWithCatalog` work, and I
used it to test the SparkSessionCatalog.
But I'm not sure which test case I should get rid of here:
1) cache-enabled = false
2) cache-enabled = true, expiration interval is above zero
3) cache-enabled = true, expiration interval is zero, so caching shouldn't
happen
I feel I can get rid of test case number one just fine, but I'd like to keep
the other two.
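For illustration, the three combinations map onto catalog properties roughly
as follows (a hedged sketch; the key spellings follow the ones used in this
thread, not necessarily the final documented names):

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class CacheExpirationTestConfigs {
  // Case 1: caching turned off explicitly.
  static final Map<String, String> CACHE_DISABLED =
      ImmutableMap.of("cache-enabled", "false");

  // Case 2: caching on; entries expire some interval after their last access.
  static final Map<String, String> CACHE_WITH_EXPIRATION = ImmutableMap.of(
      "cache-enabled", "true",
      "cache.expiration-interval-millis", "30000");

  // Case 3: caching nominally on, but a zero interval should disable it.
  static final Map<String, String> ZERO_EXPIRATION_INTERVAL = ImmutableMap.of(
      "cache-enabled", "true",
      "cache.expiration-interval-millis", "0");
}
```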
##########
File path:
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {
Review comment:
Ok. So in the interest of speed / being able to parallelize test suites,
as well as making the code cleaner, I've left this file with only two test
cases: one for cache enabled / expiration enabled and one for cache enabled /
expiration disabled.
I added a separate test using `SparkTestBaseWithCatalog` that tests
implicitly turning off caching by using a value of 0 for the expiration
interval.
I also added a separate test using `SparkTestBaseWithCatalog` that tests one
example of cache expiration enabled on the SparkSessionCatalog.
##########
File path:
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {
Review comment:
So there are now 4 tests. There are a lot of possible combinations to
test; let me know which ones you think I should remove.
1. **TestSparkCatalogCacheExpirationDisabled**
1a. Checks that CachingCatalog is not used when
cache.expiration-interval-millis=0 (this one seems important).
2. **TestSparkCatalogCacheExpirationEnabled**
2a. Checks cache-enabled with expiration enabled via 30s (in millis).
2b. Checks cache-enabled with expiration disabled via `-1`.
3. **TestSparkSessionCatalogCacheExpiration**
3a. Same as 2a, but using the SparkSessionCatalog.
I think it's important to have a test that verifies 1a's behavior, and the
same goes for 2b. If we wanted to get rid of a test, I would vote for
dropping 2a and just using 3a instead (since it tests the same behavior but
ALSO ensures that the SparkSessionCatalog works with the new configs as well).
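As a concrete sketch of check 1a (hedged: `sparkCatalog()` and
`icebergCatalog()` are hypothetical helpers for looking up the configured
SparkCatalog and exposing the catalog it wraps; they are not part of this PR):

```java
// Hedged sketch of test 1a: with cache.expiration-interval-millis=0, the
// SparkCatalog should hold the raw Iceberg catalog, not a CachingCatalog.
@Test
public void testZeroExpirationIntervalDisablesCaching() {
  Catalog underlying = sparkCatalog().icebergCatalog();  // hypothetical accessors
  Assertions.assertThat(underlying).isNotInstanceOf(CachingCatalog.class);
}
```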
##########
File path:
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogCacheExpiration.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestSparkCatalogCacheExpiration extends SparkCatalogTestBase {
Review comment:
The reason we need to set up different Spark catalogs to test this is
that the configuration gets passed in via `initialize`, so a second behavior
test can't be run over the same catalog.
Again, let me know which tests to remove. When I backport this, I'll only
backport a subset of the tests if we leave multiple in this one. But I think
at least 2 of these checks are necessary (`cache.expiration-interval-millis=0`
disabling caching entirely and `cache.expiration-interval-millis=-1` for
caching forever). Ideally we'd keep all 3, but I understand these Spark tests
are expensive.
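For reference, selecting these behaviors from a Spark session would look
roughly like this (a hedged sketch: the catalog name `my_catalog` and the
`type` setting are assumptions, and the property spellings follow this thread
rather than final documentation):

```java
import org.apache.spark.sql.SparkSession;

// Hedged sketch: an Iceberg SparkCatalog with table caching enabled and a
// 30s expiration. Per the discussion above, 0 would disable caching
// entirely and -1 would keep cached entries until explicitly refreshed.
SparkSession spark = SparkSession.builder()
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hive")
    .config("spark.sql.catalog.my_catalog.cache-enabled", "true")
    .config("spark.sql.catalog.my_catalog.cache.expiration-interval-millis", "30000")
    .getOrCreate();
```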
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]