HIVE-10302: Load small tables (for map join) in executor memory only once [Spark Branch], rebased with master (reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d3629859 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d3629859 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d3629859 Branch: refs/heads/hbase-metastore Commit: d3629859a2cdc3f45a83582db1e100cd9f3a482f Parents: 441d4c0 Author: Xuefu Zhang <xzh...@cloudera.com> Authored: Mon Jun 1 14:09:08 2015 -0700 Committer: Xuefu Zhang <xzh...@cloudera.com> Committed: Mon Jun 1 14:09:08 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/MapJoinOperator.java | 6 ++++- .../hive/ql/exec/spark/HashTableLoader.java | 23 +++++++++++++++++--- .../ql/exec/spark/HivePairFlatMapFunction.java | 1 + .../hive/ql/exec/spark/SparkUtilities.java | 6 +++++ 4 files changed, 32 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d3629859/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index b1352f3..15cafdd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap; @@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.Reusable 
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer; import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -521,7 +523,9 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem // in mapreduce case, we need to always clear up as mapreduce doesn't have object registry. if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) - && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())) { + && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) + && !(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") + && SparkUtilities.isDedicatedCluster(hconf))) { if (isLogInfoEnabled) { LOG.info("MR: Clearing all map join table containers."); } http://git-wip-us.apache.org/repos/asf/hive/blob/d3629859/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index 043f1f7..1d674e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -113,15 +113,32 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable } String fileName = localWork.getBucketFileName(bigInputPath); Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte) pos, fileName); - LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path); - mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, 
path); + mapJoinTables[pos] = load(fs, path, mapJoinTableSerdes[pos]); } } } catch (Exception e) { throw new HiveException(e); } } - @SuppressWarnings("unchecked") /* NOTE(review): this deleted line was the annotation on loadDirectly() (the next context line); removing it un-suppresses unchecked warnings there - confirm this was intentional. */ + private MapJoinTableContainer load(FileSystem fs, Path path, + MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException { /* Loads one small-table container from path. When SparkUtilities.isDedicatedCluster(hconf) is false it is loaded fresh on every call; otherwise it is looked up in, and on a miss stored into, SmallTableCache keyed by path. */ + LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path); + if (!SparkUtilities.isDedicatedCluster(hconf)) { + return mapJoinTableSerde.load(fs, path); + } + MapJoinTableContainer mapJoinTable = SmallTableCache.get(path); + if (mapJoinTable == null) { + synchronized (path.toString().intern()) { /* NOTE(review): an interned String is a JVM-wide canonical object, so this monitor is shared with any other code that synchronizes on an equal interned string; a private per-path lock map would avoid that coupling. The cache is re-checked under the lock so concurrent callers do not load the same path twice. */ + mapJoinTable = SmallTableCache.get(path); + if (mapJoinTable == null) { + mapJoinTable = mapJoinTableSerde.load(fs, path); + SmallTableCache.cache(path, mapJoinTable); + } + } + } + return mapJoinTable; + } + private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName) throws Exception { MapredLocalWork localWork = context.getLocalWork(); http://git-wip-us.apache.org/repos/asf/hive/blob/d3629859/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java index 2f137f9..7df626b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java @@ -48,6 +48,7 @@ public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFun protected void initJobConf() { if (jobConf == null) { jobConf = KryoSerializer.deserializeJobConf(this.buffer); + SmallTableCache.initialize(jobConf); setupMRLegacyConfigs(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d3629859/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index ef612bd..ca0ffb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.UUID; import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -108,6 +109,11 @@ public class SparkUtilities { return name; } + public static boolean isDedicatedCluster(Configuration conf) { /* True when spark.master is a "yarn-*" or "local*" master. Configuration.get returns null for an unset key, so an unset spark.master yields false (no caching, pre-patch behavior) instead of a NullPointerException. */ + String master = conf.get("spark.master"); + return master != null && (master.startsWith("yarn-") || master.startsWith("local")); + } + public static SparkSession getSparkSession(HiveConf conf, SparkSessionManager sparkSessionManager) throws HiveException { SparkSession sparkSession = SessionState.get().getSparkSession();