Enhancements to capture HashAggregate and HashJoin runtime metrics.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/28992889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/28992889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/28992889

Branch: refs/heads/master
Commit: 28992889090c000104bd549230f8a7fe4ee0558c
Parents: e170cf0
Author: Aman Sinha <[email protected]>
Authored: Sat Jun 14 23:40:57 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Jun 17 16:31:27 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   |  3 +-
 .../impl/aggregate/HashAggTemplate.java         | 21 +++++++++++--
 .../physical/impl/aggregate/HashAggregator.java |  4 ++-
 .../exec/physical/impl/common/HashTable.java    |  2 ++
 .../physical/impl/common/HashTableMetrics.java  | 33 ++++++++++++++++++++
 .../physical/impl/common/HashTableStats.java    | 30 ++++++++++++++++++
 .../physical/impl/common/HashTableTemplate.java | 15 +++++++++
 .../exec/physical/impl/join/HashJoinBatch.java  | 16 +++++++++-
 .../org/apache/drill/TestExampleQueries.java    |  2 +-
 9 files changed, 120 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index ad929a4..8250682 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -220,7 +220,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     container.buildSchema(SelectionVectorMode.NONE);
     HashAggregator agg = context.getImplementationClass(top);
 
-    agg.setup(popConfig, context, oContext.getAllocator(), incoming, this,
+    agg.setup(popConfig, context, this.stats,
+              oContext.getAllocator(), incoming, this,
               aggrExprs,
               cgInner.getWorkspaceTypes(),
               groupByOutFieldIds,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index f73d46c..72095b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import javax.inject.Named;
@@ -37,10 +36,13 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.common.HashTableMetrics;
+import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -95,6 +97,9 @@ public abstract class HashAggTemplate implements HashAggregator {
   private boolean allFlushed = false;
   private boolean  buildComplete = false;
 
+  private OperatorStats stats = null;
+  private HashTableStats htStats = new HashTableStats();
+
   public class BatchHolder {
 
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
@@ -166,7 +171,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
+  public void setup(HashAggregate hashAggrConfig, FragmentContext context, 
+                    OperatorStats stats,
+                    BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
                     LogicalExpression[] valueExprs,
                     List<TypedFieldId> valueFieldIds,
                     TypedFieldId[] groupByOutFieldIds,
@@ -181,6 +188,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     this.context = context;
+    this.stats = stats;
     this.allocator = allocator;
     this.incoming = incoming;
     this.schema = incoming.getSchema();
@@ -276,6 +284,8 @@ public abstract class HashAggTemplate implements HashAggregator {
               
               buildComplete = true;
               
+              updateStats(htable);
+
               // output the first batch; remaining batches will be output 
               // in response to each next() call by a downstream operator
               
@@ -534,6 +544,13 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     return false;
   }
+  
+  private void updateStats(HashTable htable) {
+    htable.getStats(htStats);
+    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets);
+    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries);
+    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing);      
+  }
 
   // Code-generated methods (implemented in HashAggBatch)
   public abstract void doSetup(@Named("incoming") RecordBatch incoming);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 641d377..d14880c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -41,7 +42,8 @@ public interface HashAggregator {
     RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
          }
   
-  public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming,
+  public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, 
+                             OperatorStats stats, BufferAllocator allocator, RecordBatch incoming,
                              HashAggBatch outgoing, LogicalExpression[] valueExprs, 
                              List<TypedFieldId> valueFieldIds,
                              TypedFieldId[] keyFieldIds,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 46cb47d..429ec63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -52,6 +52,8 @@ public interface HashTable {
   
   public int containsKey(int incomingRowIdx, boolean isProbe);
 
+  public void getStats(HashTableStats stats);
+
   public int size();
 
   public boolean isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java
new file mode 100644
index 0000000..ee84855
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import org.apache.drill.exec.ops.MetricDef;
+
+public enum HashTableMetrics implements MetricDef {
+
+  HTABLE_NUM_BUCKETS,
+  HTABLE_NUM_ENTRIES,
+  HTABLE_NUM_RESIZING;
+
+  @Override
+  public int metricId() {
+    return ordinal();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
new file mode 100644
index 0000000..848d860
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+
+public class HashTableStats {
+  public int numBuckets;
+  public int numEntries;
+  public int numResizing;
+
+  public HashTableStats() {
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index a8af5ea..f7cadf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -88,6 +88,8 @@ public abstract class HashTableTemplate implements HashTable {
 
   private MaterializedField dummyIntField;
 
+  private int numResizing = 0;
+  
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
   // *unique* records. Thus, suppose there are N incoming record batches, each
   // of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -363,10 +365,21 @@ public abstract class HashTableTemplate implements HashTable {
     return startIndices.getAccessor().getValueCount();
   }
 
+  public int numResizing() { 
+    return numResizing;  
+  }
+
   public int size() {
     return numEntries;
   }
 
+  public void getStats(HashTableStats stats) { 
+    assert stats != null; 
+    stats.numBuckets = numBuckets();
+    stats.numEntries = numEntries;
+    stats.numResizing = numResizing;
+  }
+
   public boolean isEmpty() {
     return numEntries == 0;
   }
@@ -594,6 +607,8 @@ public abstract class HashTableTemplate implements HashTable {
         bh.dump(idx);
       }
     }
+
+    numResizing++;
   }
 
   /* 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index ddc31ee..9343912 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -41,6 +41,8 @@ import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.physical.impl.common.HashTableMetrics;
+import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -132,6 +134,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     boolean firstOutputBatch = true;
 
     IterOutcome leftUpstream = IterOutcome.NONE;
+    
+    private HashTableStats htStats = new HashTableStats();
 
     @Override
     public int getRecordCount() {
@@ -161,10 +165,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                 // Build the hash table, using the build side record batches.
                 executeBuildPhase();
 
+                // Update the hash table related stats for the operator
+                updateStats(this.hashTable);
+
                 // Create the run time generated code needed to probe and project
                 hashJoinProbe = setupHashJoinProbe();
             }
-
+                        
             // Store the number of records projected
             if (hashTable != null 
                 || joinType != JoinRelType.INNER) {
@@ -432,6 +439,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         this.conditions = popConfig.getConditions();
     }
 
+    private void updateStats(HashTable htable) {
+      htable.getStats(htStats);
+      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets);
+      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries);
+      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing);  
+    }
+    
     @Override
     public void killIncoming() {
         this.left.kill();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28992889/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 59556b3..1d6ca33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -202,5 +202,5 @@ public class TestExampleQueries extends BaseTestQuery{
     test("select r_name from cp.`tpch/region.parquet` order by r_name, r_regionkey");  
     test("select cast(r_name as varchar(20)) from cp.`tpch/region.parquet` order by r_name");
   }
-  
+
 }

Reply via email to