Updated Branches: refs/heads/master 7af77a2f3 -> 251022f84
DRILL-227: implement simple metrics framework Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/82a7b8fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/82a7b8fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/82a7b8fd Branch: refs/heads/master Commit: 82a7b8fd8300696979dbba62f466a15e23fb94ca Parents: 7af77a2 Author: Steven Phillips <[email protected]> Authored: Tue Sep 10 16:01:51 2013 -0700 Committer: Steven Phillips <[email protected]> Committed: Fri Sep 20 12:09:50 2013 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 + .../apache/drill/exec/metrics/DrillMetrics.java | 67 ++++++++++++++++++++ .../drill/exec/store/AffinityCalculator.java | 16 ++--- .../exec/store/parquet/ParquetGroupScan.java | 31 +++++++-- .../src/main/resources/drill-module.conf | 9 ++- exec/java-exec/src/test/sh/logback.xml | 4 +- 6 files changed, 113 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 78d8ac6..42abf54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -34,4 +34,7 @@ public interface ExecConstants { public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context"; public static final String FUNCTION_PACKAGES = "drill.exec.functions"; public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip"; + public static final String METRICS_JMX_OUTPUT_ENABLED = "drill.exec.metrics.jmx.enabled"; + public static final String METRICS_LOG_OUTPUT_ENABLED = "drill.exec.metrics.log.enabled"; + public static final String METRICS_LOG_OUTPUT_INTERVAL = "drill.exec.metrics.log.interval"; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java new file mode 100644 index 0000000..878c088 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.metrics; + +import com.yammer.metrics.JmxReporter; +import com.yammer.metrics.MetricRegistry; +import com.yammer.metrics.Slf4jReporter; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.client.DrillClient; + +import java.util.concurrent.TimeUnit; + +public class DrillMetrics { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetrics.class); + static final DrillConfig config = DrillConfig.create(); + + private DrillMetrics() {} + + private static class RegistryHolder { + public static final MetricRegistry REGISTRY = new MetricRegistry("Drill Metrics"); + private static JmxReporter jmxReporter = getJmxReporter(); + private static Slf4jReporter logReporter = getLogReporter(); + + private static JmxReporter getJmxReporter() { + if (config.getBoolean(ExecConstants.METRICS_JMX_OUTPUT_ENABLED)) { + JmxReporter reporter = JmxReporter.forRegistry(getInstance()).build(); + reporter.start(); + + return reporter; + } else return null; + } + private static Slf4jReporter getLogReporter() { + if(config.getBoolean(ExecConstants.METRICS_LOG_OUTPUT_ENABLED)) { + Slf4jReporter reporter = Slf4jReporter.forRegistry(getInstance()) + .outputTo(logger) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + reporter.start(config.getInt(ExecConstants.METRICS_LOG_OUTPUT_INTERVAL), TimeUnit.SECONDS); + + return reporter; + } else return null; + } + } + + public static MetricRegistry getInstance() { + return RegistryHolder.REGISTRY; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java index 2fffab7..2383463 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java @@ -21,6 +21,9 @@ package org.apache.drill.exec.store; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableRangeMap; import com.google.common.collect.Range; +import com.yammer.metrics.*; +import com.yammer.metrics.Timer; +import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.store.parquet.ParquetGroupScan; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -35,6 +38,8 @@ import java.util.concurrent.TimeUnit; public class AffinityCalculator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class); + static final MetricRegistry metrics = DrillMetrics.getInstance(); + static final String BLOCK_MAP_BUILDER_TIMER = MetricRegistry.name(AffinityCalculator.class, "blockMapBuilderTimer"); HashMap<String,ImmutableRangeMap<Long,BlockLocation>> blockMapMap = new HashMap<>(); @@ -53,19 +58,13 @@ public class AffinityCalculator { * Builds a mapping of block locations to file byte range */ private void buildBlockMap(String fileName) { - Stopwatch watch = new Stopwatch(); + final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time(); BlockLocation[] blocks; ImmutableRangeMap<Long,BlockLocation> blockMap; try { - watch.start(); FileStatus file = fs.getFileStatus(new Path(fileName)); blocks = fs.getFileBlockLocations(file, 0 , file.getLen()); - watch.stop(); - logger.debug("Block locations: {}", blocks); - logger.debug("Took {} ms to get Block locations", watch.elapsed(TimeUnit.MILLISECONDS)); } catch (IOException ioe) { throw new RuntimeException(ioe); } - watch.reset(); - watch.start(); ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>(); for (BlockLocation block : blocks) { long start = block.getOffset(); @@ -74,9 +73,8 @@ public class AffinityCalculator { blockMapBuilder = blockMapBuilder.put(range, block); } blockMap = blockMapBuilder.build(); - watch.stop(); - logger.debug("Took {} ms to build block map", watch.elapsed(TimeUnit.MILLISECONDS)); blockMapMap.put(fileName, blockMap); + context.stop(); } /** * For a given RowGroup, calculate how many bytes are available on each on drillbit endpoint http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index bbfbc73..b789963 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -29,10 +29,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import com.yammer.metrics.Histogram; +import com.yammer.metrics.MetricRegistry; +import com.yammer.metrics.Timer; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.logical.StorageEngineConfig; +import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.ReadEntryFromHDFS; @@ -67,6 +70,12 @@ import com.google.common.collect.Multimap; @JsonTypeName("parquet-scan") public class ParquetGroupScan extends AbstractGroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); + static final MetricRegistry metrics = DrillMetrics.getInstance(); + static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter"); + static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes"); + static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments"); + static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity"); + final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST); private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings; private List<RowGroupInfo> rowGroupInfos; @@ -122,13 +131,17 @@ public class ParquetGroupScan extends AbstractGroupScan { private void readFooter() throws IOException { watch.reset(); watch.start(); + Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time(); rowGroupInfos = new ArrayList(); long start = 0, length = 0; ColumnChunkMetaData columnChunkMetaData; for (ReadEntryWithPath readEntryWithPath : entries){ Path path = new Path(readEntryWithPath.getPath()); List<Footer> footers = ParquetFileReader.readFooters(this.storageEngine.getHadoopConfig(), path); - readEntryWithPath.getPath(); + if (footers.size() == 0) { + logger.warn("No footers found"); + } +// readEntryWithPath.getPath(); for (Footer footer : footers) { int index = 0; @@ -150,11 +163,14 @@ public class ParquetGroupScan extends AbstractGroupScan { } } } + Preconditions.checkState(!rowGroupInfos.isEmpty(), "No row groups found"); + tContext.stop(); watch.stop(); logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS)); } private void calculateEndpointBytes() { + Timer.Context tContext = metrics.timer(ENDPOINT_BYTES_TIMER).time(); watch.reset(); watch.start(); AffinityCalculator ac = new AffinityCalculator(fs, availableEndpoints); @@ -163,6 +179,7 @@ public class ParquetGroupScan extends AbstractGroupScan { totalBytes += e.getLength(); } watch.stop(); + tContext.stop(); logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS)); } @@ -273,7 +290,9 @@ public class ParquetGroupScan extends AbstractGroupScan { public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { watch.reset(); watch.start(); - Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size()); + final Timer.Context tcontext = metrics.timer(ASSIGNMENT_TIMER).time(); + Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size(), String.format("Incoming endpoints %d " + + "is greater than number of row groups %d", incomingEndpoints.size(), rowGroupInfos.size())); mappings = ArrayListMultimap.create(); ArrayList rowGroupList = new ArrayList(rowGroupInfos); List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints); @@ -281,10 +300,11 @@ public class ParquetGroupScan extends AbstractGroupScan { scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false); } scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true); + tcontext.stop(); watch.stop(); logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS)); - Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned"); - Preconditions.checkArgument(!rowGroupInfos.isEmpty()); + Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned"); + Preconditions.checkState(!rowGroupInfos.isEmpty()); } public int fragmentPointer = 0; @@ -320,6 +340,7 @@ public class ParquetGroupScan extends AbstractGroupScan { endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry()); logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress()); + assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength()); iter.remove(); fragmentPointer = (minorFragmentId + 1) % endpoints.size(); break; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index c6e04ff..41282e5 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -22,7 +22,14 @@ drill.exec: { packages += "org.apache.drill.exec.store" } metrics : { - context: "drillbit" + context: "drillbit", + jmx: { + enabled : true + }, + log: { + enabled : false, + interval : 60 + } }, zk: { connect: "localhost:2181", http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/82a7b8fd/exec/java-exec/src/test/sh/logback.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/sh/logback.xml b/exec/java-exec/src/test/sh/logback.xml index ec7ea44..8301a25 100644 --- a/exec/java-exec/src/test/sh/logback.xml +++ b/exec/java-exec/src/test/sh/logback.xml @@ -37,8 +37,8 @@ <!-- <appender-ref ref="STDOUT" /> --> </logger> - <logger name="io.netty" additivity="false"> - <level value="info" /> + <logger name="org.apache.drill" additivity="false"> + <level value="debug" /> <appender-ref ref="SOCKET" /> <!-- <appender-ref ref="STDOUT" /> --> </logger>
