Repository: hive
Updated Branches:
  refs/heads/branch-1 130293e56 -> 9a2bd0c48
  refs/heads/branch-2.0 75723a5d8 -> 5cbada84d
  refs/heads/master aac9263b4 -> 145e253df

HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/145e253d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/145e253d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/145e253d

Branch: refs/heads/master
Commit: 145e253df9c05e4e725c6aeab172ac0885bf5384
Parents: aac9263
Author: Sergey Shelukhin <[email protected]>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Fri Apr 22 10:55:53 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java     | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/145e253d/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index e39b75e..47b5793 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1102,6 +1102,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/145e253d/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 1b8e7d2..ba71a1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -237,7 +237,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
     float memUsage = conf.getTopNMemoryUsage();
 
     if (limit >= 0 && memUsage > 0) {
-      reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+      reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
       reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
     }
 
@@ -385,8 +385,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // if TopNHashes aren't active, always forward
+      // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -394,6 +397,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -563,12 +567,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }
