Repository: hive
Updated Branches:
  refs/heads/master 54b4b2d42 -> f27c38ff5
HIVE-19525: Spark task logs print PLAN PATH excessive number of times (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f27c38ff Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f27c38ff Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f27c38ff Branch: refs/heads/master Commit: f27c38ff55902827499192a4f8cf8ed37d6fd967 Parents: 54b4b2d Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Wed Jun 6 14:12:47 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Jun 6 14:12:47 2018 -0500 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/Utilities.java | 124 +++++++++---------- 1 file changed, 62 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f27c38ff/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 2177c33..80478ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -408,7 +408,19 @@ public final class Utilities { * @throws RuntimeException if the configuration files are not proper or if plan can not be loaded */ private static BaseWork getBaseWork(Configuration conf, String name) { - Path path = null; + + Path path = getPlanPath(conf, name); + LOG.debug("PLAN PATH = {}", path); + if (path == null) { // Map/reduce plan may not be generated + return null; + } + + BaseWork gWork = gWorkMap.get(conf).get(path); + if (gWork != null) { + LOG.debug("Found plan in cache for name: {}", name); + return gWork; + } + InputStream in = null; Kryo kryo = 
SerializationUtilities.borrowKryo(); try { @@ -424,73 +436,61 @@ public final class Utilities { kryo.setClassLoader(newLoader); } } - - path = getPlanPath(conf, name); - LOG.info("PLAN PATH = {}", path); - if (path == null) { // Map/reduce plan may not be generated - return null; + Path localPath = path; + LOG.debug("local path = {}", localPath); + final long serializedSize; + final String planMode; + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { + String planStringPath = path.toUri().getPath(); + LOG.debug("Loading plan from string: {}", planStringPath); + String planString = conf.getRaw(planStringPath); + if (planString == null) { + LOG.info("Could not find plan string in conf"); + return null; + } + serializedSize = planString.length(); + planMode = "RPC"; + byte[] planBytes = Base64.decodeBase64(planString); + in = new ByteArrayInputStream(planBytes); + in = new InflaterInputStream(in); + } else { + LOG.debug("Open file to read in plan: {}", localPath); + FileSystem fs = localPath.getFileSystem(conf); + in = fs.open(localPath); + serializedSize = fs.getFileStatus(localPath).getLen(); + planMode = "FILE"; } - BaseWork gWork = gWorkMap.get(conf).get(path); - if (gWork == null) { - Path localPath = path; - LOG.debug("local path = {}", localPath); - final long serializedSize; - final String planMode; - if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { - String planStringPath = path.toUri().getPath(); - LOG.debug("Loading plan from string: {}", planStringPath); - String planString = conf.getRaw(planStringPath); - if (planString == null) { - LOG.info("Could not find plan string in conf"); - return null; - } - serializedSize = planString.length(); - planMode = "RPC"; - byte[] planBytes = Base64.decodeBase64(planString); - in = new ByteArrayInputStream(planBytes); - in = new InflaterInputStream(in); + if(MAP_PLAN_NAME.equals(name)){ + if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ + gWork = 
SerializationUtilities.deserializePlan(kryo, in, MapWork.class); + } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { + gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class); + } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { + gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class); } else { - LOG.debug("Open file to read in plan: {}", localPath); - FileSystem fs = localPath.getFileSystem(conf); - in = fs.open(localPath); - serializedSize = fs.getFileStatus(localPath).getLen(); - planMode = "FILE"; + throw new RuntimeException("unable to determine work from configuration ." + + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)); } - - if(MAP_PLAN_NAME.equals(name)){ - if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ - gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class); - } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { - gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class); - } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { - gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class); - } else { - throw new RuntimeException("unable to determine work from configuration ." - + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)) ; - } - } else if (REDUCE_PLAN_NAME.equals(name)) { - if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) { - gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class); - } else { - throw new RuntimeException("unable to determine work from configuration ." 
- + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; - } - } else if (name.contains(MERGE_PLAN_NAME)) { - if (name.startsWith(MAPNAME)) { - gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class); - } else if (name.startsWith(REDUCENAME)) { - gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class); - } else { - throw new RuntimeException("Unknown work type: " + name); - } + } else if (REDUCE_PLAN_NAME.equals(name)) { + if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) { + gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class); + } else { + throw new RuntimeException("unable to determine work from configuration ." + + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)); + } + } else if (name.contains(MERGE_PLAN_NAME)) { + if (name.startsWith(MAPNAME)) { + gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class); + } else if (name.startsWith(REDUCENAME)) { + gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class); + } else { + throw new RuntimeException("Unknown work type: " + name); } - LOG.info("Deserialized plan (via {}) - name: {} size: {}", planMode, - gWork.getName(), humanReadableByteCount(serializedSize)); - gWorkMap.get(conf).put(path, gWork); - } else { - LOG.debug("Found plan in cache for name: {}", name); } + LOG.info("Deserialized plan (via {}) - name: {} size: {}", planMode, + gWork.getName(), humanReadableByteCount(serializedSize)); + gWorkMap.get(conf).put(path, gWork); return gWork; } catch (FileNotFoundException fnf) { // happens. e.g.: no reduce work.