HIVE-16133 : Footer cache in Tez AM can take too much memory (Sergey Shelukhin, reviewed by Gunther Hagleitner and Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e32f6622 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e32f6622 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e32f6622 Branch: refs/heads/hive-14535 Commit: e32f6622fc4506c553eeb0eeb80e792ad6161317 Parents: 76b65ba Author: Sergey Shelukhin <[email protected]> Authored: Sat Mar 11 16:09:22 2017 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Sat Mar 11 16:09:22 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 +- .../hadoop/hive/ql/io/orc/LocalCache.java | 56 +++++++++++++------- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 15 +++--- .../hive/ql/io/orc/TestInputOutputFormat.java | 9 ++-- 4 files changed, 50 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f68bd35..1fb3253 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1357,8 +1357,8 @@ public class HiveConf extends Configuration { "Include file ID in splits on file systems that support it."), HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS("hive.orc.splits.allow.synthetic.fileid", true, "Allow synthetic file ID in splits on file systems that don't have a native one."), - HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000, - "Max cache size for keeping meta info about orc splits cached in the client."), + HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE("hive.orc.cache.stripe.details.mem.size", "256Mb", + new SizeValidator(), "Maximum size of orc splits cached in the client."), HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10, "How many threads orc should use to create splits in parallel."), HIVE_ORC_CACHE_USE_SOFT_REFERENCES("hive.orc.cache.use.soft.references", false, http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java index 88b65dc..b375aea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java @@ -32,17 +32,40 @@ import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.Weigher; class LocalCache implements OrcInputFormat.FooterCache { private static final Logger LOG = LoggerFactory.getLogger(LocalCache.class); private static final int DEFAULT_CACHE_INITIAL_CAPACITY = 1024; - private final Cache<Path, OrcTail> cache; - LocalCache(int numThreads, int cacheStripeDetailsSize, boolean useSoftRef) { - CacheBuilder builder = CacheBuilder.newBuilder() + private static final class TailAndFileData { + public TailAndFileData(long fileLength, long fileModificationTime, ByteBuffer bb) { + this.fileLength = fileLength; + this.fileModTime = fileModificationTime; + this.bb = bb; + } + public ByteBuffer bb; + public long fileLength, fileModTime; + + public int getMemoryUsage() { + return bb.remaining() + 100; // 100 is for 2 longs, BB and java overheads (semi-arbitrary). + } + } + + private final Cache<Path, TailAndFileData> cache; + + LocalCache(int numThreads, long cacheMemSize, boolean useSoftRef) { + CacheBuilder<Path, TailAndFileData> builder = CacheBuilder.newBuilder() .initialCapacity(DEFAULT_CACHE_INITIAL_CAPACITY) .concurrencyLevel(numThreads) - .maximumSize(cacheStripeDetailsSize); + .maximumWeight(cacheMemSize) + .weigher(new Weigher<Path, TailAndFileData>() { + @Override + public int weigh(Path key, TailAndFileData value) { + return value.getMemoryUsage(); + } + }); + if (useSoftRef) { builder = builder.softValues(); } @@ -55,11 +78,8 @@ class LocalCache implements OrcInputFormat.FooterCache { } public void put(Path path, OrcTail tail) { - cache.put(path, tail); - } - - public OrcTail get(Path path) { - return cache.getIfPresent(path); + cache.put(path, new TailAndFileData(tail.getFileTail().getFileLength(), + tail.getFileModificationTime(), tail.getSerializedTail().duplicate())); } @Override @@ -74,23 +94,21 @@ class LocalCache implements OrcInputFormat.FooterCache { ++i; FileStatus file = fileWithId.getFileStatus(); Path path = file.getPath(); - OrcTail tail = cache.getIfPresent(path); + TailAndFileData tfd = cache.getIfPresent(path); if (LOG.isDebugEnabled()) { - LOG.debug("Serialized tail " + (tail == null ? "not " : "") + "cached for path: " + path); + LOG.debug("Serialized tail " + (tfd == null ? "not " : "") + "cached for path: " + path); } - if (tail == null) continue; - if (tail != null && file.getLen() == tail.getFileTail().getFileLength() - && file.getModificationTime() == tail.getFileModificationTime()) { - result[i] = tail; + if (tfd == null) continue; + if (file.getLen() == tfd.fileLength && file.getModificationTime() == tfd.fileModTime) { + result[i] = ReaderImpl.extractFileTail(tfd.bb.duplicate(), tfd.fileLength, tfd.fileModTime); continue; } // Invalidate cache.invalidate(path); if (LOG.isDebugEnabled()) { LOG.debug("Meta-Info for : " + path + " changed. CachedModificationTime: " - + tail.getFileModificationTime() + ", CurrentModificationTime: " - + file.getModificationTime() + ", CachedLength: " + tail.getFileTail().getFileLength() - + ", CurrentLength: " + file.getLen()); + + tfd.fileModTime + ", CurrentModificationTime: " + file.getModificationTime() + + ", CachedLength: " + tfd.fileLength + ", CurrentLength: " + file.getLen()); } } } @@ -110,4 +128,4 @@ class LocalCache implements OrcInputFormat.FooterCache { throws IOException { put(cacheKey.getPath(), orcTail); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 369584b..59682db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -587,14 +587,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0); splitStrategyBatchMs = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS); LOG.debug("Number of buckets specified by conf file is " + numBuckets); - int cacheStripeDetailsSize = HiveConf.getIntVar(conf, - ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE); - int numThreads = HiveConf.getIntVar(conf, - ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS); - boolean useSoftReference = HiveConf.getBoolVar(conf, - ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES); + long cacheMemSize = HiveConf.getSizeVar( + conf, ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE); + int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS); + boolean useSoftReference = HiveConf.getBoolVar( + conf, ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES); - cacheStripeDetails = (cacheStripeDetailsSize > 0); + cacheStripeDetails = (cacheMemSize > 0); this.etlFileThreshold = minSplits <= 0 ? DEFAULT_ETL_FILE_THRESHOLD : minSplits; @@ -621,7 +620,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, useExternalCache = false; } if (localCache == null) { - localCache = new LocalCache(numThreads, cacheStripeDetailsSize, useSoftReference); + localCache = new LocalCache(numThreads, cacheMemSize, useSoftReference); } if (useExternalCache) { if (metaCache == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/e32f6622/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 4fa0651..b003eb8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -463,8 +463,7 @@ public class TestInputOutputFormat { public void testSplitStrategySelection() throws Exception { conf.set("mapreduce.input.fileinputformat.split.maxsize", "500"); - conf.setLong(HiveConf.ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE.varname, - 100); + conf.set(HiveConf.ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb"); final int[] counts = { 1, 10, 100, 256 }; final int[] sizes = { 100, 1000 }; final int[] numSplits = { 1, 9, 10, 11, 99, 111 }; @@ -543,7 +542,7 @@ public class TestInputOutputFormat { } k = 0; - conf.set("hive.orc.cache.stripe.details.size", "-1"); + conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "0"); for (int c : counts) { for (int s : sizes) { final FileSystem fs = generateMockFiles(c, s); @@ -2707,7 +2706,7 @@ public class TestInputOutputFormat { MockFileSystem fs = new MockFileSystem(conf); // creates the static cache MockPath mockPath = new MockPath(fs, "mock:///mocktbl"); - conf.set("hive.orc.cache.stripe.details.size", "-1"); + conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "0"); conf.set("mapred.input.dir", mockPath.toString()); conf.set("fs.defaultFS", "mock:///"); conf.set("fs.mock.impl", MockFileSystem.class.getName()); @@ -2774,7 +2773,7 @@ public class TestInputOutputFormat { assertEquals(1, readOpsDelta); // enable cache and use default strategy - conf.set("hive.orc.cache.stripe.details.size", "100"); + conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb"); conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID"); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) {
