HIVE-16133 : Footer cache in Tez AM can take too much memory (Sergey Shelukhin, 
reviewed by Gunther Hagleitner and Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e32f6622
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e32f6622
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e32f6622

Branch: refs/heads/hive-14535
Commit: e32f6622fc4506c553eeb0eeb80e792ad6161317
Parents: 76b65ba
Author: Sergey Shelukhin <[email protected]>
Authored: Sat Mar 11 16:09:22 2017 -0800
Committer: Sergey Shelukhin <[email protected]>
Committed: Sat Mar 11 16:09:22 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 +-
 .../hadoop/hive/ql/io/orc/LocalCache.java       | 56 +++++++++++++-------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 15 +++---
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  9 ++--
 4 files changed, 50 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f68bd35..1fb3253 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1357,8 +1357,8 @@ public class HiveConf extends Configuration {
         "Include file ID in splits on file systems that support it."),
     
HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS("hive.orc.splits.allow.synthetic.fileid",
 true,
         "Allow synthetic file ID in splits on file systems that don't have a 
native one."),
-    HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 
10000,
-        "Max cache size for keeping meta info about orc splits cached in the 
client."),
+    
HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE("hive.orc.cache.stripe.details.mem.size",
 "256Mb",
+        new SizeValidator(), "Maximum size of orc splits cached in the 
client."),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 
10,
         "How many threads orc should use to create splits in parallel."),
     HIVE_ORC_CACHE_USE_SOFT_REFERENCES("hive.orc.cache.use.soft.references", 
false,

http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
index 88b65dc..b375aea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
@@ -32,17 +32,40 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Weigher;
 
 class LocalCache implements OrcInputFormat.FooterCache {
   private static final Logger LOG = LoggerFactory.getLogger(LocalCache.class);
   private static final int DEFAULT_CACHE_INITIAL_CAPACITY = 1024;
-  private final Cache<Path, OrcTail> cache;
 
-  LocalCache(int numThreads, int cacheStripeDetailsSize, boolean useSoftRef) {
-    CacheBuilder builder = CacheBuilder.newBuilder()
+  private static final class TailAndFileData {
+    public TailAndFileData(long fileLength, long fileModificationTime, 
ByteBuffer bb) {
+      this.fileLength = fileLength;
+      this.fileModTime = fileModificationTime;
+      this.bb = bb;
+    }
+    public ByteBuffer bb;
+    public long fileLength, fileModTime;
+
+    public int getMemoryUsage() {
+      return bb.remaining() + 100; // 100 is for 2 longs, BB and java 
overheads (semi-arbitrary).
+    }
+  }
+
+  private final Cache<Path, TailAndFileData> cache;
+
+  LocalCache(int numThreads, long cacheMemSize, boolean useSoftRef) {
+    CacheBuilder<Path, TailAndFileData> builder = CacheBuilder.newBuilder()
         .initialCapacity(DEFAULT_CACHE_INITIAL_CAPACITY)
         .concurrencyLevel(numThreads)
-        .maximumSize(cacheStripeDetailsSize);
+        .maximumWeight(cacheMemSize)
+        .weigher(new Weigher<Path, TailAndFileData>() {
+          @Override
+          public int weigh(Path key, TailAndFileData value) {
+            return value.getMemoryUsage();
+          }
+        });
+
     if (useSoftRef) {
       builder = builder.softValues();
     }
@@ -55,11 +78,8 @@ class LocalCache implements OrcInputFormat.FooterCache {
   }
 
   public void put(Path path, OrcTail tail) {
-    cache.put(path, tail);
-  }
-
-  public OrcTail get(Path path) {
-    return cache.getIfPresent(path);
+    cache.put(path, new TailAndFileData(tail.getFileTail().getFileLength(),
+        tail.getFileModificationTime(), tail.getSerializedTail().duplicate()));
   }
 
   @Override
@@ -74,23 +94,21 @@ class LocalCache implements OrcInputFormat.FooterCache {
       ++i;
       FileStatus file = fileWithId.getFileStatus();
       Path path = file.getPath();
-      OrcTail tail = cache.getIfPresent(path);
+      TailAndFileData tfd = cache.getIfPresent(path);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Serialized tail " + (tail == null ? "not " : "") + "cached 
for path: " + path);
+        LOG.debug("Serialized tail " + (tfd == null ? "not " : "") + "cached 
for path: " + path);
       }
-      if (tail == null) continue;
-      if (tail != null && file.getLen() == tail.getFileTail().getFileLength()
-          && file.getModificationTime() == tail.getFileModificationTime()) {
-        result[i] = tail;
+      if (tfd == null) continue;
+      if (file.getLen() == tfd.fileLength && file.getModificationTime() == 
tfd.fileModTime) {
+        result[i] = ReaderImpl.extractFileTail(tfd.bb.duplicate(), 
tfd.fileLength, tfd.fileModTime);
         continue;
       }
       // Invalidate
       cache.invalidate(path);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Meta-Info for : " + path + " changed. 
CachedModificationTime: "
-            + tail.getFileModificationTime() + ", CurrentModificationTime: "
-            + file.getModificationTime() + ", CachedLength: " + 
tail.getFileTail().getFileLength()
-            + ", CurrentLength: " + file.getLen());
+            + tfd.fileModTime + ", CurrentModificationTime: " + 
file.getModificationTime()
+            + ", CachedLength: " + tfd.fileLength + ", CurrentLength: " + 
file.getLen());
       }
     }
   }
@@ -110,4 +128,4 @@ class LocalCache implements OrcInputFormat.FooterCache {
       throws IOException {
     put(cacheKey.getPath(), orcTail);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 369584b..59682db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -587,14 +587,13 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
           Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0);
       splitStrategyBatchMs = HiveConf.getIntVar(conf, 
ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS);
       LOG.debug("Number of buckets specified by conf file is " + numBuckets);
-      int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
-          ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
-      int numThreads = HiveConf.getIntVar(conf,
-          ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
-      boolean useSoftReference = HiveConf.getBoolVar(conf,
-          ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);
+      long cacheMemSize = HiveConf.getSizeVar(
+          conf, ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE);
+      int numThreads = HiveConf.getIntVar(conf, 
ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+      boolean useSoftReference = HiveConf.getBoolVar(
+          conf, ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);
 
-      cacheStripeDetails = (cacheStripeDetailsSize > 0);
+      cacheStripeDetails = (cacheMemSize > 0);
 
       this.etlFileThreshold = minSplits <= 0 ? DEFAULT_ETL_FILE_THRESHOLD : 
minSplits;
 
@@ -621,7 +620,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
             useExternalCache = false;
           }
           if (localCache == null) {
-            localCache = new LocalCache(numThreads, cacheStripeDetailsSize, 
useSoftReference);
+            localCache = new LocalCache(numThreads, cacheMemSize, 
useSoftReference);
           }
           if (useExternalCache) {
             if (metaCache == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java 
b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 4fa0651..b003eb8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -463,8 +463,7 @@ public class TestInputOutputFormat {
   public void testSplitStrategySelection() throws Exception {
 
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
-    conf.setLong(HiveConf.ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE.varname,
-        100);
+    
conf.set(HiveConf.ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, 
"10Mb");
     final int[] counts = { 1, 10, 100, 256 };
     final int[] sizes = { 100, 1000 };
     final int[] numSplits = { 1, 9, 10, 11, 99, 111 };
@@ -543,7 +542,7 @@ public class TestInputOutputFormat {
     }
 
     k = 0;
-    conf.set("hive.orc.cache.stripe.details.size", "-1");
+    conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "0");
     for (int c : counts) {
       for (int s : sizes) {
         final FileSystem fs = generateMockFiles(c, s);
@@ -2707,7 +2706,7 @@ public class TestInputOutputFormat {
     MockFileSystem fs = new MockFileSystem(conf);
     // creates the static cache
     MockPath mockPath = new MockPath(fs, "mock:///mocktbl");
-    conf.set("hive.orc.cache.stripe.details.size", "-1");
+    conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "0");
     conf.set("mapred.input.dir", mockPath.toString());
     conf.set("fs.defaultFS", "mock:///");
     conf.set("fs.mock.impl", MockFileSystem.class.getName());
@@ -2774,7 +2773,7 @@ public class TestInputOutputFormat {
     assertEquals(1, readOpsDelta);
 
     // enable cache and use default strategy
-    conf.set("hive.orc.cache.stripe.details.size", "100");
+    conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, 
"10Mb");
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID");
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {

Reply via email to the commits@hive.apache.org mailing list.