IMPALA-7447. Evict LocalCatalog cache entries based on size

This pulls in the 'sizeof' library from ehcache (Apache-licensed) and
uses it to implement size-based eviction of cache entries in
LocalCatalog.

This is difficult to test precisely, because any assertion on exact sizes
would be fragile to small changes in the cached structures. However, I added
a simple unit test as a general sanity check that the weigher computes a
reasonable result.

Change-Id: Ia96af49b35c17e505b7b6785e78d140939085d91
Reviewed-on: http://gerrit.cloudera.org:8080/11231
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Todd Lipcon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/07823211
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/07823211
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/07823211

Branch: refs/heads/master
Commit: 078232112469c238d5332453627eb37dcb10eb97
Parents: c0c3de2
Author: Todd Lipcon <[email protected]>
Authored: Tue Aug 14 01:42:43 2018 -0700
Committer: Todd Lipcon <[email protected]>
Committed: Wed Aug 22 16:05:55 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/exec-env.cc                      | 15 ++++-
 be/src/util/backend-gflag-util.cc               |  5 ++
 common/thrift/BackendGflags.thrift              |  4 ++
 fe/pom.xml                                      |  6 ++
 .../catalog/local/CatalogdMetaProvider.java     | 66 +++++++++++++++++---
 .../impala/catalog/local/LocalCatalog.java      |  4 +-
 .../catalog/local/CatalogdMetaProviderTest.java | 25 +++++++-
 7 files changed, 113 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 319e948..924073a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -81,8 +81,19 @@ DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
 
 DEFINE_bool_hidden(use_local_catalog, false,
-  "Use experimental implementation of a local catalog. If this is set, "
-  "the catalog service is not used and does not need to be started.");
+    "Use experimental implementation of a local catalog. If this is set, "
+    "the catalog service is not used and does not need to be started.");
+DEFINE_int32_hidden(local_catalog_cache_mb, -1,
+    "If --use_local_catalog is enabled, configures the size of the catalog "
+    "cache within each impalad. If this is set to -1, the cache is auto-"
+    "configured to 60% of the configured Java heap size. Note that the Java "
+    "heap size is distinct from and typically smaller than the overall "
+    "Impala memory limit.");
+DEFINE_int32_hidden(local_catalog_cache_expiration_s, 60 * 60,
+    "If --use_local_catalog is enabled, configures the expiration time "
+    "of the catalog cache within each impalad. Even if the configured "
+    "cache capacity has not been reached, items are removed from the cache "
+    "if they have not been accessed in this amount of time.");
 
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index c7982bf..051d396 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -29,6 +29,8 @@ DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
 DECLARE_bool(enable_orc_scanner);
 DECLARE_bool(use_local_catalog);
+DECLARE_int32(local_catalog_cache_expiration_s);
+DECLARE_int32(local_catalog_cache_mb);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(num_metadata_loading_threads);
 DECLARE_int32(max_hdfs_partitions_parallel_load);
@@ -63,6 +65,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* 
cfg_bytes) {
   cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background);
   cfg.__set_enable_orc_scanner(FLAGS_enable_orc_scanner);
   cfg.__set_use_local_catalog(FLAGS_use_local_catalog);
+  cfg.__set_local_catalog_cache_mb(FLAGS_local_catalog_cache_mb);
+  cfg.__set_local_catalog_cache_expiration_s(
+    FLAGS_local_catalog_cache_expiration_s);
   cfg.__set_server_name(FLAGS_server_name);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   cfg.__set_authorization_policy_provider_class(

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 6852d27..7ba6c7e 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -81,4 +81,8 @@ struct TBackendGflags {
   27: required bool use_local_catalog
 
   28: required bool disable_catalog_data_ops_debug_only
+
+  29: required i32 local_catalog_cache_mb
+
+  30: required i32 local_catalog_cache_expiration_s
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index c695a5b..896dce5 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -467,6 +467,12 @@ under the License.
     </dependency>
 
     <dependency>
+        <groupId>org.ehcache</groupId>
+        <artifactId>sizeof</artifactId>
+        <version>0.3.0</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.googlecode.json-simple</groupId>
       <artifactId>json-simple</artifactId>
       <version>1.1.1</version>

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 4d2aa6b..5c1d820 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.local;
 
+import java.lang.management.ManagementFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -55,15 +57,18 @@ import org.apache.impala.util.ListMap;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.ehcache.sizeof.SizeOf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheStats;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -121,13 +126,34 @@ public class CatalogdMetaProvider implements MetaProvider 
{
   // to the "direct" provider for now and circumvent catalogd.
   private DirectMetaProvider directProvider_ = new DirectMetaProvider();
 
-  // TODO(todd): hard-coded TTL is not the final solution here. We should 
implement
-  // memory estimation for all cached objects, and evict based on a 
configurable
-  // memory pressure.
-  final Cache<Object,Object> cache_ = CacheBuilder.newBuilder()
-      .expireAfterAccess(1, TimeUnit.HOURS)
-      .recordStats()
-      .build();
+
+  final Cache<Object,Object> cache_;
+
+  /**
+   * Creates a provider whose metadata cache is bounded both by estimated
+   * memory footprint and by idle time.
+   *
+   * The capacity comes from 'local_catalog_cache_mb': a negative value selects
+   * 60% of the maximum Java heap reported by the memory MXBean, otherwise the
+   * value is interpreted as a number of megabytes. Entries that have not been
+   * accessed for 'local_catalog_cache_expiration_s' seconds are expired even
+   * if the capacity has not been reached.
+   *
+   * @param flags backend flags; both cache-related fields must be set
+   * @throws IllegalArgumentException if either cache-related field is unset
+   */
+  public CatalogdMetaProvider(TBackendGflags flags) {
+    Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s());
+    Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb());
+
+    long cacheSizeBytes;
+    if (flags.local_catalog_cache_mb < 0) {
+      // NOTE(review): getMax() is documented to return -1 when the maximum heap
+      // size is undefined; that case would yield a zero-capacity cache here.
+      long maxHeapBytes = ManagementFactory.getMemoryMXBean()
+          .getHeapMemoryUsage().getMax();
+      cacheSizeBytes = (long)(maxHeapBytes * 0.6);
+    } else {
+      // Multiply as long: the flag is a 32-bit int, so int arithmetic would
+      // overflow for any setting of 2048 MB or more.
+      cacheSizeBytes = flags.local_catalog_cache_mb * 1024L * 1024L;
+    }
+    int expirationSecs = flags.local_catalog_cache_expiration_s;
+    LOG.info("Metadata cache configuration: capacity={} MB, expiration={} sec",
+        cacheSizeBytes/1024/1024, expirationSecs);
+
+    // TODO(todd) add end-to-end test cases which stress cache eviction (both time
+    // and size-triggered) and make sure results are still correct.
+    cache_ = CacheBuilder.newBuilder()
+        .maximumWeight(cacheSizeBytes)
+        .expireAfterAccess(expirationSecs, TimeUnit.SECONDS)
+        .weigher(new SizeOfWeigher())
+        .recordStats()
+        .build();
+  }
 
   public CacheStats getCacheStats() {
     return cache_.stats();
@@ -729,4 +755,30 @@ public class CatalogdMetaProvider implements MetaProvider {
       return super.equals(obj) && partId_ == other.partId_;
     }
   }
+
+  /**
+   * Cache weigher that estimates the heap footprint of an entry, in bytes, as
+   * the deep size of its key and value plus a fixed per-entry overhead.
+   */
+  @VisibleForTesting
+  static class SizeOfWeigher implements Weigher<Object, Object> {
+    // Bypass flyweight objects like small boxed integers, Boolean.TRUE, enums, etc.
+    private static final boolean BYPASS_FLYWEIGHT = true;
+    // Cache the reflected sizes of classes seen.
+    private static final boolean CACHE_SIZES = true;
+
+    // Shared by all weigher instances; never reassigned.
+    private static final SizeOf SIZEOF =
+        SizeOf.newInstance(BYPASS_FLYWEIGHT, CACHE_SIZES);
+
+    private static final int BYTES_PER_WORD = 8; // Assume 64-bit VM.
+    // Guava cache overhead based on:
+    // http://code-o-matic.blogspot.com/2012/02/updated-memory-cost-per-javaguava.html
+    private static final int OVERHEAD_PER_ENTRY =
+        12 * BYTES_PER_WORD + // base cost per entry
+        // Figure cited for 'maximumSize()'; assumed to apply equally to the
+        // 'maximumWeight()' bound this cache uses -- TODO confirm.
+        4 * BYTES_PER_WORD;
+
+    /**
+     * Returns the estimated size in bytes of the entry, saturating at
+     * Integer.MAX_VALUE because cache weights are ints.
+     */
+    @Override
+    public int weigh(Object key, Object value) {
+      long size = SIZEOF.deepSizeOf(key, value) + OVERHEAD_PER_ENTRY;
+      // Saturate rather than wrap around for very large entries.
+      return (int) Math.min(size, Integer.MAX_VALUE);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 89f5345..c3c918e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -40,6 +40,7 @@ import org.apache.impala.catalog.Function.CompareMode;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TUniqueId;
@@ -71,7 +72,8 @@ public class LocalCatalog implements FeCatalog {
   private String nullPartitionKeyValue_;
   private final String defaultKuduMasterHosts_;
 
-  private static MetaProvider PROVIDER = new CatalogdMetaProvider();
+  private static MetaProvider PROVIDER = new CatalogdMetaProvider(
+      BackendConfig.INSTANCE.getBackendCfg());
 
   public static LocalCatalog create(String defaultKuduMasterHosts) {
     return new LocalCatalog(PROVIDER, defaultKuduMasterHosts);

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
----------------------------------------------------------------------
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index f7c3abd..d5da6ad 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -17,18 +17,20 @@
 
 package org.apache.impala.catalog.local;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.local.CatalogdMetaProvider.SizeOfWeigher;
 import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
 import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
 import org.junit.Test;
@@ -53,7 +55,11 @@ public class CatalogdMetaProviderTest {
   }
 
   public CatalogdMetaProviderTest() throws Exception {
-    provider_ = new CatalogdMetaProvider();
+    // Set sufficient expiration/capacity for the test to not evict.
+    TBackendGflags flags = new TBackendGflags();
+    flags.setLocal_catalog_cache_expiration_s(3600);
+    flags.setLocal_catalog_cache_mb(100);
+    provider_ = new CatalogdMetaProvider(flags);
     Pair<Table, TableMetaRef> tablePair = provider_.loadTable("functional", 
"alltypes");
     tableRef_ = tablePair.second;
     prevStats_ = provider_.getCacheStats();
@@ -138,4 +144,19 @@ public class CatalogdMetaProviderTest {
     assertEquals(2, stats.hitCount());
     assertEquals(0, stats.missCount());
   }
+
+  /**
+   * Loose sanity check that SizeOfWeigher produces a plausible weight for the
+   * partition refs of the test table. The bounds are deliberately wide, since
+   * the exact size depends on the layout of the cached structures.
+   */
+  @Test
+  public void testWeights() throws Exception {
+    List<PartitionRef> refs = provider_.loadPartitionList(tableRef_);
+    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+    provider_.loadPartitionsByRefs(tableRef_, /* ignored */null, hostIndex, refs);
+
+    // Unfortunately Guava doesn't provide a statistic on the total weight of cached
+    // elements. So, we'll just instantiate the weigher directly and sanity check
+    // the size loosely.
+    SizeOfWeigher weigher = new SizeOfWeigher();
+    // Weigh once and report the measured value so a failure is diagnosable.
+    int weight = weigher.weigh(refs, null);
+    assertTrue("weight too small: " + weight, weight > 3000);
+    assertTrue("weight too large: " + weight, weight < 4000);
+  }
+
 }
\ No newline at end of file

Reply via email to