Enhancements to capture HashAggregate and HashJoin runtime metrics.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/28992889 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/28992889 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/28992889 Branch: refs/heads/master Commit: 28992889090c000104bd549230f8a7fe4ee0558c Parents: e170cf0 Author: Aman Sinha <[email protected]> Authored: Sat Jun 14 23:40:57 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue Jun 17 16:31:27 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/aggregate/HashAggBatch.java | 3 +- .../impl/aggregate/HashAggTemplate.java | 21 +++++++++++-- .../physical/impl/aggregate/HashAggregator.java | 4 ++- .../exec/physical/impl/common/HashTable.java | 2 ++ .../physical/impl/common/HashTableMetrics.java | 33 ++++++++++++++++++++ .../physical/impl/common/HashTableStats.java | 30 ++++++++++++++++++ .../physical/impl/common/HashTableTemplate.java | 15 +++++++++ .../exec/physical/impl/join/HashJoinBatch.java | 16 +++++++++- .../org/apache/drill/TestExampleQueries.java | 2 +- 9 files changed, 120 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index ad929a4..8250682 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -220,7 +220,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { container.buildSchema(SelectionVectorMode.NONE); HashAggregator agg = context.getImplementationClass(top); - agg.setup(popConfig, context, oContext.getAllocator(), incoming, this, + agg.setup(popConfig, context, this.stats, + oContext.getAllocator(), incoming, this, aggrExprs, cgInner.getWorkspaceTypes(), groupByOutFieldIds, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index f73d46c..72095b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import javax.inject.Named; @@ -37,10 +36,13 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; +import org.apache.drill.exec.physical.impl.common.HashTableMetrics; +import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; @@ -95,6 +97,9 @@ public abstract class HashAggTemplate implements HashAggregator { private boolean allFlushed = false; private boolean buildComplete = false; + private OperatorStats stats = null; + private HashTableStats htStats = new HashTableStats(); + public class BatchHolder { private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables) @@ -166,7 +171,9 @@ public abstract class HashAggTemplate implements HashAggregator { @Override - public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, + public void setup(HashAggregate hashAggrConfig, FragmentContext context, + OperatorStats stats, + BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, @@ -181,6 +188,7 @@ public abstract class HashAggTemplate implements HashAggregator { } this.context = context; + this.stats = stats; this.allocator = allocator; this.incoming = incoming; this.schema = incoming.getSchema(); @@ -276,6 +284,8 @@ public abstract class HashAggTemplate implements HashAggregator { buildComplete = true; + updateStats(htable); + // output the first batch; remaining batches will be output // in response to each next() call by a downstream operator @@ -534,6 +544,13 @@ public abstract class HashAggTemplate implements HashAggregator { return false; } + + private void updateStats(HashTable htable) { + htable.getStats(htStats); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing); + } // Code-generated methods (implemented in HashAggBatch) public abstract void doSetup(@Named("incoming") RecordBatch incoming); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 641d377..d14880c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -41,7 +42,8 @@ public interface HashAggregator { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR } - public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, + public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, + OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 46cb47d..429ec63 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -52,6 +52,8 @@ public interface HashTable { public int containsKey(int incomingRowIdx, boolean isProbe); + public void getStats(HashTableStats stats); + public int size(); public boolean isEmpty(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java new file mode 100644 index 0000000..ee84855 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.common; + +import org.apache.drill.exec.ops.MetricDef; + +public enum HashTableMetrics implements MetricDef { + + HTABLE_NUM_BUCKETS, + HTABLE_NUM_ENTRIES, + HTABLE_NUM_RESIZING; + + @Override + public int metricId() { + return ordinal(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java new file mode 100644 index 0000000..848d860 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.common; + + +public class HashTableStats { + public int numBuckets; + public int numEntries; + public int numResizing; + + public HashTableStats() { + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index a8af5ea..f7cadf1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -88,6 +88,8 @@ public abstract class HashTableTemplate implements HashTable { private MaterializedField dummyIntField; + private int numResizing = 0; + // This class encapsulates the links, keys and values for up to BATCH_SIZE // *unique* records. Thus, suppose there are N incoming record batches, each // of size BATCH_SIZE..but they have M unique keys altogether, the number of @@ -363,10 +365,21 @@ public abstract class HashTableTemplate implements HashTable { return startIndices.getAccessor().getValueCount(); } + public int numResizing() { + return numResizing; + } + public int size() { return numEntries; } + public void getStats(HashTableStats stats) { + assert stats != null; + stats.numBuckets = numBuckets(); + stats.numEntries = numEntries; + stats.numResizing = numResizing; + } + public boolean isEmpty() { return numEntries == 0; } @@ -594,6 +607,8 @@ public abstract class HashTableTemplate implements HashTable { bh.dump(idx); } } + + numResizing++; } /* http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index ddc31ee..9343912 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -41,6 +41,8 @@ import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; +import org.apache.drill.exec.physical.impl.common.HashTableMetrics; +import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; import org.apache.drill.exec.record.AbstractRecordBatch; @@ -132,6 +134,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { boolean firstOutputBatch = true; IterOutcome leftUpstream = IterOutcome.NONE; + + private HashTableStats htStats = new HashTableStats(); @Override public int getRecordCount() { @@ -161,10 +165,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Build the hash table, using the build side record batches. executeBuildPhase(); + // Update the hash table related stats for the operator + updateStats(this.hashTable); + // Create the run time generated code needed to probe and project hashJoinProbe = setupHashJoinProbe(); } - + // Store the number of records projected if (hashTable != null || joinType != JoinRelType.INNER) { @@ -432,6 +439,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { this.conditions = popConfig.getConditions(); } + private void updateStats(HashTable htable) { + htable.getStats(htStats); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing); + } + @Override public void killIncoming() { this.left.kill(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 59556b3..1d6ca33 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -202,5 +202,5 @@ public class TestExampleQueries extends BaseTestQuery{ test("select r_name from cp.`tpch/region.parquet` order by r_name, r_regionkey"); test("select cast(r_name as varchar(20)) from cp.`tpch/region.parquet` order by r_name"); } - + }
