added writing custom aggregator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f64379e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f64379e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f64379e3

Branch: refs/heads/APEXCORE-293
Commit: f64379e362fdef018e258a24c51c715b265d2b4c
Parents: 6b79095
Author: Chandni Singh <[email protected]>
Authored: Wed Nov 4 19:24:40 2015 -0800
Committer: Thomas Weise <[email protected]>
Committed: Sun Feb 28 22:46:36 2016 -0800

----------------------------------------------------------------------
 autometrics/autometrics.md | 78 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f64379e3/autometrics/autometrics.md
----------------------------------------------------------------------
diff --git a/autometrics/autometrics.md b/autometrics/autometrics.md
index 364007f..f449082 100644
--- a/autometrics/autometrics.md
+++ b/autometrics/autometrics.md
@@ -5,7 +5,7 @@ Apache Apex: AutoMetric in a Nutshell
 Metrics help to collect some statistical information about a process which can be very useful for diagnosis. Auto Metrics in Apex help to monitor operators in a DAG. The goal of `AutoMetric` API is to enable operator developer to define relevant metrics for an operator in a simple way which the platform collects and reports automatically.
 
 # Specifying AutoMetrics in an Operator
-An `AutoMetric` can be any object. It can be of a primitive type - int, long, etc. or a complex one. A field or a `get` method in an operator can be annotated with `@AutoMetric` to specify that its value is a metric. After every application window, the platform collects the values of these fields/methods in a map and sends it to application master.
+An `AutoMetric` can be any object. It can be of a primitive type - int, long, etc. or a complex one. A field or a `get` method in an operator can be annotated with `@AutoMetric` to specify that its value is a metric. After every application end window, the platform collects the values of these fields/methods in a map and sends it to application master.
 
 ```java
 public class LineReceiver extends BaseOperator
@@ -35,10 +35,10 @@ public class LineReceiver extends BaseOperator
 }
 ```
 
-In the above snippet, there are 2 auto-metrics declared in the `LineReceiver`. At the end of each application window, the platform will send a map with 2 entries - `[(length, 100), (count, 10)]` to the application master.
+There are 2 auto-metrics declared in the `LineReceiver`. At the end of each application window, the platform will send a map with 2 entries - `[(length, 100), (count, 10)]` to the application master.
 
 # Aggregating AutoMetrics across Partitions
-When an operator is partitioned, it is useful to aggregate the values of auto-metrics across all its partitions every window to get a logical view of these metrics.
+When an operator is partitioned, it is useful to aggregate the values of auto-metrics across all its partitions every window to get a logical view of these metrics. The application master performs these aggregations using metrics aggregators.
 
 The AutoMetric API helps to achieve this by providing an interface for writing aggregators- `AutoMetric.Aggregator`. Any implementation of `AutoMetric.Aggregator` can be set as an operator attribute - `METRICS_AGGREGATOR` for a particular operator which in turn is used for aggregating physical metrics.
 
@@ -47,12 +47,80 @@ The AutoMetric API helps to achieve this by providing an interface for writing a
 
 `MetricsAggregator` is just a collection of `SingleMetricAggregator`s. There are multiple implementations of `SingleMetricAggregator` that perform sum, min, max, avg which are present in Apex core and Apex malhar.
 
-For the `LineReceiver` operator, the application developer need not specify any aggregator. The platform will automatically inject an instance of `MetricsAggregator` that constitutes of two `LongSumAggregator`s - one for `length` and one for `count`. This aggregator will report sum of length and sum of count across all the partitions of `LineReceiver`.
+For the `LineReceiver` operator, the application developer need not specify any aggregator. The platform will automatically inject an instance of `MetricsAggregator` that contains two `LongSumAggregator`s - one for `length` and one for `count`. This aggregator will report sum of length and sum of count across all the partitions of `LineReceiver`.
 
 ## Building custom aggregators
-Platform cannot perform any meaningful aggregation for a non-numeric metric. In such case, the operator or application developer can write custom aggregators. Let's say, if the `LineReceiver` was modified to
+Platform cannot perform any meaningful aggregations for non-numeric metrics. In such cases, the operator or application developer can write custom aggregators. Let's say, if the `LineReceiver` was modified to
 have a complex metric as shown below.
 
+```java
+public class AnotherLineReceiver extends BaseOperator
+{
+  @AutoMetric
+  final LineMetrics lineMetrics = new LineMetrics();
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String s)
+    {
+      lineMetrics.length += s.length();
+      lineMetrics.count++;
+    }
+  };
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    lineMetrics.length = 0;
+    lineMetrics.count = 0;
+  }
+
+  public static class LineMetrics implements Serializable
+  {
+    int length;
+    int count;
+
+    private static final long serialVersionUID = 201511041908L;
+  }
+}
+```
+
+Below is a custom aggregator that can calculate average line length across all partitions of `AnotherLineReceiver`.
+
+```java
+public class AvgLineLengthAggregator implements AutoMetric.Aggregator
+{
+
+  Map<String, Object> result = Maps.newHashMap();
+
+  @Override
+  public Map<String, Object> aggregate(long l, Collection<AutoMetric.PhysicalMetricsContext> collection)
+  {
+    long totalLength = 0;
+    long totalCount = 0;
+    for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+      AnotherLineReceiver.LineMetrics lm = (AnotherLineReceiver.LineMetrics)pmc.getMetrics().get("lineMetrics");
+      totalLength += lm.length;
+      totalCount += lm.count;
+    }
+    result.put("avgLineLength", totalLength/totalCount);
+    return result;
+  }
+}
+```
+An instance of above aggregator can be specified as the `METRIC_AGGREGATOR` for `AnotherLineReceiver` while creating the DAG as shown below.
+
+```
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    ...
+    AnotherLineReceiver lineReceiver = dag.addOperator("LineReceiver", new AnotherLineReceiver());
+    dag.setAttribute(lineReceiver, Context.OperatorContext.METRICS_AGGREGATOR, new AvgLineLengthAggregator());
+    ...
+  }
+```
 # Retrieving AutoMetrics
 The Gateway REST API provides a way to retrieve the latest AutoMetrics for each logical operator. For example:
