Repository: hive
Updated Branches:
  refs/heads/branch-2.0 fffebe67f -> 6fed7783d


HIVE-12610: Hybrid Grace Hash Join should fail task faster if processing first 
batch fails, instead of continuing processing the rest (Wei Zheng via Vikram 
Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6fed7783
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6fed7783
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6fed7783

Branch: refs/heads/branch-2.0
Commit: 6fed7783db836ad6c52bbe87c283a836c1a764bb
Parents: fffebe6
Author: vikram <[email protected]>
Authored: Wed Dec 16 16:14:50 2015 -0800
Committer: vikram <[email protected]>
Committed: Wed Dec 16 16:53:58 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/MapJoinOperator.java     | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6fed7783/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index fbc5ea4..dc0b85e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
+import com.esotericsoftware.kryo.KryoException;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -513,6 +514,11 @@ public class MapJoinOperator extends 
AbstractMapJoinOperator<MapJoinDesc> implem
             if (hashPartitions[i].isHashMapOnDisk()) {
               try {
                 continueProcess(i);     // Re-process spilled data
+              } catch (KryoException ke) {
+                LOG.error("Processing the spilled data failed due to Kryo 
error!");
+                LOG.error("Cleaning up all spilled data!");
+                cleanupGraceHashJoin();
+                throw new HiveException(ke);
               } catch (Exception e) {
                 throw new HiveException(e);
               }
@@ -657,6 +663,19 @@ public class MapJoinOperator extends 
AbstractMapJoinOperator<MapJoinDesc> implem
   }
 
   /**
+   * Clean up data participating in the join by calling clear() on the container of every small table, i.e. every position except the big table's.
+   */
+  private void cleanupGraceHashJoin() {
+    for (byte pos = 0; pos < mapJoinTables.length; pos++) {
+      if (pos != conf.getPosBigTable()) {
+        LOG.info("Cleaning up small table data at pos: " + pos);
+        HybridHashTableContainer container = (HybridHashTableContainer) 
mapJoinTables[pos];
+        container.clear();
+      }
+    }
+  }
+
+  /**
    * Implements the getName function for the Node Interface.
    *
    * @return the name of the operator

Reply via email to