added writing custom aggregator

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/aa989b28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/aa989b28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/aa989b28

Branch: refs/heads/APEXCORE-293
Commit: aa989b28dc7213a6e64f87df7ac9f4fa37205781
Parents: 324d28c
Author: Chandni Singh <[email protected]>
Authored: Wed Nov 4 19:24:40 2015 -0800
Committer: Thomas Weise <[email protected]>
Committed: Sun Feb 28 22:46:36 2016 -0800

----------------------------------------------------------------------
 autometrics/autometrics.md | 78 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aa989b28/autometrics/autometrics.md
----------------------------------------------------------------------
diff --git a/autometrics/autometrics.md b/autometrics/autometrics.md
index 364007f..f449082 100644
--- a/autometrics/autometrics.md
+++ b/autometrics/autometrics.md
@@ -5,7 +5,7 @@ Apache Apex: AutoMetric in a Nutshell
 Metrics help to collect some statistical information about a process which can 
be very useful for diagnosis. Auto Metrics in Apex help to monitor operators in 
a DAG. The goal of `AutoMetric` API is to enable operator developer to define 
relevant metrics for an operator in a simple way which the platform collects 
and reports automatically.
 
 # Specifying AutoMetrics in an Operator
-An `AutoMetric` can be any object. It can be of a primitive type - int, long, 
etc. or a complex one. A field or a `get` method in an operator can be 
annotated with `@AutoMetric` to specify that its value is a metric. After every 
application window, the platform collects the values of these fields/methods in 
a map and sends it to application master.
+An `AutoMetric` can be any object. It can be of a primitive type (int, long, etc.) or a complex one. A field or a `get` method in an operator can be annotated with `@AutoMetric` to specify that its value is a metric. At the end of every application window, the platform collects the values of these fields/methods in a map and sends it to the application master.
 
 ```java
 public class LineReceiver extends BaseOperator
@@ -35,10 +35,10 @@ public class LineReceiver extends BaseOperator
 }
 ```
 
-In the above snippet, there are 2 auto-metrics declared in the `LineReceiver`. 
At the end of each application window, the platform will send a map with 2 
entries - `[(length, 100), (count, 10)]` to the application master.
+There are 2 auto-metrics declared in the `LineReceiver`. At the end of each application window, the platform will send a map with 2 entries - `[(length, 100), (count, 10)]` to the application master.
 
 # Aggregating AutoMetrics across Partitions
-When an operator is partitioned, it is useful to aggregate the values of 
auto-metrics across all its partitions every window to get a logical view of 
these metrics. 
+When an operator is partitioned, it is useful to aggregate the values of 
auto-metrics across all its partitions every window to get a logical view of 
these metrics. The application master performs these aggregations using metrics 
aggregators.
 
 The AutoMetric API helps to achieve this by providing an interface for writing 
aggregators- `AutoMetric.Aggregator`. Any implementation of 
`AutoMetric.Aggregator` can be set as an operator attribute - 
`METRICS_AGGREGATOR` for a particular operator which in turn is used for 
aggregating physical metrics. 
 
@@ -47,12 +47,80 @@ The AutoMetric API helps to achieve this by providing an interface for writing a
 
 `MetricsAggregator` is just a collection of `SingleMetricAggregator`s. There 
are multiple implementations of `SingleMetricAggregator` that perform sum, min, 
max, avg which are present in Apex core and Apex malhar.
 
-For the `LineReceiver` operator, the application developer need not specify 
any aggregator. The platform will automatically inject an instance of 
`MetricsAggregator` that constitutes of two `LongSumAggregator`s - one for 
`length` and one for `count`. This aggregator will report sum of length and sum 
of count across all the partitions of `LineReceiver`.
+For the `LineReceiver` operator, the application developer need not specify 
any aggregator. The platform will automatically inject an instance of 
`MetricsAggregator` that contains two `LongSumAggregator`s - one for `length` 
and one for `count`. This aggregator will report sum of length and sum of count 
across all the partitions of `LineReceiver`.
 
 
 ## Building custom aggregators
-Platform cannot perform any meaningful aggregation for a non-numeric metric. 
In such case, the operator or application developer can write custom 
aggregators. Let’s say, if the `LineReceiver` was modified to 
+The platform cannot perform any meaningful aggregation for non-numeric metrics. In such cases, the operator or application developer can write custom aggregators. For example, suppose `LineReceiver` is modified to have a complex metric, as shown below.
 
+```java
+public class AnotherLineReceiver extends BaseOperator
+{
+  @AutoMetric
+  final LineMetrics lineMetrics = new LineMetrics();
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String s)
+    {
+      lineMetrics.length += s.length();
+      lineMetrics.count++;
+    }
+  };
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    lineMetrics.length = 0;
+    lineMetrics.count = 0;
+  }
+
+  public static class LineMetrics implements Serializable
+  {
+    int length;
+    int count;
+
+    private static final long serialVersionUID = 201511041908L;
+  }
+}
+```
+
+Below is a custom aggregator that can calculate average line length across all 
partitions of `AnotherLineReceiver`.
+
+```java
+public class AvgLineLengthAggregator implements AutoMetric.Aggregator
+{
+
+  Map<String, Object> result = Maps.newHashMap();
+
+  @Override
+  public Map<String, Object> aggregate(long l, Collection<AutoMetric.PhysicalMetricsContext> collection)
+  {
+    long totalLength = 0;
+    long totalCount = 0;
+    for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+      AnotherLineReceiver.LineMetrics lm = (AnotherLineReceiver.LineMetrics)pmc.getMetrics().get("lineMetrics");
+      totalLength += lm.length;
+      totalCount += lm.count;
+    }
+    result.put("avgLineLength", totalCount == 0 ? 0L : totalLength / totalCount);
+    return result;
+  }
+}
+```
+An instance of the above aggregator can be specified as the `METRICS_AGGREGATOR` for `AnotherLineReceiver` while creating the DAG, as shown below.
+
+```java
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    ...
+    AnotherLineReceiver lineReceiver = dag.addOperator("LineReceiver", new AnotherLineReceiver());
+    dag.setAttribute(lineReceiver, Context.OperatorContext.METRICS_AGGREGATOR, new AvgLineLengthAggregator());
+    ...
+  }
+```
 
 # Retrieving AutoMetrics
 The Gateway REST API provides a way to retrieve the latest AutoMetrics for 
each logical operator.  For example:

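Separately from the diff above, the averaging step performed by `AvgLineLengthAggregator` can be tried outside Apex. The following is a minimal sketch and not part of the commit: `AvgLineLengthSketch`, `PartitionMetrics`, and `averageLineLength` are hypothetical names standing in for the aggregator and `LineMetrics`. The zero-count guard and the floating-point division are assumptions about the desired behaviour, not something the commit specifies.

```java
import java.util.Arrays;
import java.util.List;

public class AvgLineLengthSketch
{
  /** Hypothetical stand-in for the per-partition LineMetrics object. */
  static final class PartitionMetrics
  {
    final int length;
    final int count;

    PartitionMetrics(int length, int count)
    {
      this.length = length;
      this.count = count;
    }
  }

  /** Sums length and count over all partitions, then divides once. */
  static double averageLineLength(List<PartitionMetrics> partitions)
  {
    long totalLength = 0;
    long totalCount = 0;
    for (PartitionMetrics pm : partitions) {
      totalLength += pm.length;
      totalCount += pm.count;
    }
    // Assumption: a window with no lines reports 0 instead of dividing by zero.
    return totalCount == 0 ? 0.0 : (double)totalLength / totalCount;
  }

  public static void main(String[] args)
  {
    // Two partitions: 10 lines totalling 100 chars, 5 lines totalling 50 chars.
    List<PartitionMetrics> window = Arrays.asList(
        new PartitionMetrics(100, 10),
        new PartitionMetrics(50, 5));
    System.out.println(averageLineLength(window)); // prints 10.0
  }
}
```

Summing the raw totals before dividing weights each partition by the number of lines it saw. Averaging per-partition averages would give a different result when partitions receive uneven load.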