HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a2bd0c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a2bd0c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a2bd0c4

Branch: refs/heads/branch-1
Commit: 9a2bd0c48179f2b654b69adbbe88cd5485492658
Parents: 130293e
Author: Sergey Shelukhin <[email protected]>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Fri Apr 22 11:05:35 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java     | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 9867739..083d4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1095,6 +1095,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 82d4078..23497d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -118,7 +118,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -236,7 +236,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       float memUsage = conf.getTopNMemoryUsage();
 
       if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
@@ -376,8 +376,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // if TopNHashes aren't active, always forward
+      // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -385,6 +388,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -561,12 +565,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }
