HIVE-10403 - Add n-way join support for Hybrid Grace Hash Join (Wei Zheng via Vikram Dixit)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c72d073c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c72d073c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c72d073c Branch: refs/heads/llap Commit: c72d073c1fe2a07c80120647bb2170fb7e50d168 Parents: 6db33a9 Author: vikram <[email protected]> Authored: Fri May 1 10:54:21 2015 -0700 Committer: vikram <[email protected]> Committed: Fri May 1 11:07:06 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../test/resources/testconfiguration.properties | 3 +- .../hadoop/hive/ql/exec/HashTableLoader.java | 4 +- .../hadoop/hive/ql/exec/MapJoinOperator.java | 169 +- .../hadoop/hive/ql/exec/mr/HashTableLoader.java | 2 +- .../persistence/BytesBytesMultiHashMap.java | 1 + .../exec/persistence/HybridHashTableConf.java | 86 + .../persistence/HybridHashTableContainer.java | 213 ++- .../ql/exec/persistence/KeyValueContainer.java | 31 +- .../ql/exec/persistence/ObjectContainer.java | 31 +- .../hive/ql/exec/spark/HashTableLoader.java | 2 +- .../hive/ql/exec/tez/HashTableLoader.java | 60 +- .../ql/exec/vector/VectorMapJoinOperator.java | 4 +- .../mapjoin/VectorMapJoinCommonOperator.java | 4 - .../VectorMapJoinGenerateResultOperator.java | 25 +- .../fast/VectorMapJoinFastBytesHashMap.java | 4 +- .../VectorMapJoinFastBytesHashMultiSet.java | 4 +- .../fast/VectorMapJoinFastBytesHashSet.java | 4 +- .../fast/VectorMapJoinFastBytesHashTable.java | 4 +- .../mapjoin/fast/VectorMapJoinFastHashMap.java | 4 +- .../fast/VectorMapJoinFastHashMultiSet.java | 4 +- .../mapjoin/fast/VectorMapJoinFastHashSet.java | 4 +- .../fast/VectorMapJoinFastHashTable.java | 4 +- .../fast/VectorMapJoinFastHashTableLoader.java | 4 +- .../fast/VectorMapJoinFastLongHashMap.java | 4 +- .../fast/VectorMapJoinFastLongHashMultiSet.java | 4 +- .../fast/VectorMapJoinFastLongHashSet.java | 4 +- 
.../fast/VectorMapJoinFastLongHashTable.java | 4 +- .../fast/VectorMapJoinFastMultiKeyHashMap.java | 6 +- .../VectorMapJoinFastMultiKeyHashMultiSet.java | 4 +- .../fast/VectorMapJoinFastMultiKeyHashSet.java | 4 +- .../fast/VectorMapJoinFastStringHashMap.java | 4 +- .../VectorMapJoinFastStringHashMultiSet.java | 4 +- .../fast/VectorMapJoinFastStringHashSet.java | 4 +- .../fast/VectorMapJoinFastTableContainer.java | 23 +- .../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 2 +- .../fast/TestVectorMapJoinFastLongHashMap.java | 14 +- .../TestVectorMapJoinFastMultiKeyHashMap.java | 14 +- .../clientpositive/auto_sortmerge_join_13.q | 2 + .../clientpositive/hybridgrace_hashjoin_1.q | 258 +++ .../clientpositive/hybridgrace_hashjoin_2.q | 152 ++ .../queries/clientpositive/hybridhashjoin.q | 250 --- .../test/queries/clientpositive/tez_join_hash.q | 2 + .../test/queries/clientpositive/tez_smb_main.q | 6 + .../tez/hybridgrace_hashjoin_1.q.out | 1587 ++++++++++++++++++ .../tez/hybridgrace_hashjoin_2.q.out | 1417 ++++++++++++++++ .../clientpositive/tez/hybridhashjoin.q.out | 1566 ----------------- 47 files changed, 3924 insertions(+), 2086 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 72e4ff2..95e3d04 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -764,6 +764,10 @@ public class HiveConf extends Configuration { HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " + "hybrid grace hash join, how often (how many rows apart) we check if memory is full. 
" + "This number should be power of 2."), + HIVEHYBRIDGRACEHASHJOINMINWBSIZE("hive.mapjoin.hybridgrace.minwbsize", 524288, "For hybrid grace" + + " hash join, the minimum write buffer size used by optimized hashtable. Default is 512 KB."), + HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS("hive.mapjoin.hybridgrace.minnumpartitions", 16, "For" + + " hybrid grace hash join, the minimum number of partitions to create."), HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024, "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index b7abf0d..b2a6e58 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -301,7 +301,8 @@ minitez.query.files=bucket_map_join_tez1.q,\ dynamic_partition_pruning_2.q,\ explainuser_1.q,\ explainuser_2.q,\ - hybridhashjoin.q,\ + hybridgrace_hashjoin_1.q,\ + hybridgrace_hashjoin_2.q,\ mapjoin_decimal.q,\ lvj_mapjoin.q, \ mrr.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java index c3e3078..cbf2d43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java @@ -32,6 +32,6 @@ public interface HashTableLoader { void init(ExecMapperContext 
context, MapredContext mrContext, Configuration hconf, MapJoinOperator joinOp); - void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes, - long memUsage) throws HiveException; + void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes) + throws HiveException; } http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index f66ab90..f2b800a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; @@ -89,9 +88,10 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem private UnwrapRowContainer[] unwrapContainer; private transient Configuration hconf; private transient boolean hybridMapJoinLeftover; // whether there's spilled data to be processed - protected transient MapJoinBytesTableContainer currentSmallTable; // reloaded hashmap from disk - protected transient int tag; // big table alias - protected transient int smallTable; // small table alias + protected transient MapJoinBytesTableContainer[] spilledMapJoinTables; // used to hold restored + // spilled small tables + protected 
HybridHashTableContainer firstSmallTable; // The first small table; + // Only this table has spilled big table rows public MapJoinOperator() { } @@ -272,7 +272,6 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem protected Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable( ExecMapperContext mapContext, MapredContext mrContext) throws HiveException { - loadCalled = true; if (this.hashTblInitedOnce @@ -285,9 +284,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); loader.init(mapContext, mrContext, hconf, this); - long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize() - * conf.getHashTableMemoryUsage()); - loader.load(mapJoinTables, mapJoinTableSerdes, memUsage); + loader.load(mapJoinTables, mapJoinTableSerdes); hashTblInitedOnce = true; @@ -325,18 +322,6 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem @Override public void process(Object row, int tag) throws HiveException { - this.tag = tag; - - // As we're calling processOp again to process the leftover triplets, we know the "row" is - // coming from the on-disk matchfile. We need to recreate hashMapRowGetter against new hashtable - if (hybridMapJoinLeftover) { - assert hashMapRowGetters != null; - if (hashMapRowGetters[smallTable] == null) { - MapJoinKey refKey = getRefKey((byte) tag); - hashMapRowGetters[smallTable] = currentSmallTable.createGetter(refKey); - } - } - try { alias = (byte) tag; if (hashMapRowGetters == null) { @@ -349,13 +334,24 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } } + // As we're calling processOp again to process the leftover "tuples", we know the "row" is + // coming from the spilled matchfile. 
We need to recreate hashMapRowGetter against new hashtables + if (hybridMapJoinLeftover) { + MapJoinKey refKey = getRefKey(alias); + for (byte pos = 0; pos < order.length; pos++) { + if (pos != alias && spilledMapJoinTables[pos] != null) { + hashMapRowGetters[pos] = spilledMapJoinTables[pos].createGetter(refKey); + } + } + } + // compute keys and values as StandardObjects ReusableGetAdaptor firstSetKey = null; int fieldCount = joinKeys[alias].size(); boolean joinNeeded = false; + boolean bigTableRowSpilled = false; for (byte pos = 0; pos < order.length; pos++) { if (pos != alias) { - smallTable = pos; // record small table alias JoinUtil.JoinResult joinResult; ReusableGetAdaptor adaptor; if (firstSetKey == null) { @@ -389,9 +385,14 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem storage[pos] = rowContainer.copy(); aliasFilterTags[pos] = rowContainer.getAliasFilter(); } - // Spill the big table rows into appropriate partition - if (joinResult == JoinUtil.JoinResult.SPILL) { + // Spill the big table rows into appropriate partition: + // When the JoinResult is SPILL, it means the corresponding small table row may have been + // spilled to disk (at least the partition that holds this row is on disk). So we need to + // postpone the join processing for this pair by also spilling this big table row. 
+ if (joinResult == JoinUtil.JoinResult.SPILL && + !bigTableRowSpilled) { // For n-way join, only spill big table rows once spillBigTableRow(mapJoinTables[pos], row); + bigTableRowSpilled = true; } } } @@ -431,7 +432,6 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem @Override public void closeOp(boolean abort) throws HiveException { - boolean spilled = false; for (MapJoinTableContainer container: mapJoinTables) { if (container != null) { @@ -440,10 +440,30 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } } + // For Hybrid Grace Hash Join, we need to see if there is any spilled data to be processed next if (spilled) { - for (MapJoinTableContainer tableContainer : mapJoinTables) { - if (tableContainer != null) { - if (tableContainer instanceof HybridHashTableContainer) { + if (hashMapRowGetters == null) { + hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length]; + } + int numPartitions = 0; + // Find out number of partitions for each small table (should be same across tables) + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + firstSmallTable = (HybridHashTableContainer)mapJoinTables[pos]; + numPartitions = firstSmallTable.getHashPartitions().length; + break; + } + } + assert numPartitions != 0 : "Number of partitions must be greater than 0!"; + + if (firstSmallTable.hasSpill()) { + spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length]; + hybridMapJoinLeftover = true; + + // Clear all in-memory partitions first + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + MapJoinTableContainer tableContainer = mapJoinTables[pos]; + if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) { HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer; hybridHtContainer.dumpStats(); @@ -453,29 +473,30 @@ public class MapJoinOperator extends 
AbstractMapJoinOperator<MapJoinDesc> implem if (!hashPartitions[i].isHashMapOnDisk()) { hybridHtContainer.setTotalInMemRowCount( hybridHtContainer.getTotalInMemRowCount() - - hashPartitions[i].getHashMapFromMemory().getNumValues()); + hashPartitions[i].getHashMapFromMemory().getNumValues()); hashPartitions[i].getHashMapFromMemory().clear(); } } assert hybridHtContainer.getTotalInMemRowCount() == 0; + } + } - for (int i = 0; i < hashPartitions.length; i++) { - if (hashPartitions[i].isHashMapOnDisk()) { - // Recursively process on-disk triplets (hash partition, sidefile, matchfile) - try { - hybridMapJoinLeftover = true; - hashMapRowGetters[smallTable] = null; - continueProcess(hashPartitions[i], hybridHtContainer); - } catch (IOException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } catch (SerDeException e) { - e.printStackTrace(); - } - } - hybridMapJoinLeftover = false; - currentSmallTable = null; + // Reprocess the spilled data + for (int i = 0; i < numPartitions; i++) { + HashPartition[] hashPartitions = firstSmallTable.getHashPartitions(); + if (hashPartitions[i].isHashMapOnDisk()) { + try { + continueProcess(i); // Re-process spilled data + } catch (IOException e) { + e.printStackTrace(); + } catch (SerDeException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + for (byte pos = 0; pos < order.length; pos++) { + if (pos != conf.getPosBigTable()) + spilledMapJoinTables[pos] = null; } } } @@ -497,18 +518,20 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } /** - * Continue processing each pair of spilled hashtable and big table row container - * @param partition hash partition to process - * @param hybridHtContainer Hybrid hashtable container + * Continue processing join between spilled hashtable(s) and spilled big table + * @param partitionId the partition number across all small tables to process * @throws HiveException * @throws 
IOException - * @throws ClassNotFoundException * @throws SerDeException */ - private void continueProcess(HashPartition partition, HybridHashTableContainer hybridHtContainer) - throws HiveException, IOException, ClassNotFoundException, SerDeException { - reloadHashTable(partition, hybridHtContainer); - reProcessBigTable(partition); + private void continueProcess(int partitionId) + throws HiveException, IOException, SerDeException, ClassNotFoundException { + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + reloadHashTable(pos, partitionId); + } + } + reProcessBigTable(partitionId); } /** @@ -516,16 +539,16 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem * It can have two steps: * 1) Deserialize a serialized hash table, and * 2) Merge every key/value pair from small table container into the hash table - * @param partition hash partition to process - * @param hybridHtContainer Hybrid hashtable container + * @param pos position of small table + * @param partitionId the partition of the small table to be reloaded from * @throws IOException - * @throws ClassNotFoundException * @throws HiveException * @throws SerDeException */ - protected void reloadHashTable(HashPartition partition, - HybridHashTableContainer hybridHtContainer) - throws IOException, ClassNotFoundException, HiveException, SerDeException { + protected void reloadHashTable(byte pos, int partitionId) + throws IOException, HiveException, SerDeException, ClassNotFoundException { + HybridHashTableContainer container = (HybridHashTableContainer)mapJoinTables[pos]; + HashPartition partition = container.getHashPartitions()[partitionId]; // Merge the sidefile into the newly created hash table // This is where the spilling may happen again @@ -544,11 +567,12 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem // If based on the new key count, keyCount is smaller than a threshold, // then just load the 
entire restored hashmap into memory. // The size of deserialized partition shouldn't exceed half of memory limit - if (rowCount * hybridHtContainer.getTableRowSize() >= hybridHtContainer.getMemoryThreshold() / 2) { - LOG.info("Hybrid Grace Hash Join: Hash table reload can fail since it will be greater than memory limit. Recursive spilling is currently not supported"); + if (rowCount * container.getTableRowSize() >= container.getMemoryThreshold() / 2) { + LOG.warn("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" + + " will be greater than memory limit. Recursive spilling is currently not supported"); } - KeyValueHelper writeHelper = hybridHtContainer.getWriteHelper(); + KeyValueHelper writeHelper = container.getWriteHelper(); while (kvContainer.hasNext()) { ObjectPair<HiveKey, BytesWritable> pair = kvContainer.next(); Writable key = pair.getFirst(); @@ -557,27 +581,30 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem restoredHashMap.put(writeHelper, -1); } - hybridHtContainer.setTotalInMemRowCount(hybridHtContainer.getTotalInMemRowCount() + container.setTotalInMemRowCount(container.getTotalInMemRowCount() + restoredHashMap.getNumValues() + kvContainer.size()); kvContainer.clear(); - // Since there's only one hashmap to deal with, it's OK to create a MapJoinBytesTableContainer - currentSmallTable = new MapJoinBytesTableContainer(restoredHashMap); - currentSmallTable.setInternalValueOi(hybridHtContainer.getInternalValueOi()); - currentSmallTable.setSortableSortOrders(hybridHtContainer.getSortableSortOrders()); + spilledMapJoinTables[pos] = new MapJoinBytesTableContainer(restoredHashMap); + spilledMapJoinTables[pos].setInternalValueOi(container.getInternalValueOi()); + spilledMapJoinTables[pos].setSortableSortOrders(container.getSortableSortOrders()); } /** * Iterate over the big table row container and feed process() with leftover rows - * @param partition the hash partition being brought back to memory at the moment + 
* @param partitionId the partition from which to take out spilled big table rows * @throws HiveException - * @throws IOException */ - protected void reProcessBigTable(HashPartition partition) throws HiveException, IOException { + protected void reProcessBigTable(int partitionId) throws HiveException { + // For binary join, firstSmallTable is the only small table; it has reference to spilled big + // table rows; + // For n-way join, since we only spill once, when processing the first small table, so only the + // firstSmallTable has reference to the spilled big table rows. + HashPartition partition = firstSmallTable.getHashPartitions()[partitionId]; ObjectContainer bigTable = partition.getMatchfileObjContainer(); while (bigTable.hasNext()) { Object row = bigTable.next(); - process(row, tag); + process(row, conf.getPosBigTable()); } bigTable.clear(); } http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java index 96a6728..abf38e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java @@ -72,7 +72,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable @Override public void load( MapJoinTableContainer[] mapJoinTables, - MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException { + MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { String currentInputPath = context.getCurrentInputPath().toString(); LOG.info("******* Load from HashTable for input file: " + currentInputPath); http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index dd5c621..2ba622e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -566,6 +566,7 @@ public final class BytesBytesMultiHashMap { this.writeBuffers.clear(); this.refs = new long[1]; this.keysAssigned = 0; + this.numValues = 0; } public void expandAndRehashToTarget(int estimateNewRowCount) { http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableConf.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableConf.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableConf.java new file mode 100644 index 0000000..625038f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableConf.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This conf class is a wrapper of a list of HybridHashTableContainers and some common info shared + * among them, which is used in n-way join (multiple small tables are involved). + */ +public class HybridHashTableConf { + private List<HybridHashTableContainer> loadedContainerList; // A list of already loaded containers + private int numberOfPartitions = 0; // Number of partitions each table should have + private int nextSpillPartition = -1; // The partition to be spilled next + + public HybridHashTableConf() { + loadedContainerList = new ArrayList<HybridHashTableContainer>(); + } + + public int getNumberOfPartitions() { + return numberOfPartitions; + } + + public void setNumberOfPartitions(int numberOfPartitions) { + this.numberOfPartitions = numberOfPartitions; + this.nextSpillPartition = numberOfPartitions - 1; + } + + public int getNextSpillPartition() { + return this.nextSpillPartition; + } + + public void setNextSpillPartition(int nextSpillPartition) { + this.nextSpillPartition = nextSpillPartition; + } + + + public List<HybridHashTableContainer> getLoadedContainerList() { + return loadedContainerList; + } + + /** + * Spill one in-memory partition from tail for all previously loaded HybridHashTableContainers. + * Also mark that partition number as spill-on-creation for future created containers.
+ * @return amount of memory freed; 0 if only one last partition is in memory for each container + */ + public long spill() throws IOException { + if (nextSpillPartition == 0) { + return 0; + } + long memFreed = 0; + for (HybridHashTableContainer container : loadedContainerList) { + memFreed += container.spillPartition(nextSpillPartition); + container.setSpill(true); + } + nextSpillPartition--; + return memFreed; + } + + /** + * Check if a partition should be spilled directly on creation + * @param partitionId the partition to create + * @return true if it should be spilled directly, false otherwise + */ + public boolean doSpillOnCreation(int partitionId) { + return nextSpillPartition != -1 && partitionId > nextSpillPartition; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index cb9083d..3f6d61e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -72,15 +72,18 @@ public class HybridHashTableContainer private static final Log LOG = LogFactory.getLog(HybridHashTableContainer.class); private final HashPartition[] hashPartitions; // an array of partitions holding the triplets - private int totalInMemRowCount = 0; // total number of small table rows in memory - private final long memoryThreshold; // the max memory limit allocated + private int totalInMemRowCount = 0; // total number of small table rows in memory + private long memoryThreshold; // the max memory limit that can be allocated + private long memoryUsed; // the actual memory used + private int writeBufferSize; // write 
buffer size for this HybridHashTableContainer private final long tableRowSize; // row size of the small table - private boolean isSpilled; // whether there's any spilled partition - private int toSpillPartitionId; // the partition into which to spill the big table row; - // This may change after every setMapJoinKey call - private int numPartitionsSpilled; // number of spilled partitions - private boolean lastPartitionInMem; // only one (last one) partition is left in memory + private boolean isSpilled; // whether there's any spilled partition + private int toSpillPartitionId; // the partition into which to spill the big table row; + // This may change after every setMapJoinKey call + private int numPartitionsSpilled; // number of spilled partitions + private boolean lastPartitionInMem; // only one (last one) partition is left in memory private final int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full + private HybridHashTableConf nwayConf; // configuration for n-way join /** The OI used to deserialize values. We never deserialize keys. 
*/ private LazyBinaryStructObjectInspector internalValueOi; @@ -182,53 +185,93 @@ public class HybridHashTableContainer } } - public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize) - throws SerDeException { + public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable, + long estimatedTableSize, HybridHashTableConf nwayConf) + throws SerDeException, IOException { this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), - HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD), HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ), - tableSize, keyCount, memUsage); + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), + estimatedTableSize, keyCount, memoryAvailable, nwayConf); } - private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, int wbSize, - long noConditionalTaskThreshold, int memCheckFreq, long tableSize, - long keyCount, long memUsage) throws SerDeException { - - if (wbSize > noConditionalTaskThreshold) { - LOG.warn("adjusting hash table write buffer size to be smaller than noconditionaltasksize"); - wbSize = (int) noConditionalTaskThreshold; - } - + private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, + int memCheckFreq, int minWbSize, int minNumParts, + long estimatedTableSize, long keyCount, + long memoryAvailable, HybridHashTableConf nwayConf) + throws SerDeException, IOException { directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter(); int newKeyCount = HashMapWrapper.calculateTableSize( 
keyCountAdj, threshold, loadFactor, keyCount); - memoryThreshold = noConditionalTaskThreshold; - tableRowSize = tableSize / newKeyCount; + memoryThreshold = memoryAvailable; + tableRowSize = estimatedTableSize / keyCount; memoryCheckFrequency = memCheckFreq; - int numPartitions = calcNumPartitions(tableSize, wbSize); // estimate # of partitions to create + this.nwayConf = nwayConf; + int numPartitions; + if (nwayConf == null) { // binary join + numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize, + nwayConf); + writeBufferSize = (int)(estimatedTableSize / numPartitions); + } else { // n-way join + // It has been calculated in HashTableLoader earlier, so just need to retrieve that number + numPartitions = nwayConf.getNumberOfPartitions(); + if (nwayConf.getLoadedContainerList().size() == 0) { // n-way: first small table + writeBufferSize = (int)(estimatedTableSize / numPartitions); + } else { // n-way: all later small tables + while (memoryThreshold < numPartitions * minWbSize) { + // Spill previously loaded tables to make more room + long memFreed = nwayConf.spill(); + if (memFreed == 0) { + LOG.warn("Available memory is not enough to create HybridHashTableContainers" + + " consistently!"); + break; + } else { + LOG.info("Total available memory was: " + memoryThreshold); + memoryThreshold += memFreed; + LOG.info("Total available memory is: " + memoryThreshold); + } + } + writeBufferSize = (int)(memoryThreshold / numPartitions); + } + } + writeBufferSize = writeBufferSize < minWbSize ? 
minWbSize : writeBufferSize; + LOG.info("Write buffer size: " + writeBufferSize); hashPartitions = new HashPartition[numPartitions]; int numPartitionsSpilledOnCreation = 0; - long memoryAllocated = 0; + memoryUsed = 0; int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions); for (int i = 0; i < numPartitions; i++) { - if (i == 0) { // We unconditionally create a hashmap for the first hash partition - hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage, true); - } else { - hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage, - memoryAllocated + wbSize < memoryThreshold); + if (this.nwayConf == null || // binary join + nwayConf.getLoadedContainerList().size() == 0) { // n-way join, first (biggest) small table + if (i == 0) { // We unconditionally create a hashmap for the first hash partition + hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, true); + } else { + hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, + memoryUsed + writeBufferSize < memoryThreshold); + } + } else { // n-way join + // For all later small tables, follow the same pattern of the previously loaded tables. 
+ if (this.nwayConf.doSpillOnCreation(i)) { + hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, false); + } else { + hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, true); + } } + if (isHashMapSpilledOnCreation(i)) { numPartitionsSpilledOnCreation++; numPartitionsSpilled++; this.setSpill(true); + if (this.nwayConf != null && this.nwayConf.getNextSpillPartition() == numPartitions - 1) { + this.nwayConf.setNextSpillPartition(i - 1); + } } else { - memoryAllocated += hashPartitions[i].hashMap.memorySize(); + memoryUsed += hashPartitions[i].hashMap.memorySize(); } } assert numPartitionsSpilledOnCreation != numPartitions : "All partitions are directly spilled!" + @@ -236,6 +279,11 @@ public class HybridHashTableContainer LOG.info("Number of partitions created: " + numPartitions); LOG.info("Number of partitions spilled directly to disk on creation: " + numPartitionsSpilledOnCreation); + + // Append this container to the loaded list + if (this.nwayConf != null) { + this.nwayConf.getLoadedContainerList().add(this); + } } @@ -251,6 +299,20 @@ public class HybridHashTableContainer return memoryThreshold; } + /** + * Get the current memory usage by recalculating it. 
+ * @return current memory usage + */ + public long refreshMemoryUsed() { + long memUsed = 0; + for (HashPartition hp : hashPartitions) { + if (hp.hashMap != null) { + memUsed += hp.hashMap.memorySize(); + } + } + return memoryUsed = memUsed; + } + public LazyBinaryStructObjectInspector getInternalValueOi() { return internalValueOi; } @@ -313,10 +375,16 @@ public class HybridHashTableContainer LOG.warn("This LAST partition in memory won't be spilled!"); lastPartitionInMem = true; } else { - int biggest = biggestPartition(); + if (nwayConf == null) { // binary join + int biggest = biggestPartition(); + spillPartition(biggest); + this.setSpill(true); + } else { // n-way join + LOG.info("N-way spilling: spill tail partition from previously loaded small tables"); + memoryThreshold += nwayConf.spill(); + LOG.info("Memory threshold has been increased to: " + memoryThreshold); + } numPartitionsSpilled++; - spillPartition(biggest); - this.setSpill(true); } } } @@ -349,13 +417,7 @@ public class HybridHashTableContainer * @return true if memory is full, false if not */ private boolean isMemoryFull() { - long size = 0; - for (int i = 0; i < hashPartitions.length; i++) { - if (!isOnDisk(i)) { - size += hashPartitions[i].hashMap.memorySize(); - } - } - return size >= memoryThreshold; + return refreshMemoryUsed() >= memoryThreshold; } /** @@ -385,11 +447,11 @@ public class HybridHashTableContainer /** * Move the hashtable of a specified partition from memory into local file system * @param partitionId the hashtable to be moved + * @return amount of memory freed */ - private void spillPartition(int partitionId) throws IOException { + public long spillPartition(int partitionId) throws IOException { HashPartition partition = hashPartitions[partitionId]; int inMemRowCount = partition.hashMap.getNumValues(); - long inMemSize = partition.hashMap.memorySize(); Path path = Files.createTempFile("partition-" + partitionId + "-", null); OutputStream outputStream = 
Files.newOutputStream(path); @@ -403,57 +465,55 @@ public class HybridHashTableContainer partition.hashMapLocalPath = path; partition.hashMapOnDisk = true; - long size = 0; - for (int i = 0; i < hashPartitions.length; i++) { - if (!isOnDisk(i)) { - size += hashPartitions[i].hashMap.memorySize(); - } - } LOG.info("Spilling hash partition " + partitionId + " (Rows: " + inMemRowCount + - ", Mem size: " + inMemSize + "): " + path); - LOG.info("Memory usage before spilling: " + size); - LOG.info("Memory usage after spilling: " + (size - inMemSize)); + ", Mem size: " + partition.hashMap.memorySize() + "): " + path); + LOG.info("Memory usage before spilling: " + memoryUsed); + + long memFreed = partition.hashMap.memorySize(); + memoryUsed -= memFreed; + LOG.info("Memory usage after spilling: " + memoryUsed); totalInMemRowCount -= inMemRowCount; partition.hashMap.clear(); + return memFreed; } /** - * Calculate how many partitions are needed. This is an estimation. + * Calculate how many partitions are needed. + * For n-way join, we only do this calculation once in the HashTableLoader, for the biggest small + * table. Other small tables will use the same number. They may need to adjust (usually reduce) + * their individual write buffer size in order not to exceed memory threshold. + * @param memoryThreshold memory threshold for the given table * @param dataSize total data size for the table - * @param wbSize write buffer size + * @param minNumParts minimum required number of partitions + * @param minWbSize minimum required write buffer size + * @param nwayConf the n-way join configuration * @return number of partitions needed */ - private int calcNumPartitions(long dataSize, int wbSize) { - if (memoryThreshold < wbSize) { - throw new IllegalStateException("Available memory is less than hashtable writebuffer size!" 
- + " Try increasing hive.auto.convert.join.noconditionaltask.size."); - } - - int lowerLimit = 2; - int numPartitions = (int) Math.ceil(dataSize / wbSize); - - LOG.info("Total available memory: " + memoryThreshold); - LOG.info("Estimated small table size: " + dataSize); - LOG.info("Write buffer size: " + wbSize); - LOG.info("Initial number of partitions: " + numPartitions); + public static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts, + int minWbSize, HybridHashTableConf nwayConf) throws IOException { + int numPartitions = minNumParts; - if (numPartitions < lowerLimit) { - return lowerLimit; - } else if (dataSize > memoryThreshold) { - numPartitions = (int) (memoryThreshold / wbSize); + if (memoryThreshold < minNumParts * minWbSize) { + LOG.warn("Available memory is not enough to create a HybridHashTableContainer!"); } - // Make sure numPartitions is power of 2, to make N & (M - 1) easy when calculating partition No. - numPartitions = (Long.bitCount(numPartitions) == 1) ? 
numPartitions - : Integer.highestOneBit(numPartitions) << 1; - while (dataSize / numPartitions > memoryThreshold) { - numPartitions *= 2; + if (memoryThreshold < dataSize) { + while (dataSize / numPartitions > memoryThreshold) { + numPartitions *= 2; + } } + LOG.info("Total available memory: " + memoryThreshold); + LOG.info("Estimated small table size: " + dataSize); LOG.info("Number of hash partitions to be created: " + numPartitions); return numPartitions; } + /* Get number of partitions */ + public int getNumPartitions() { + return hashPartitions.length; + } + /* Get total number of rows from all in memory partitions */ public int getTotalInMemRowCount() { return totalInMemRowCount; @@ -494,6 +554,7 @@ public class HybridHashTableContainer hp.hashMap.clear(); } } + memoryUsed = 0; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java index d3ec29a..d1bea48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.io.BytesWritable; import java.io.File; @@ -64,29 +65,31 @@ public class KeyValueContainer { } try { setupOutput(); - } catch (IOException e) { + } catch (IOException | HiveException e) { throw new RuntimeException("Failed to create temporary output file on disk", e); } } - private void setupOutput() throws IOException { - if 
(parentFile == null) { - parentFile = File.createTempFile("key-value-container", ""); - if (parentFile.delete() && parentFile.mkdir()) { - parentFile.deleteOnExit(); + private void setupOutput() throws IOException, HiveException { + FileOutputStream fos = null; + try { + if (parentFile == null) { + parentFile = File.createTempFile("key-value-container", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + } } - } - if (tmpFile == null || input != null) { - tmpFile = File.createTempFile("KeyValueContainer", ".tmp", parentFile); - LOG.info("KeyValueContainer created temp file " + tmpFile.getAbsolutePath()); - tmpFile.deleteOnExit(); - } + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("KeyValueContainer", ".tmp", parentFile); + LOG.info("KeyValueContainer created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } - FileOutputStream fos = null; - try { fos = new FileOutputStream(tmpFile); output = new Output(fos); + } catch (IOException e) { + throw new HiveException(e); } finally { if (output == null && fos != null) { fos.close(); http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java index 18943dd..7d7ce1d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.HiveException; import java.io.File; import java.io.FileInputStream; @@ 
-67,29 +68,31 @@ public class ObjectContainer<ROW> { kryo = Utilities.runtimeSerializationKryo.get(); try { setupOutput(); - } catch (IOException e) { + } catch (IOException | HiveException e) { throw new RuntimeException("Failed to create temporary output file on disk", e); } } - private void setupOutput() throws IOException { - if (parentFile == null) { - parentFile = File.createTempFile("object-container", ""); - if (parentFile.delete() && parentFile.mkdir()) { - parentFile.deleteOnExit(); + private void setupOutput() throws IOException, HiveException { + FileOutputStream fos = null; + try { + if (parentFile == null) { + parentFile = File.createTempFile("object-container", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + } } - } - if (tmpFile == null || input != null) { - tmpFile = File.createTempFile("ObjectContainer", ".tmp", parentFile); - LOG.info("ObjectContainer created temp file " + tmpFile.getAbsolutePath()); - tmpFile.deleteOnExit(); - } + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("ObjectContainer", ".tmp", parentFile); + LOG.info("ObjectContainer created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } - FileOutputStream fos = null; - try { fos = new FileOutputStream(tmpFile); output = new Output(fos); + } catch (IOException e) { + throw new HiveException(e); } finally { if (output == null && fos != null) { fos.close(); http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index fe108c4..043f1f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -69,7 +69,7 @@ 
public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable @Override public void load(MapJoinTableContainer[] mapJoinTables, - MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) + MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { // Note: it's possible that a MJ operator is in a ReduceWork, in which case the http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index ba5a797..6a81f11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; +import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableConf; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; @@ -69,7 +70,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable @Override public void load(MapJoinTableContainer[] mapJoinTables, - MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) + MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { Map<Integer, String> parentToInput = desc.getParentToInput(); @@ -79,10 +80,44 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE); boolean 
useHybridGraceHashJoin = desc.isHybridHashJoin(); boolean isFirstKey = true; + // TODO remove this after memory manager is in + long noConditionalTaskThreshold = HiveConf.getLongVar( + hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + + // Only applicable to n-way Hybrid Grace Hash Join + HybridHashTableConf nwayConf = null; + long totalSize = 0; + int biggest = 0; // position of the biggest small table + if (useHybridGraceHashJoin && mapJoinTables.length > 2) { + // Create a Conf for n-way HybridHashTableContainers + nwayConf = new HybridHashTableConf(); + + // Find the biggest small table; also calculate total data size of all small tables + long maxSize = 0; // the size of the biggest small table + for (int pos = 0; pos < mapJoinTables.length; pos++) { + if (pos == desc.getPosBigTable()) { + continue; + } + totalSize += desc.getParentDataSizes().get(pos); + biggest = desc.getParentDataSizes().get(pos) > maxSize ? pos : biggest; + maxSize = desc.getParentDataSizes().get(pos) > maxSize ? 
desc.getParentDataSizes().get(pos) + : maxSize; + } - // Disable hybrid grace hash join for n-way join - if (mapJoinTables.length > 2) { - useHybridGraceHashJoin = false; + // Using biggest small table, calculate number of partitions to create for each small table + float percentage = (float) maxSize / totalSize; + long memory = (long) (noConditionalTaskThreshold * percentage); + int numPartitions = 0; + try { + numPartitions = HybridHashTableContainer.calcNumPartitions(memory, + desc.getParentDataSizes().get(biggest), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE), + nwayConf); + } catch (IOException e) { + throw new HiveException(e); + } + nwayConf.setNumberOfPartitions(numPartitions); } for (int pos = 0; pos < mapJoinTables.length; pos++) { @@ -122,10 +157,21 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable Long keyCountObj = parentKeyCounts.get(pos); long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue(); + long memory = 0; + if (useHybridGraceHashJoin) { + if (mapJoinTables.length > 2) { + // Allocate n-way join memory proportionally + float percentage = (float) desc.getParentDataSizes().get(pos) / totalSize; + memory = (long) (noConditionalTaskThreshold * percentage); + } else { // binary join + memory = noConditionalTaskThreshold; + } + } + MapJoinTableContainer tableContainer = useOptimizedTables - ? (useHybridGraceHashJoin ? new HybridHashTableContainer(hconf, keyCount, memUsage, - desc.getParentDataSizes().get(pos)) - : new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)) + ? (useHybridGraceHashJoin ? 
new HybridHashTableContainer(hconf, keyCount, + memory, desc.getParentDataSizes().get(pos), nwayConf) + : new MapJoinBytesTableContainer(hconf, valCtx, keyCount, 0)) : new HashMapWrapper(hconf, keyCount); LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName()); http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 534a906..0547346 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -311,8 +311,10 @@ public class VectorMapJoinOperator extends MapJoinOperator implements Vectorizat } @Override - protected void reProcessBigTable(HybridHashTableContainer.HashPartition partition) + protected void reProcessBigTable(int partitionId) throws HiveException { + + HybridHashTableContainer.HashPartition partition = firstSmallTable.getHashPartitions()[partitionId]; ObjectContainer bigTable = partition.getMatchfileObjContainer(); DataOutputBuffer dataOutputBuffer = new DataOutputBuffer(); http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index f272b6d..f9d5736 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java 
@@ -744,10 +744,6 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem // Setup a scratch batch that will be used to play back big table rows that were spilled // to disk for the Hybrid Grace hash partitioning. spillReplayBatch = VectorizedBatchUtil.makeLike(batch); - - // TEMPORARY -- Set this up for Hybrid Grace logic in MapJoinOperator.closeOp - hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length]; - smallTable = posSingleVectorMapJoinSmallTable; } protected void displayBatchColumns(VectorizedRowBatch batch, String batchName) { http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 743a975..70c8cb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; @@ -449,7 +451,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int partitionId = 
hashTableResult.spillPartitionId(); - HybridHashTableContainer ht = (HybridHashTableContainer) mapJoinTables[smallTable]; + HybridHashTableContainer ht = (HybridHashTableContainer) mapJoinTables[posSingleVectorMapJoinSmallTable]; HashPartition hp = ht.getHashPartitions()[partitionId]; VectorMapJoinRowBytesContainer rowBytesContainer = hp.getMatchfileRowBytesContainer(); @@ -499,27 +501,30 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC } @Override - protected void reloadHashTable(HashPartition partition, - HybridHashTableContainer hybridHtContainer) - throws IOException, ClassNotFoundException, HiveException, SerDeException { + protected void reloadHashTable(byte pos, int partitionId) + throws IOException, HiveException, SerDeException, ClassNotFoundException { - // The super method will reload a hash table partition and - // put a single MapJoinBytesTableContainer into the currentSmallTable member. - super.reloadHashTable(partition, hybridHtContainer); + // The super method will reload a hash table partition of one of the small tables. + // Currently, for native vector map join it will only be one small table. 
+ super.reloadHashTable(pos, partitionId); + + MapJoinTableContainer smallTable = spilledMapJoinTables[pos]; vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf, - currentSmallTable); + smallTable); needHashTableSetup = true; LOG.info(CLASS_NAME + " reloadHashTable!"); } @Override - protected void reProcessBigTable(HybridHashTableContainer.HashPartition partition) - throws HiveException, IOException { + protected void reProcessBigTable(int partitionId) + throws HiveException { LOG.info(CLASS_NAME + " reProcessBigTable enter..."); + HashPartition partition = firstSmallTable.getHashPartitions()[partitionId]; + int rowCount = 0; int batchCount = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java index 0796406..f9550c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java @@ -90,8 +90,8 @@ public abstract class VectorMapJoinFastBytesHashMap } public VectorMapJoinFastBytesHashMap( - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); valueStore = new VectorMapJoinFastValueStore(writeBuffersSize); http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java index d685c22..9dcaf8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java @@ -85,8 +85,8 @@ public abstract class VectorMapJoinFastBytesHashMultiSet } public VectorMapJoinFastBytesHashMultiSet( - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java index 9f20fdc..9f122c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java @@ -77,8 +77,8 @@ public abstract class VectorMapJoinFastBytesHashSet } public VectorMapJoinFastBytesHashSet( - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + 
super(initialCapacity, loadFactor, writeBuffersSize); keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java index 594a77f..b6e6321 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java @@ -214,8 +214,8 @@ public abstract class VectorMapJoinFastBytesHashTable } public VectorMapJoinFastBytesHashTable( - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); allocateBucketArray(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java index b37247c..262b619 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java @@ -32,7 +32,7 @@ public abstract class VectorMapJoinFastHashMap public VectorMapJoinFastHashMap( boolean 
isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java index 5569f6e..5f7c6a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java @@ -42,7 +42,7 @@ public abstract class VectorMapJoinFastHashMultiSet public VectorMapJoinFastHashMultiSet( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java index 0738df3..8509971 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java @@ -38,7 +38,7 @@ public abstract class VectorMapJoinFastHashSet public VectorMapJoinFastHashSet( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index 33e34fa..fbe6b4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -30,7 +30,6 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab protected float loadFactor; protected int writeBuffersSize; - protected long memUsage; protected int metricPutConflict; protected int largestNumberOfSteps; @@ -52,7 +51,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab } public VectorMapJoinFastHashTable( - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { + int initialCapacity, float loadFactor, int writeBuffersSize) { initialCapacity = (Long.bitCount(initialCapacity) == 1) ? 
initialCapacity : nextHighestPowerOfTwo(initialCapacity); @@ -65,6 +64,5 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab this.loadFactor = loadFactor; this.writeBuffersSize = writeBuffersSize; - this.memUsage = memUsage; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java index 92b5d40..4edf604 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java @@ -62,7 +62,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive. @Override public void load(MapJoinTableContainer[] mapJoinTables, - MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) + MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException { Map<Integer, String> parentToInput = desc.getParentToInput(); @@ -91,7 +91,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive. long keyCount = (keyCountObj == null) ? 
-1 : keyCountObj.longValue(); VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer = - new VectorMapJoinFastTableContainer(desc, hconf, keyCount, memUsage); + new VectorMapJoinFastTableContainer(desc, hconf, keyCount); while (kvReader.next()) { vectorMapJoinFastTableContainer.putRow( http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java index 3a0b380..d6ad028 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java @@ -86,9 +86,9 @@ public class VectorMapJoinFastLongHashMap public VectorMapJoinFastLongHashMap( boolean minMaxEnabled, boolean isOuterJoin, HashTableKeyType hashTableKeyType, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { + int initialCapacity, float loadFactor, int writeBuffersSize) { super(minMaxEnabled, isOuterJoin, hashTableKeyType, - initialCapacity, loadFactor, writeBuffersSize, memUsage); + initialCapacity, loadFactor, writeBuffersSize); valueStore = new VectorMapJoinFastValueStore(writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java 
index f9763e3..e447551 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java @@ -84,8 +84,8 @@ public class VectorMapJoinFastLongHashMultiSet public VectorMapJoinFastLongHashMultiSet( boolean minMaxEnabled, boolean isOuterJoin, HashTableKeyType hashTableKeyType, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { + int initialCapacity, float loadFactor, int writeBuffersSize) { super(minMaxEnabled, isOuterJoin, hashTableKeyType, - initialCapacity, loadFactor, writeBuffersSize, memUsage); + initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java index cd23949..aa44e60 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java @@ -77,8 +77,8 @@ public class VectorMapJoinFastLongHashSet public VectorMapJoinFastLongHashSet( boolean minMaxEnabled, boolean isOuterJoin, HashTableKeyType hashTableKeyType, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { + int initialCapacity, float loadFactor, int writeBuffersSize) { super(minMaxEnabled, isOuterJoin, hashTableKeyType, - initialCapacity, loadFactor, writeBuffersSize, memUsage); + initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index b448e1f..2137fb7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -270,8 +270,8 @@ public abstract class VectorMapJoinFastLongHashTable public VectorMapJoinFastLongHashTable( boolean minMaxEnabled, boolean isOuterJoin, HashTableKeyType hashTableKeyType, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); this.isOuterJoin = isOuterJoin; this.hashTableKeyType = hashTableKeyType; PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.longTypeInfo }; http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java index b962475..9a9fb8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java @@ -28,12 +28,12 @@ 
public class VectorMapJoinFastMultiKeyHashMap @VisibleForTesting public VectorMapJoinFastMultiKeyHashMap(int initialCapacity, float loadFactor, int wbSize) { - this(false, initialCapacity, loadFactor, wbSize, -1); + this(false, initialCapacity, loadFactor, wbSize); } public VectorMapJoinFastMultiKeyHashMap( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java index 71a62fe..a8744a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java @@ -26,7 +26,7 @@ public class VectorMapJoinFastMultiKeyHashMultiSet public VectorMapJoinFastMultiKeyHashMultiSet( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java index dad3b32..a8048e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java @@ -26,7 +26,7 @@ public class VectorMapJoinFastMultiKeyHashSet public VectorMapJoinFastMultiKeyHashSet( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java index c80ea89..6f181b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java @@ -37,8 +37,8 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa public VectorMapJoinFastStringHashMap( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int 
writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java index 4933b16..9653b71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java @@ -37,8 +37,8 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH public VectorMapJoinFastStringHashMultiSet( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java index ae8d943..6419a0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java @@ -37,8 +37,8 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe public VectorMapJoinFastStringHashSet( boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long memUsage) { - super(initialCapacity, loadFactor, writeBuffersSize, memUsage); + int initialCapacity, float loadFactor, int writeBuffersSize) { + super(initialCapacity, loadFactor, writeBuffersSize); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/c72d073c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 3789275..373b5f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -56,13 +56,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai private float loadFactor; private int wbSize; private long keyCount; - private long memUsage; private VectorMapJoinFastHashTable VectorMapJoinFastHashTable; public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf, - long keyCount, long memUsage) throws SerDeException { + long keyCount) throws SerDeException { this.desc = desc; this.hconf = hconf; @@ -73,13 +72,11 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai wbSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE); this.keyCount = keyCount; - this.memUsage = 
memUsage; // LOG.info("VectorMapJoinFastTableContainer load keyCountAdj " + keyCountAdj); // LOG.info("VectorMapJoinFastTableContainer load threshold " + threshold); // LOG.info("VectorMapJoinFastTableContainer load loadFactor " + loadFactor); // LOG.info("VectorMapJoinFastTableContainer load wbSize " + wbSize); - // LOG.info("VectorMapJoinFastTableContainer load memUsage " + memUsage); int newThreshold = HashMapWrapper.calculateTableSize( keyCountAdj, threshold, loadFactor, keyCount); @@ -117,17 +114,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai case HASH_MAP: hashTable = new VectorMapJoinFastLongHashMap( minMaxEnabled, isOuterJoin, hashTableKeyType, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; case HASH_MULTISET: hashTable = new VectorMapJoinFastLongHashMultiSet( minMaxEnabled, isOuterJoin, hashTableKeyType, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; case HASH_SET: hashTable = new VectorMapJoinFastLongHashSet( minMaxEnabled, isOuterJoin, hashTableKeyType, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; } break; @@ -137,17 +134,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai case HASH_MAP: hashTable = new VectorMapJoinFastStringHashMap( isOuterJoin, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; case HASH_MULTISET: hashTable = new VectorMapJoinFastStringHashMultiSet( isOuterJoin, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; case HASH_SET: hashTable = new VectorMapJoinFastStringHashSet( isOuterJoin, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; } break; @@ -157,17 +154,17 @@ public class 
VectorMapJoinFastTableContainer implements VectorMapJoinTableContai case HASH_MAP: hashTable = new VectorMapJoinFastMultiKeyHashMap( isOuterJoin, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; case HASH_MULTISET: hashTable = new VectorMapJoinFastMultiKeyHashMultiSet( isOuterJoin, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; case HASH_SET: hashTable = new VectorMapJoinFastMultiKeyHashSet( isOuterJoin, - newThreshold, loadFactor, writeBufferSize, memUsage); + newThreshold, loadFactor, writeBufferSize); break; } break;
