Repository: hive Updated Branches: refs/heads/master 305b8ce40 -> 0f1c112fc
HIVE-12610: Hybrid Grace Hash Join should fail task faster if processing first batch fails, instead of continuing processing the rest (Wei Zheng via Vikram Dixit K) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0f1c112f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0f1c112f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0f1c112f Branch: refs/heads/master Commit: 0f1c112fc489fac3781470589517fd1025a306ed Parents: 305b8ce Author: vikram <[email protected]> Authored: Wed Dec 16 16:06:46 2015 -0800 Committer: vikram <[email protected]> Committed: Wed Dec 16 16:06:46 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/MapJoinOperator.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0f1c112f/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index fbc5ea4..dc0b85e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import com.esotericsoftware.kryo.KryoException; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; @@ -513,6 +514,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem if (hashPartitions[i].isHashMapOnDisk()) { try { continueProcess(i); // Re-process spilled data + } catch (KryoException ke) { + LOG.error("Processing the spilled data failed due to Kryo error!"); + LOG.error("Cleaning up all spilled data!"); + cleanupGraceHashJoin(); + throw new HiveException(ke); } catch (Exception e) { throw new HiveException(e); } @@ -657,6 +663,19 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } /** + * Clean up data participating the join, i.e. in-mem and on-disk files for small table(s) and big table + */ + private void cleanupGraceHashJoin() { + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + LOG.info("Cleaning up small table data at pos: " + pos); + HybridHashTableContainer container = (HybridHashTableContainer) mapJoinTables[pos]; + container.clear(); + } + } + } + + /** * Implements the getName function for the Node Interface. * * @return the name of the operator
