Repository: hive Updated Branches: refs/heads/branch-1.2 01bc86b8d -> 0b22cbbf3
HIVE-12610: Hybrid Grace Hash Join should fail task faster if processing first batch fails, instead of continuing processing the rest (Wei Zheng via Vikram Dixit K) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b22cbbf Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b22cbbf Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b22cbbf Branch: refs/heads/branch-1.2 Commit: 0b22cbbf3ac7f18c22145fbd70a7d14fea45d463 Parents: 01bc86b Author: vikram <[email protected]> Authored: Wed Dec 16 16:14:50 2015 -0800 Committer: vikram <[email protected]> Committed: Wed Dec 16 16:14:50 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/MapJoinOperator.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0b22cbbf/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index b1352f3..97e0b32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import com.esotericsoftware.kryo.KryoException; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; @@ -498,6 +499,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem if (hashPartitions[i].isHashMapOnDisk()) { try { continueProcess(i); // Re-process spilled data + } catch (KryoException ke) { + LOG.error("Processing the spilled data failed due to Kryo error!"); + LOG.error("Cleaning up all spilled data!"); + cleanupGraceHashJoin(); + throw new HiveException(ke); } catch (Exception e) { throw new HiveException(e); } @@ -635,6 +641,19 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } /** + * Clean up data participating the join, i.e. in-mem and on-disk files for small table(s) and big table + */ + private void cleanupGraceHashJoin() { + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + LOG.info("Cleaning up small table data at pos: " + pos); + HybridHashTableContainer container = (HybridHashTableContainer) mapJoinTables[pos]; + container.clear(); + } + } + } + + /** * Implements the getName function for the Node Interface. * * @return the name of the operator
