HIVE-10456: Grace Hash Join should not load spilled partitions on abort (Prasanth Jayachandran reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/07fcb098 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/07fcb098 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/07fcb098 Branch: refs/heads/llap Commit: 07fcb098b63003cf74718351269c79870100b8de Parents: 77b7fc3 Author: Prasanth Jayachandran <[email protected]> Authored: Sat May 2 17:40:01 2015 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Sat May 2 17:40:01 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/MapJoinOperator.java | 136 +++++++++++-------- .../apache/hadoop/hive/ql/exec/ObjectCache.java | 7 + .../hadoop/hive/ql/exec/mr/ObjectCache.java | 5 + .../persistence/HybridHashTableContainer.java | 35 ++++- .../hive/ql/exec/tez/HashTableLoader.java | 5 - .../hadoop/hive/ql/exec/tez/ObjectCache.java | 6 + .../mapjoin/VectorMapJoinRowBytesContainer.java | 2 +- 7 files changed, 131 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index f2b800a..1cfc411 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -284,7 +284,17 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); loader.init(mapContext, mrContext, hconf, this); - loader.load(mapJoinTables, mapJoinTableSerdes); + try { + loader.load(mapJoinTables, mapJoinTableSerdes); + } catch (HiveException e) { + if (isLogInfoEnabled) { + LOG.info("Exception loading hash tables. Clearing partially loaded hash table containers."); + } + + // there could be some spilled partitions which needs to be cleaned up + clearAllTableContainers(); + throw e; + } hashTblInitedOnce = true; @@ -433,7 +443,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem @Override public void closeOp(boolean abort) throws HiveException { boolean spilled = false; - for (MapJoinTableContainer container: mapJoinTables) { + for (MapJoinTableContainer container : mapJoinTables) { if (container != null) { spilled = spilled || container.hasSpill(); container.dumpMetrics(); @@ -442,79 +452,93 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem // For Hybrid Grace Hash Join, we need to see if there is any spilled data to be processed next if (spilled) { - if (hashMapRowGetters == null) { - hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length]; - } - int numPartitions = 0; - // Find out number of partitions for each small table (should be same across tables) - for (byte pos = 0; pos < mapJoinTables.length; pos++) { - if (pos != conf.getPosBigTable()) { - firstSmallTable = (HybridHashTableContainer)mapJoinTables[pos]; - numPartitions = firstSmallTable.getHashPartitions().length; - break; + if (!abort) { + if (hashMapRowGetters == null) { + hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length]; } - } - assert numPartitions != 0 : "Number of partitions must be greater than 0!"; - - if (firstSmallTable.hasSpill()) { - spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length]; - hybridMapJoinLeftover = true; - - // Clear all in-memory partitions first + int numPartitions = 0; + // Find out number of partitions for each small table (should be same across tables) for (byte pos = 0; pos < mapJoinTables.length; pos++) { - MapJoinTableContainer tableContainer = mapJoinTables[pos]; - if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) { - HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer; - hybridHtContainer.dumpStats(); - - HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions(); - // Clear all in memory partitions first - for (int i = 0; i < hashPartitions.length; i++) { - if (!hashPartitions[i].isHashMapOnDisk()) { - hybridHtContainer.setTotalInMemRowCount( - hybridHtContainer.getTotalInMemRowCount() - - hashPartitions[i].getHashMapFromMemory().getNumValues()); - hashPartitions[i].getHashMapFromMemory().clear(); + if (pos != conf.getPosBigTable()) { + firstSmallTable = (HybridHashTableContainer) mapJoinTables[pos]; + numPartitions = firstSmallTable.getHashPartitions().length; + break; + } + } + assert numPartitions != 0 : "Number of partitions must be greater than 0!"; + + if (firstSmallTable.hasSpill()) { + spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length]; + hybridMapJoinLeftover = true; + + // Clear all in-memory partitions first + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + MapJoinTableContainer tableContainer = mapJoinTables[pos]; + if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) { + HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer; + hybridHtContainer.dumpStats(); + + HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions(); + // Clear all in memory partitions first + for (int i = 0; i < hashPartitions.length; i++) { + if (!hashPartitions[i].isHashMapOnDisk()) { + hybridHtContainer.setTotalInMemRowCount( + hybridHtContainer.getTotalInMemRowCount() - + hashPartitions[i].getHashMapFromMemory().getNumValues()); + hashPartitions[i].getHashMapFromMemory().clear(); + } } + assert hybridHtContainer.getTotalInMemRowCount() == 0; } - assert hybridHtContainer.getTotalInMemRowCount() == 0; } - } - // Reprocess the spilled data - for (int i = 0; i < numPartitions; i++) { - HashPartition[] hashPartitions = firstSmallTable.getHashPartitions(); - if (hashPartitions[i].isHashMapOnDisk()) { - try { - continueProcess(i); // Re-process spilled data - } catch (IOException e) { - e.printStackTrace(); - } catch (SerDeException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } - for (byte pos = 0; pos < order.length; pos++) { - if (pos != conf.getPosBigTable()) - spilledMapJoinTables[pos] = null; + // Reprocess the spilled data + for (int i = 0; i < numPartitions; i++) { + HashPartition[] hashPartitions = firstSmallTable.getHashPartitions(); + if (hashPartitions[i].isHashMapOnDisk()) { + try { + continueProcess(i); // Re-process spilled data + } catch (Exception e) { + throw new HiveException(e); + } + for (byte pos = 0; pos < order.length; pos++) { + if (pos != conf.getPosBigTable()) + spilledMapJoinTables[pos] = null; + } } } } } + + if (isLogInfoEnabled) { + LOG.info("spilled: " + spilled + " abort: " + abort + ". Clearing spilled partitions."); + } + + // spilled tables are loaded always (no sharing), so clear it + clearAllTableContainers(); + cache.remove(cacheKey); } + // in mapreduce case, we need to always clear up as mapreduce doesn't have object registry. if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) - && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) - && mapJoinTables != null) { + && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())) { + if (isLogInfoEnabled) { + LOG.info("MR: Clearing all map join table containers."); + } + clearAllTableContainers(); + } + + super.closeOp(abort); + } + + private void clearAllTableContainers() { + if (mapJoinTables != null) { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { tableContainer.clear(); } } } - cache.release(cacheKey); - this.loader = null; - super.closeOp(abort); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java index f0df2d3..440e0a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java @@ -53,4 +53,11 @@ public interface ObjectCache { * @return the last cached object with the key, null if none. */ public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException; + + /** + * Removes the specified key from the object cache. + * + * @param key - key to be removed + */ + public void remove(String key); } http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java index a6f698d..bf4ae8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java @@ -91,4 +91,9 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { } }; } + + @Override + public void remove(String key) { + // nothing to do + } } http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 3f6d61e..412226e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -183,6 +183,36 @@ public class HybridHashTableContainer public boolean isHashMapOnDisk() { return hashMapOnDisk; } + + public void clear() { + if (hashMap != null) { + hashMap.clear(); + hashMap = null; + } + + if (hashMapLocalPath != null) { + try { + Files.delete(hashMapLocalPath); + } catch (Throwable ignored) { + } + hashMapLocalPath = null; + } + + if (sidefileKVContainer != null) { + sidefileKVContainer.clear(); + sidefileKVContainer = null; + } + + if (matchfileObjContainer != null) { + matchfileObjContainer.clear(); + matchfileObjContainer = null; + } + + if (matchfileRowBytesContainer != null) { + matchfileRowBytesContainer.clear(); + matchfileRowBytesContainer = null; + } + } } public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable, @@ -546,12 +576,11 @@ public class HybridHashTableContainer return toSpillPartitionId; } - /* Clean up in memory hashtables */ @Override public void clear() { for (HashPartition hp : hashPartitions) { - if (hp.hashMap != null) { - hp.hashMap.clear(); + if (hp != null) { + hp.clear(); } } memoryUsed = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 6a81f11..536b92c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -181,10 +180,6 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable } tableContainer.seal(); mapJoinTables[pos] = tableContainer; - } catch (IOException e) { - throw new HiveException(e); - } catch (SerDeException e) { - throw new HiveException(e); } catch (Exception e) { throw new HiveException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index c0bcb21..64295d4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -93,4 +93,10 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { } }); } + + @Override + public void remove(String key) { + LOG.info("Removing key: " + key); + registry.delete(key); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java index c8359d3..1c91be6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java @@ -290,7 +290,7 @@ public class VectorMapJoinRowBytesContainer { return currentLength; } - public void clear() throws IOException { + public void clear() { if (fileInputStream != null) { try { fileInputStream.close();
