Repository: hive
Updated Branches:
  refs/heads/branch-3 e96728c52 -> d999a3b23
HIVE-20177: Vectorization: Reduce KeyWrapper allocation in GroupBy Streaming mode (Gopal V, reviewed by Matt McCline) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3bb4d1ca Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3bb4d1ca Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3bb4d1ca Branch: refs/heads/branch-3 Commit: 3bb4d1cad063ece840d56aeb42df9ea19c3554b9 Parents: e96728c Author: Gopal V <[email protected]> Authored: Tue Jul 31 13:48:07 2018 -0700 Committer: Gopal V <[email protected]> Committed: Tue Jul 31 13:53:03 2018 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/vector/VectorGroupByOperator.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3bb4d1ca/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 75efc29..43f1162 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -725,10 +725,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> VectorHashKeyWrapper[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers(); + final VectorHashKeyWrapper prevKey = streamingKey; if (streamingKey == null) { // This is the first batch we process after switching from hash mode currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool(); - streamingKey = (VectorHashKeyWrapper) batchKeys[0].copyKey(); + streamingKey = batchKeys[0]; } aggregationBatchInfo.startBatch(); @@ -739,14 
+740,9 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> // We've encountered a new key, must save current one // We can't forward yet, the aggregators have not been evaluated rowsToFlush[flushMark] = currentStreamingAggregators; - if (keysToFlush[flushMark] == null) { - keysToFlush[flushMark] = (VectorHashKeyWrapper) streamingKey.copyKey(); - } else { - streamingKey.duplicateTo(keysToFlush[flushMark]); - } - + keysToFlush[flushMark] = streamingKey; currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool(); - batchKeys[i].duplicateTo(streamingKey); + streamingKey = batchKeys[i]; ++flushMark; } aggregationBatchInfo.mapAggregationBufferSet(currentStreamingAggregators, i); @@ -759,8 +755,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> for (int i = 0; i < flushMark; ++i) { writeSingleRow(keysToFlush[i], rowsToFlush[i]); rowsToFlush[i].reset(); + keysToFlush[i] = null; streamAggregationBufferRowPool.putInPool(rowsToFlush[i]); } + + if (streamingKey != prevKey) { + streamingKey = (VectorHashKeyWrapper) streamingKey.copyKey(); + } } @Override
