HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cbada84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cbada84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cbada84

Branch: refs/heads/branch-2.0
Commit: 5cbada84d0b020245936648f7785d4b3112b6934
Parents: 75723a5
Author: Sergey Shelukhin <[email protected]>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Fri Apr 22 11:00:15 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java  |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java      | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5cbada84/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 0839b42..b88722f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1104,6 +1104,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/5cbada84/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 24642af..27dbf59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases;  // input aliases of this RS for 
join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -242,7 +242,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       float memUsage = conf.getTopNMemoryUsage();
 
       if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new 
TopNHash();
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
@@ -381,8 +381,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && 
partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // if TopNHashes aren't active, always forward
+      // if TopNHashes are active, proceed if not already excluded (i.e order 
by limit)
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, 
partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -390,6 +393,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -566,12 +570,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }

Reply via email to