Repository: hive Updated Branches: refs/heads/branch-2.0 fffebe67f -> 6fed7783d
HIVE-12610: Hybrid Grace Hash Join should fail task faster if processing first batch fails, instead of continuing processing the rest (Wei Zheng via Vikram Dixit K) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6fed7783 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6fed7783 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6fed7783 Branch: refs/heads/branch-2.0 Commit: 6fed7783db836ad6c52bbe87c283a836c1a764bb Parents: fffebe6 Author: vikram <[email protected]> Authored: Wed Dec 16 16:14:50 2015 -0800 Committer: vikram <[email protected]> Committed: Wed Dec 16 16:53:58 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/MapJoinOperator.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6fed7783/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index fbc5ea4..dc0b85e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import com.esotericsoftware.kryo.KryoException; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; @@ -513,6 +514,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem if (hashPartitions[i].isHashMapOnDisk()) { try { continueProcess(i); // Re-process spilled data + } catch (KryoException ke) { + LOG.error("Processing the spilled data failed due to Kryo error!"); + LOG.error("Cleaning up all spilled data!"); + cleanupGraceHashJoin(); + throw new HiveException(ke); } catch (Exception e) { throw new HiveException(e); } @@ -657,6 +663,19 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } /** + * Clean up data participating the join, i.e. in-mem and on-disk files for small table(s) and big table + */ + private void cleanupGraceHashJoin() { + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + LOG.info("Cleaning up small table data at pos: " + pos); + HybridHashTableContainer container = (HybridHashTableContainer) mapJoinTables[pos]; + container.clear(); + } + } + } + + /** * Implements the getName function for the Node Interface. * * @return the name of the operator
