IMPALA-7447. Evict LocalCatalog cache entries based on size

This pulls in the 'sizeof' library from ehcache (Apache-licensed) and uses it to implement size-based eviction of cache entries in LocalCatalog.
This is difficult to test without being quite fragile to small changes in the cached structures. However, I added a simple unit test as a general sanity-check that it is computing some reasonable result. Change-Id: Ia96af49b35c17e505b7b6785e78d140939085d91 Reviewed-on: http://gerrit.cloudera.org:8080/11231 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/07823211 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/07823211 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/07823211 Branch: refs/heads/master Commit: 078232112469c238d5332453627eb37dcb10eb97 Parents: c0c3de2 Author: Todd Lipcon <[email protected]> Authored: Tue Aug 14 01:42:43 2018 -0700 Committer: Todd Lipcon <[email protected]> Committed: Wed Aug 22 16:05:55 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/exec-env.cc | 15 ++++- be/src/util/backend-gflag-util.cc | 5 ++ common/thrift/BackendGflags.thrift | 4 ++ fe/pom.xml | 6 ++ .../catalog/local/CatalogdMetaProvider.java | 66 +++++++++++++++++--- .../impala/catalog/local/LocalCatalog.java | 4 +- .../catalog/local/CatalogdMetaProviderTest.java | 25 +++++++- 7 files changed, 113 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 319e948..924073a 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -81,8 +81,19 @@ DEFINE_int32(num_hdfs_worker_threads, 16, "(Advanced) The number of threads in the global HDFS operation pool"); DEFINE_bool_hidden(use_local_catalog, false, - "Use experimental implementation of a local catalog. 
If this is set, " - "the catalog service is not used and does not need to be started."); + "Use experimental implementation of a local catalog. If this is set, " + "the catalog service is not used and does not need to be started."); +DEFINE_int32_hidden(local_catalog_cache_mb, -1, + "If --use_local_catalog is enabled, configures the size of the catalog " + "cache within each impalad. If this is set to -1, the cache is auto-" + "configured to 60% of the configured Java heap size. Note that the Java " + "heap size is distinct from and typically smaller than the overall " + "Impala memory limit."); +DEFINE_int32_hidden(local_catalog_cache_expiration_s, 60 * 60, + "If --use_local_catalog is enabled, configures the expiration time " + "of the catalog cache within each impalad. Even if the configured " + "cache capacity has not been reached, items are removed from the cache " + "if they have not been accessed in this amount of time."); DECLARE_int32(state_store_port); DECLARE_int32(num_threads_per_core); http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/be/src/util/backend-gflag-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index c7982bf..051d396 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -29,6 +29,8 @@ DECLARE_bool(load_auth_to_local_rules); DECLARE_bool(enable_stats_extrapolation); DECLARE_bool(enable_orc_scanner); DECLARE_bool(use_local_catalog); +DECLARE_int32(local_catalog_cache_expiration_s); +DECLARE_int32(local_catalog_cache_mb); DECLARE_int32(non_impala_java_vlog); DECLARE_int32(num_metadata_loading_threads); DECLARE_int32(max_hdfs_partitions_parallel_load); @@ -63,6 +65,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background); cfg.__set_enable_orc_scanner(FLAGS_enable_orc_scanner); 
cfg.__set_use_local_catalog(FLAGS_use_local_catalog); + cfg.__set_local_catalog_cache_mb(FLAGS_local_catalog_cache_mb); + cfg.__set_local_catalog_cache_expiration_s( + FLAGS_local_catalog_cache_expiration_s); cfg.__set_server_name(FLAGS_server_name); cfg.__set_sentry_config(FLAGS_sentry_config); cfg.__set_authorization_policy_provider_class( http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/common/thrift/BackendGflags.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 6852d27..7ba6c7e 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -81,4 +81,8 @@ struct TBackendGflags { 27: required bool use_local_catalog 28: required bool disable_catalog_data_ops_debug_only + + 29: required i32 local_catalog_cache_mb + + 30: required i32 local_catalog_cache_expiration_s } http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/pom.xml ---------------------------------------------------------------------- diff --git a/fe/pom.xml b/fe/pom.xml index c695a5b..896dce5 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -467,6 +467,12 @@ under the License. 
</dependency> <dependency> + <groupId>org.ehcache</groupId> + <artifactId>sizeof</artifactId> + <version>0.3.0</version> + </dependency> + + <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> <version>1.1.1</version> http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index 4d2aa6b..5c1d820 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -17,6 +17,7 @@ package org.apache.impala.catalog.local; +import java.lang.management.ManagementFactory; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; import org.apache.impala.common.Reference; import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TCatalogInfoSelector; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; @@ -55,15 +57,18 @@ import org.apache.impala.util.ListMap; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; +import org.ehcache.sizeof.SizeOf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; +import 
com.google.common.cache.Weigher; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -121,13 +126,34 @@ public class CatalogdMetaProvider implements MetaProvider { // to the "direct" provider for now and circumvent catalogd. private DirectMetaProvider directProvider_ = new DirectMetaProvider(); - // TODO(todd): hard-coded TTL is not the final solution here. We should implement - // memory estimation for all cached objects, and evict based on a configurable - // memory pressure. - final Cache<Object,Object> cache_ = CacheBuilder.newBuilder() - .expireAfterAccess(1, TimeUnit.HOURS) - .recordStats() - .build(); + + final Cache<Object,Object> cache_; + + public CatalogdMetaProvider(TBackendGflags flags) { + Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s()); + Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb()); + + long cacheSizeBytes; + if (flags.local_catalog_cache_mb < 0) { + long maxHeapBytes = ManagementFactory.getMemoryMXBean() + .getHeapMemoryUsage().getMax(); + cacheSizeBytes = (long)(maxHeapBytes * 0.6); + } else { + cacheSizeBytes = flags.local_catalog_cache_mb * 1024 * 1024; + } + int expirationSecs = flags.local_catalog_cache_expiration_s; + LOG.info("Metadata cache configuration: capacity={} MB, expiration={} sec", + cacheSizeBytes/1024/1024, expirationSecs); + + // TODO(todd) add end-to-end test cases which stress cache eviction (both time + // and size-triggered) and make sure results are still correct. 
+ cache_ = CacheBuilder.newBuilder() + .maximumWeight(cacheSizeBytes) + .expireAfterAccess(expirationSecs, TimeUnit.SECONDS) + .weigher(new SizeOfWeigher()) + .recordStats() + .build(); + } public CacheStats getCacheStats() { return cache_.stats(); @@ -729,4 +755,30 @@ public class CatalogdMetaProvider implements MetaProvider { return super.equals(obj) && partId_ == other.partId_; } } + + @VisibleForTesting + static class SizeOfWeigher implements Weigher<Object, Object> { + // Bypass flyweight objects like small boxed integers, Boolean.TRUE, enums, etc. + private static final boolean BYPASS_FLYWEIGHT = true; + // Cache the reflected sizes of classes seen. + private static final boolean CACHE_SIZES = true; + + private static SizeOf SIZEOF = SizeOf.newInstance(BYPASS_FLYWEIGHT, CACHE_SIZES); + + private static final int BYTES_PER_WORD = 8; // Assume 64-bit VM. + // Guava cache overhead based on: + // http://code-o-matic.blogspot.com/2012/02/updated-memory-cost-per-javaguava.html + private static final int OVERHEAD_PER_ENTRY = + 12 * BYTES_PER_WORD + // base cost per entry + 4 * BYTES_PER_WORD; // for use of 'maximumSize()' + + @Override + public int weigh(Object key, Object value) { + long size = SIZEOF.deepSizeOf(key, value) + OVERHEAD_PER_ENTRY; + if (size > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int)size; + } + } } http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java index 89f5345..c3c918e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java @@ -40,6 +40,7 @@ import org.apache.impala.catalog.Function.CompareMode; import 
org.apache.impala.catalog.HdfsCachePool; import org.apache.impala.catalog.PartitionNotFoundException; import org.apache.impala.catalog.PrunablePartition; +import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TUniqueId; @@ -71,7 +72,8 @@ public class LocalCatalog implements FeCatalog { private String nullPartitionKeyValue_; private final String defaultKuduMasterHosts_; - private static MetaProvider PROVIDER = new CatalogdMetaProvider(); + private static MetaProvider PROVIDER = new CatalogdMetaProvider( + BackendConfig.INSTANCE.getBackendCfg()); public static LocalCatalog create(String defaultKuduMasterHosts) { return new LocalCatalog(PROVIDER, defaultKuduMasterHosts); http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java index f7c3abd..d5da6ad 100644 --- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java @@ -17,18 +17,20 @@ package org.apache.impala.catalog.local; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.util.List; import java.util.Map; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.impala.catalog.local.CatalogdMetaProvider.SizeOfWeigher; import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata; import org.apache.impala.catalog.local.MetaProvider.PartitionRef; import org.apache.impala.catalog.local.MetaProvider.TableMetaRef; import 
org.apache.impala.common.Pair; import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.util.ListMap; import org.junit.Test; @@ -53,7 +55,11 @@ public class CatalogdMetaProviderTest { } public CatalogdMetaProviderTest() throws Exception { - provider_ = new CatalogdMetaProvider(); + // Set sufficient expiration/capacity for the test to not evict. + TBackendGflags flags = new TBackendGflags(); + flags.setLocal_catalog_cache_expiration_s(3600); + flags.setLocal_catalog_cache_mb(100); + provider_ = new CatalogdMetaProvider(flags); Pair<Table, TableMetaRef> tablePair = provider_.loadTable("functional", "alltypes"); tableRef_ = tablePair.second; prevStats_ = provider_.getCacheStats(); @@ -138,4 +144,19 @@ public class CatalogdMetaProviderTest { assertEquals(2, stats.hitCount()); assertEquals(0, stats.missCount()); } + + @Test + public void testWeights() throws Exception { + List<PartitionRef> refs = provider_.loadPartitionList(tableRef_); + ListMap<TNetworkAddress> hostIndex = new ListMap<>(); + provider_.loadPartitionsByRefs(tableRef_, /* ignored */null, hostIndex , refs); + + // Unfortunately Guava doesn't provide a statistic on the total weight of cached + // elements. So, we'll just instantiate the weigher directly and sanity check + // the size loosely. + SizeOfWeigher weigher = new SizeOfWeigher(); + assertTrue(weigher.weigh(refs, null) > 3000); + assertTrue(weigher.weigh(refs, null) < 4000); + } + } \ No newline at end of file
