Added example code for setting dimensionScheme for ADT
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c8cc637e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c8cc637e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c8cc637e Branch: refs/heads/master Commit: c8cc637ea3963e731deba35feb54a0cb5a27b4f1 Parents: c237a24 Author: David Yan <[email protected]> Authored: Thu Nov 5 11:47:06 2015 -0800 Committer: Thomas Weise <[email protected]> Committed: Sun Feb 28 22:46:38 2016 -0800 ---------------------------------------------------------------------- autometrics/autometrics.md | 72 ++++++++++++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c8cc637e/autometrics/autometrics.md ---------------------------------------------------------------------- diff --git a/autometrics/autometrics.md b/autometrics/autometrics.md index f1c9a27..2ed0d9e 100644 --- a/autometrics/autometrics.md +++ b/autometrics/autometrics.md @@ -11,10 +11,10 @@ An `AutoMetric` can be any object. It can be of a primitive type - int, long, et public class LineReceiver extends BaseOperator { @AutoMetric - int length; + long length; @AutoMetric - int count; + long count; public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() { @@ -40,7 +40,7 @@ There are 2 auto-metrics declared in the `LineReceiver`. At the end of each appl # Aggregating AutoMetrics across Partitions When an operator is partitioned, it is useful to aggregate the values of auto-metrics across all its partitions every window to get a logical view of these metrics. The application master performs these aggregations using metrics aggregators. -The AutoMetric API helps to achieve this by providing an interface for writing aggregators- `AutoMetric.Aggregator`. 
Any implementation of `AutoMetric.Aggregator` can be set as an operator attribute - `METRICS_AGGREGATOR` for a particular operator which in turn is used for aggregating physical metrics. +The AutoMetric API helps to achieve this by providing an interface for writing aggregators- `AutoMetric.Aggregator`. Any implementation of `AutoMetric.Aggregator` can be set as an operator attribute - `METRICS_AGGREGATOR` for a particular operator which in turn is used for aggregating physical metrics. ## Default aggregators [`MetricsAggregator`](https://github.com/apache/incubator-apex-core/blob/devel-3/common/src/main/java/com/datatorrent/common/metric/MetricsAggregator.java) is a simple implementation of `AutoMetric.Aggregator` that the platform uses as a default for summing up primitive types - int, long, float and double. @@ -78,8 +78,8 @@ public class AnotherLineReceiver extends BaseOperator public static class LineMetrics implements Serializable { - int length; - int count; + long length; + long count; private static final long serialVersionUID = 201511041908L; } @@ -111,7 +111,7 @@ public class AvgLineLengthAggregator implements AutoMetric.Aggregator ``` An instance of the above aggregator can be specified as the `METRICS_AGGREGATOR` for `AnotherLineReceiver` while creating the DAG as shown below. -``` +```java @Override public void populateDAG(DAG dag, Configuration configuration) { @@ -130,10 +130,10 @@ GET /ws/v2/applications/{appid}/logicalPlan/operators/{opName} { ... "autoMetrics": { - "count": "71314", + "count": "71314", "length": "27780706" }, - "className": "com.datatorrent.autometric.LineReceiver", + "className": "com.datatorrent.autometric.LineReceiver", ... } ``` @@ -156,13 +156,13 @@ The Gateway REST API provides a way to retrieve the latest values for all of the GET /ws/v2/applications/{appid}/logicalPlan/operators/{opName} { ... 
- "cpuPercentageMA": "{cpuPercentageMA}", - "failureCount": "{failureCount}", + "cpuPercentageMA": "{cpuPercentageMA}", + "failureCount": "{failureCount}", "latencyMA": "{latencyMA}", - "totalTuplesEmitted": "{totalTuplesEmitted}", - "totalTuplesProcessed": "{totalTuplesProcessed}", - "tuplesEmittedPSMA": "{tuplesEmittedPSMA}", - "tuplesProcessedPSMA": "{tuplesProcessedPSMA}", + "totalTuplesEmitted": "{totalTuplesEmitted}", + "totalTuplesProcessed": "{totalTuplesProcessed}", + "tuplesEmittedPSMA": "{tuplesEmittedPSMA}", + "tuplesProcessedPSMA": "{tuplesProcessedPSMA}", ... } ``` @@ -222,7 +222,7 @@ public class AggregatorIIRAVG extends AbstractIncrementalAggregator double[] destVals = dest.getAggregates().getFieldsDouble(); double[] srcVals = src.getAggregates().getFieldsDouble(); - for(int index = 0; index < destLongs.length; index++) { + for (int index = 0; index < destLongs.length; index++) { destVals[index] = .5 * destVals[index] + .5 * srcVals[index]; } } @@ -251,6 +251,48 @@ AppDataTracker searches for custom aggregator jars under the following directori It uses reflection to find all the classes that extend from `IncrementalAggregator` and `OTFAggregator` in these jars and registers them with the name provided by the `@Name` annotation (or class name when `@Name` is absent). +# Using `METRICS_DIMENSIONS_SCHEME` + +The following sample code snippet shows how you can use `METRICS_DIMENSIONS_SCHEME` to set your own time buckets and your own set of aggregators for the aggregations that the App Data Tracker performs on certain `AutoMetric`s in your application. + +```java + public void populateDAG(DAG dag, Configuration configuration) + { + ... + LineReceiver lineReceiver = dag.addOperator("LineReceiver", new LineReceiver()); + ... 
+ AutoMetric.DimensionsScheme dimensionsScheme = new AutoMetric.DimensionsScheme() + { + String[] timeBuckets = new String[] { "1s", "1m", "1h" }; + String[] lengthAggregators = new String[] { "IIRAVG", "SUM" }; + String[] countAggregators = new String[] { "SUM" }; + + /* Setting the aggregation time bucket to be one second, one minute and one hour */ + @Override + public String[] getTimeBuckets() + { + return timeBuckets; + } + + @Override + public String[] getDimensionAggregationsFor(String logicalMetricName) + { + if ("length".equals(logicalMetricName)) { + return lengthAggregators; + } else if ("count".equals(logicalMetricName)) { + return countAggregators; + } else { + return null; // use default + } + } + }; + + dag.setAttribute(lineReceiver, OperatorContext.METRICS_DIMENSIONS_SCHEME, dimensionsScheme); + ... + } +``` + + # Dashboards With App Data Tracker enabled, you can visualize the AutoMetrics and system metrics in the Dashboards within dtManage. As shown in the diagram in the App Data Tracker section, dtGateway relays queries and query results to and from the App Data Tracker. In this way, dtManage sends queries and receives results from the App Data Tracker via dtGateway and uses the results to let the user visualize the data.
