Repository: hive
Updated Branches:
  refs/heads/master 3104d4756 -> 3e46515d3
HIVE-20177: Vectorization: Reduce KeyWrapper allocation in GroupBy Streaming mode (Gopal V, reviewed by Matt McCline)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4d251514
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4d251514
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4d251514

Branch: refs/heads/master
Commit: 4d251514fde46a28f1d59d439097918576c26560
Parents: 3104d47
Author: Gopal V <[email protected]>
Authored: Tue Jul 31 13:48:07 2018 -0700
Committer: Gopal V <[email protected]>
Committed: Tue Jul 31 13:48:07 2018 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/vector/VectorGroupByOperator.java | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/4d251514/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 75efc29..43f1162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -725,10 +725,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
 
       VectorHashKeyWrapper[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers();
 
+      final VectorHashKeyWrapper prevKey = streamingKey;
       if (streamingKey == null) {
         // This is the first batch we process after switching from hash mode
         currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool();
-        streamingKey = (VectorHashKeyWrapper) batchKeys[0].copyKey();
+        streamingKey = batchKeys[0];
       }
 
       aggregationBatchInfo.startBatch();
@@ -739,14 +740,9 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           // We've encountered a new key, must save current one
           // We can't forward yet, the aggregators have not been evaluated
           rowsToFlush[flushMark] = currentStreamingAggregators;
-          if (keysToFlush[flushMark] == null) {
-            keysToFlush[flushMark] = (VectorHashKeyWrapper) streamingKey.copyKey();
-          } else {
-            streamingKey.duplicateTo(keysToFlush[flushMark]);
-          }
-
+          keysToFlush[flushMark] = streamingKey;
           currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool();
-          batchKeys[i].duplicateTo(streamingKey);
+          streamingKey = batchKeys[i];
           ++flushMark;
         }
         aggregationBatchInfo.mapAggregationBufferSet(currentStreamingAggregators, i);
@@ -759,8 +755,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       for (int i = 0; i < flushMark; ++i) {
         writeSingleRow(keysToFlush[i], rowsToFlush[i]);
         rowsToFlush[i].reset();
+        keysToFlush[i] = null;
         streamAggregationBufferRowPool.putInPool(rowsToFlush[i]);
       }
+
+      if (streamingKey != prevKey) {
+        streamingKey = (VectorHashKeyWrapper) streamingKey.copyKey();
+      }
     }
 
     @Override
