Repository: hive

Updated Branches:
  refs/heads/branch-2.1 b8903b36b -> bdf4ef890
  refs/heads/master     4b3507652 -> d97e4e2c9
HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d97e4e2c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d97e4e2c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d97e4e2c

Branch: refs/heads/master
Commit: d97e4e2c9bdd292f433173e9cce7445e9916e64d
Parents: 4b35076
Author: Sergey Shelukhin <[email protected]>
Authored: Fri Aug 12 13:33:39 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Fri Aug 12 13:33:46 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java | 2 +-
 .../org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java   | 5 +++--
 .../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java   | 5 +----
 .../hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java    | 5 ++---
 .../hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java       | 8 +-------
 5 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 99cdaa0..416606e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -145,7 +145,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
     cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
-    cache = ObjectCacheFactory.getCache(hconf, queryId);
+    cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);

     hashMapRowGetters = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5201120..5a19030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -44,10 +44,11 @@ public class ObjectCacheFactory {
   /**
    * Returns the appropriate cache
    */
-  public static ObjectCache getCache(Configuration conf, String queryId) {
+  public static ObjectCache getCache(Configuration conf, String queryId, boolean isPlanCache) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       if (LlapProxy.isDaemon()) { // daemon
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)
+            && !isPlanCache) {
           // LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
           return getLlapObjectCache(queryId);
         } else { // no cache

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 0886c0e..6f36dfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -96,11 +95,9 @@ public class MapRecordProcessor extends RecordProcessor {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     if (LlapProxy.isDaemon()) {
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); // do not cache plan
       setLlapOfFragmentId(context);
-    } else {
-      cache = ObjectCacheFactory.getCache(jconf, queryId);
     }
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     cacheKeys = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index ec97856..b7f1011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -63,7 +63,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
   private MergeFileWork mfWork;
   MRInputLegacy mrInput = null;
   private final Object[] row = new Object[2];
-  ObjectCache cache;
+  org.apache.hadoop.hive.ql.exec.ObjectCache cache;

   public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) {
     super(jconf, context);
@@ -95,8 +95,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     }

     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
-        .getCache(jconf, queryId);
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);

     try {
       execContext.setJc(jconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 1390a00..cf3c8ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -89,14 +89,8 @@ public class ReduceRecordProcessor extends RecordProcessor{
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);

-    ObjectCache cache;
-    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapProxy.isDaemon()) { // don't cache plan
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
-    } else {
-      cache = ObjectCacheFactory.getCache(jconf, queryId);
-    }
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);

     String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);
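For readers following the refactor, a minimal illustrative sketch (not part of the commit; the caller context and variable names planCache/dataCache are hypothetical) of the calling convention the new three-argument getCache establishes: plan lookups pass isPlanCache = true, so inside an LLAP daemon they take the "no cache" branch rather than the shared LLAP object cache, while runtime objects such as the map-join hash table container pass false and may still be shared when LLAP_OBJECT_CACHE_ENABLED is on:

    // Hypothetical caller in a Tez record processor; jconf is the task's JobConf.
    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);

    // Plans must not be reused across fragments in an LLAP daemon, so ask for a
    // cache with isPlanCache = true; getCache then bypasses the shared LLAP cache.
    org.apache.hadoop.hive.ql.exec.ObjectCache planCache =
        ObjectCacheFactory.getCache(jconf, queryId, true);

    // Runtime state (e.g. the map-join hash table container) may still use the
    // shared LLAP object cache, so those lookups pass isPlanCache = false.
    org.apache.hadoop.hive.ql.exec.ObjectCache dataCache =
        ObjectCacheFactory.getCache(jconf, queryId, false);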
