Updated Branches:
  refs/heads/master 7af77a2f3 -> 251022f84

DRILL-227: implement simple metrics framework


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/82a7b8fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/82a7b8fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/82a7b8fd

Branch: refs/heads/master
Commit: 82a7b8fd8300696979dbba62f466a15e23fb94ca
Parents: 7af77a2
Author: Steven Phillips <[email protected]>
Authored: Tue Sep 10 16:01:51 2013 -0700
Committer: Steven Phillips <[email protected]>
Committed: Fri Sep 20 12:09:50 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +
 .../apache/drill/exec/metrics/DrillMetrics.java | 67 ++++++++++++++++++++
 .../drill/exec/store/AffinityCalculator.java    | 16 ++---
 .../exec/store/parquet/ParquetGroupScan.java    | 31 +++++++--
 .../src/main/resources/drill-module.conf        |  9 ++-
 exec/java-exec/src/test/sh/logback.xml          |  4 +-
 6 files changed, 113 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
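
The change introduces one process-wide MetricRegistry, exposed through DrillMetrics.getInstance(), that components name their metrics against and that is published over JMX and/or a periodic SLF4J log reporter depending on configuration. A minimal sketch of the usage pattern the rest of this diff follows (illustrative only: MyOperator and doWork are hypothetical names; the registry calls are the ones used below):

import com.yammer.metrics.MetricRegistry;
import com.yammer.metrics.Timer;
import org.apache.drill.exec.metrics.DrillMetrics;

public class MyOperator {
  static final MetricRegistry metrics = DrillMetrics.getInstance();
  static final String WORK_TIMER = MetricRegistry.name(MyOperator.class, "doWork");

  void doWork() {
    final Timer.Context context = metrics.timer(WORK_TIMER).time();
    // ... the work being measured ...
    context.stop();  // records the elapsed time in the shared registry
  }
}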


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 78d8ac6..42abf54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -34,4 +34,7 @@ public interface ExecConstants {
   public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
   public static final String FUNCTION_PACKAGES = "drill.exec.functions";
   public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";
+  public static final String METRICS_JMX_OUTPUT_ENABLED = "drill.exec.metrics.jmx.enabled";
+  public static final String METRICS_LOG_OUTPUT_ENABLED = "drill.exec.metrics.log.enabled";
+  public static final String METRICS_LOG_OUTPUT_INTERVAL = "drill.exec.metrics.log.interval";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
new file mode 100644
index 0000000..878c088
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.metrics;
+
+import com.yammer.metrics.JmxReporter;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Slf4jReporter;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+
+import java.util.concurrent.TimeUnit;
+
+public class DrillMetrics {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetrics.class);
+  static final DrillConfig config = DrillConfig.create();
+
+  private DrillMetrics() {}
+
+  private static class RegistryHolder {
+    public static final MetricRegistry REGISTRY = new MetricRegistry("Drill Metrics");
+    private static JmxReporter jmxReporter = getJmxReporter();
+    private static Slf4jReporter logReporter = getLogReporter();
+
+    private static JmxReporter getJmxReporter() {
+      if (config.getBoolean(ExecConstants.METRICS_JMX_OUTPUT_ENABLED)) {
+        JmxReporter reporter = JmxReporter.forRegistry(getInstance()).build();
+        reporter.start();
+
+        return reporter;
+      } else return null;
+    }
+     private static Slf4jReporter getLogReporter() {
+       if(config.getBoolean(ExecConstants.METRICS_LOG_OUTPUT_ENABLED)) {
+         Slf4jReporter reporter = Slf4jReporter.forRegistry(getInstance())
+                 .outputTo(logger)
+                 .convertRatesTo(TimeUnit.SECONDS)
+                 .convertDurationsTo(TimeUnit.MILLISECONDS)
+                 .build();
+         reporter.start(config.getInt(ExecConstants.METRICS_LOG_OUTPUT_INTERVAL), TimeUnit.SECONDS);
+
+         return reporter;
+       } else return null;
+     }
+  }
+
+  public static MetricRegistry getInstance() {
+    return RegistryHolder.REGISTRY;
+  }
+
+
+}
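
DrillMetrics keeps the registry and reporters in the nested RegistryHolder class, so they are created lazily on the first getInstance() call and JVM class-initialization rules make the construction thread-safe without explicit locking (the initialization-on-demand holder idiom). The registry is not limited to the timers and histogram added in this commit; a hedged sketch of registering a gauge against it (QueueDepth and its queue field are hypothetical names; register() and Gauge come from the same metrics library imported above):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.yammer.metrics.Gauge;
import com.yammer.metrics.MetricRegistry;
import org.apache.drill.exec.metrics.DrillMetrics;

public class QueueDepth {
  private final Queue<Object> queue = new ConcurrentLinkedQueue<>();

  public QueueDepth() {
    // Expose the current queue size as a gauge in the shared registry.
    DrillMetrics.getInstance().register(
        MetricRegistry.name(QueueDepth.class, "size"),
        new Gauge<Integer>() {
          @Override
          public Integer getValue() {
            return queue.size();
          }
        });
  }
}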

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
index 2fffab7..2383463 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
@@ -21,6 +21,9 @@ package org.apache.drill.exec.store;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableRangeMap;
 import com.google.common.collect.Range;
+import com.yammer.metrics.*;
+import com.yammer.metrics.Timer;
+import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -35,6 +38,8 @@ import java.util.concurrent.TimeUnit;
 
 public class AffinityCalculator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
+  static final MetricRegistry metrics = DrillMetrics.getInstance();
+  static final String BLOCK_MAP_BUILDER_TIMER = MetricRegistry.name(AffinityCalculator.class, "blockMapBuilderTimer");
 
 
   HashMap<String,ImmutableRangeMap<Long,BlockLocation>> blockMapMap = new HashMap<>();
@@ -53,19 +58,13 @@ public class AffinityCalculator {
    * Builds a mapping of block locations to file byte range
    */
   private void buildBlockMap(String fileName) {
-    Stopwatch watch = new Stopwatch();
+    final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
     BlockLocation[] blocks;
     ImmutableRangeMap<Long,BlockLocation> blockMap;
     try {
-      watch.start();
       FileStatus file = fs.getFileStatus(new Path(fileName));
       blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
-      watch.stop();
-      logger.debug("Block locations: {}", blocks);
-      logger.debug("Took {} ms to get Block locations", 
watch.elapsed(TimeUnit.MILLISECONDS));
     } catch (IOException ioe) { throw new RuntimeException(ioe); }
-    watch.reset();
-    watch.start();
     ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
     for (BlockLocation block : blocks) {
       long start = block.getOffset();
@@ -74,9 +73,8 @@ public class AffinityCalculator {
       blockMapBuilder = blockMapBuilder.put(range, block);
     }
     blockMap = blockMapBuilder.build();
-    watch.stop();
-    logger.debug("Took {} ms to build block map", 
watch.elapsed(TimeUnit.MILLISECONDS));
     blockMapMap.put(fileName, blockMap);
+    context.stop();
   }
   /**
   * For a given RowGroup, calculate how many bytes are available on each on drillbit endpoint

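The Timer.Context added in buildBlockMap() replaces the Stopwatch/debug-log pair. As written, context.stop() only runs on the success path; if getFileStatus() or getFileBlockLocations() throws, no sample is recorded. A hedged variant (same timer name, purely illustrative) that records the sample regardless:

final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
try {
  // ... look up block locations and build the range map ...
} finally {
  context.stop();  // record the elapsed time even if an IOException is rethrown
}
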
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index bbfbc73..b789963 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -29,10 +29,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import com.yammer.metrics.Histogram;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.ReadEntryFromHDFS;
@@ -67,6 +70,12 @@ import com.google.common.collect.Multimap;
 @JsonTypeName("parquet-scan")
 public class ParquetGroupScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
+  static final MetricRegistry metrics = DrillMetrics.getInstance();
+  static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
+  static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
+  static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
+  static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
+  final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
 
   private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
   private List<RowGroupInfo> rowGroupInfos;
@@ -122,13 +131,17 @@ public class ParquetGroupScan extends AbstractGroupScan {
   private void readFooter() throws IOException {
     watch.reset();
     watch.start();
+    Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
     rowGroupInfos = new ArrayList();
     long start = 0, length = 0;
     ColumnChunkMetaData columnChunkMetaData;
     for (ReadEntryWithPath readEntryWithPath : entries){
       Path path = new Path(readEntryWithPath.getPath());
       List<Footer> footers = ParquetFileReader.readFooters(this.storageEngine.getHadoopConfig(), path);
-      readEntryWithPath.getPath();
+      if (footers.size() == 0) {
+        logger.warn("No footers found");
+      }
+//      readEntryWithPath.getPath();
 
       for (Footer footer : footers) {
         int index = 0;
@@ -150,11 +163,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
         }
       }
     }
+    Preconditions.checkState(!rowGroupInfos.isEmpty(), "No row groups found");
+    tContext.stop();
     watch.stop();
     logger.debug("Took {} ms to get row group infos", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
   private void calculateEndpointBytes() {
+    Timer.Context tContext = metrics.timer(ENDPOINT_BYTES_TIMER).time();
     watch.reset();
     watch.start();
     AffinityCalculator ac = new AffinityCalculator(fs, availableEndpoints);
@@ -163,6 +179,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
       totalBytes += e.getLength();
     }
     watch.stop();
+    tContext.stop();
     logger.debug("Took {} ms to calculate EndpointBytes", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
@@ -273,7 +290,9 @@ public class ParquetGroupScan extends AbstractGroupScan {
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     watch.reset();
     watch.start();
-    Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size());
+    final Timer.Context tcontext = metrics.timer(ASSIGNMENT_TIMER).time();
+    Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size(), String.format("Incoming endpoints %d " +
+            "is greater than number of row groups %d", incomingEndpoints.size(), rowGroupInfos.size()));
     mappings = ArrayListMultimap.create();
     ArrayList rowGroupList = new ArrayList(rowGroupInfos);
     List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
@@ -281,10 +300,11 @@ public class ParquetGroupScan extends AbstractGroupScan {
       scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
     }
     scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
+    tcontext.stop();
     watch.stop();
     logger.debug("Took {} ms to apply assignments", 
watch.elapsed(TimeUnit.MILLISECONDS));
-    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries 
should be assigned by now, but some are still unassigned");
-    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
+    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should 
be assigned by now, but some are still unassigned");
+    Preconditions.checkState(!rowGroupInfos.isEmpty());
   }
 
   public int fragmentPointer = 0;
@@ -320,6 +340,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
 
           endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
           logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
+          assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
           iter.remove();
           fragmentPointer = (minorFragmentId + 1) % endpoints.size();
           break;

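A note on assignmentAffinityStats: bytesPerEndpoint.get(currentEndpoint) and rowGroupInfo.getLength() are both longs, so the update records a truncated integer ratio; assuming the local byte count never exceeds the row group length, the histogram only ever sees 0 or 1. If finer resolution is wanted, a scaled value could be recorded instead (a possible refinement, not part of this commit):

// Record locality as a percentage (0-100) rather than a truncated 0/1 ratio.
long localBytes = bytesPerEndpoint.get(currentEndpoint);
assignmentAffinityStats.update(localBytes * 100 / rowGroupInfo.getLength());
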
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c6e04ff..41282e5 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -22,7 +22,14 @@ drill.exec: {
     packages += "org.apache.drill.exec.store"  
   }
   metrics : { 
-    context: "drillbit"
+    context: "drillbit",
+    jmx: {
+      enabled : true
+    },
+    log: {
+      enabled : false,
+      interval : 60
+    }
   },
   zk: {
        connect: "localhost:2181",

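With these defaults, JMX reporting is on and the periodic log reporter is off. Flipping them for a deployment would look roughly like the snippet below, assuming the usual drill-override.conf override file consulted by DrillConfig.create() (values shown are examples only):

drill.exec.metrics: {
  jmx: { enabled: false },
  log: {
    enabled: true,
    interval: 30    # seconds between Slf4jReporter dumps
  }
}
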
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/test/sh/logback.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/sh/logback.xml b/exec/java-exec/src/test/sh/logback.xml
index ec7ea44..8301a25 100644
--- a/exec/java-exec/src/test/sh/logback.xml
+++ b/exec/java-exec/src/test/sh/logback.xml
@@ -37,8 +37,8 @@
 <!--     <appender-ref ref="STDOUT" /> -->
        </logger>
   
-  <logger name="io.netty" additivity="false">
-    <level value="info" />
+  <logger name="org.apache.drill" additivity="false">
+    <level value="debug" />
     <appender-ref ref="SOCKET" />
 <!--     <appender-ref ref="STDOUT" /> -->
   </logger>
