APEXCORE-291 #resolve #comment Use the operator instance as the default aggregator when it implements AutoMetric.Aggregator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ecb96bad Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ecb96bad Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ecb96bad Branch: refs/heads/master Commit: ecb96bad0aa2e5c959d7626ebdd93e0fb265c424 Parents: 56b55fe Author: Chandni Singh <[email protected]> Authored: Tue Dec 22 00:45:22 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Tue Dec 29 23:40:22 2015 -0800 ---------------------------------------------------------------------- engine/pom.xml | 2 +- .../stram/appdata/AppDataPushAgent.java | 3 +- .../stram/plan/logical/LogicalPlan.java | 38 +-------- .../plan/logical/MetricAggregatorMeta.java | 89 ++++++++++++++++++++ .../stram/engine/AutoMetricTest.java | 65 +++++++++++++- 5 files changed, 156 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/pom.xml ---------------------------------------------------------------------- diff --git a/engine/pom.xml b/engine/pom.xml index 34593ce..6bf8283 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -145,7 +145,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>4406</maxAllowedViolations> + <maxAllowedViolations>4400</maxAllowedViolations> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java index 9389e3c..b47dc41 100644 --- 
a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java @@ -47,6 +47,7 @@ import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.WebsocketAppDataPusher; import com.datatorrent.stram.api.AppDataPusher; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.MetricAggregatorMeta; import com.datatorrent.stram.webapp.LogicalOperatorInfo; /** @@ -253,7 +254,7 @@ public class AppDataPushAgent extends AbstractService result.put("appName", dnmgr.getApplicationAttributes().get(DAGContext.APPLICATION_NAME)); result.put("logicalOperatorName", operatorMeta.getName()); - LogicalPlan.MetricAggregatorMeta metricAggregatorMeta = operatorMeta.getMetricAggregatorMeta(); + MetricAggregatorMeta metricAggregatorMeta = operatorMeta.getMetricAggregatorMeta(); JSONArray valueSchemas = new JSONArray(); for (Map.Entry<String, Object> entry : aggregates.entrySet()) { String metricName = entry.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 377fa6d..347e94f 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -839,6 +839,9 @@ public class LogicalPlan implements Serializable, DAG protected void populateAggregatorMeta() { AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR); + if (aggregator == null && operator instanceof AutoMetric.Aggregator) { + aggregator = new MetricAggregatorMeta.MetricsAggregatorProxy(this); + } if (aggregator == null) { 
MetricsAggregator defAggregator = null; Set<String> metricNames = Sets.newHashSet(); @@ -2015,39 +2018,4 @@ public class LogicalPlan implements Serializable, DAG return result; } - public final class MetricAggregatorMeta implements Serializable - { - private final AutoMetric.Aggregator aggregator; - private final AutoMetric.DimensionsScheme dimensionsScheme; - - protected MetricAggregatorMeta(AutoMetric.Aggregator aggregator, - AutoMetric.DimensionsScheme dimensionsScheme) - { - this.aggregator = aggregator; - this.dimensionsScheme = dimensionsScheme; - } - - public AutoMetric.Aggregator getAggregator() - { - return this.aggregator; - } - - public String[] getDimensionAggregatorsFor(String logicalMetricName) - { - if (dimensionsScheme == null) { - return null; - } - return dimensionsScheme.getDimensionAggregationsFor(logicalMetricName); - } - - public String[] getTimeBuckets() - { - if (dimensionsScheme == null) { - return null; - } - return dimensionsScheme.getTimeBuckets(); - } - - private static final long serialVersionUID = 201604271719L; - } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java new file mode 100644 index 0000000..65bc2a4 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; + +/** + * A class that encapsulates {@link AutoMetric.Aggregator} and {@link AutoMetric.DimensionsScheme} of a particular + * operator. + */ +public final class MetricAggregatorMeta implements Serializable +{ + private final AutoMetric.Aggregator aggregator; + private final AutoMetric.DimensionsScheme dimensionsScheme; + + protected MetricAggregatorMeta(AutoMetric.Aggregator aggregator, AutoMetric.DimensionsScheme dimensionsScheme) + { + this.aggregator = aggregator; + this.dimensionsScheme = dimensionsScheme; + } + + public AutoMetric.Aggregator getAggregator() + { + return this.aggregator; + } + + public String[] getDimensionAggregatorsFor(String logicalMetricName) + { + if (dimensionsScheme == null) { + return null; + } + return dimensionsScheme.getDimensionAggregationsFor(logicalMetricName); + } + + public String[] getTimeBuckets() + { + if (dimensionsScheme == null) { + return null; + } + return dimensionsScheme.getTimeBuckets(); + } + + private static final long serialVersionUID = 201604271719L; + + /** + * Serves as a proxy for Aggregator when operator itself implements {@link AutoMetric.Aggregator}. 
+ */ + static final class MetricsAggregatorProxy implements AutoMetric.Aggregator, Serializable + { + private final LogicalPlan.OperatorMeta om; + + MetricsAggregatorProxy(@NotNull LogicalPlan.OperatorMeta om) + { + this.om = Preconditions.checkNotNull(om); + } + + @Override + public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics) + { + return ((AutoMetric.Aggregator)om.getOperator()).aggregate(windowId, physicalMetrics); + } + + private static final long serialVersionUID = 201512221830L; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java index 28e2e51..a76e6e0 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java @@ -25,7 +25,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import org.apache.hadoop.conf.Configuration; +import javax.validation.constraints.NotNull; + import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -34,17 +35,21 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.datatorrent.api.*; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; import com.datatorrent.api.Stats.OperatorStats; - +import 
com.datatorrent.api.StatsListener; import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.engine.AutoMetricTest.TestOperator.TestStatsListener; import com.datatorrent.stram.plan.logical.LogicalPlan; @@ -307,6 +312,33 @@ public class AutoMetricTest Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator()); } + @Test + public void testDefaultMetricsAggregator() throws Exception + { + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration()); + + TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class); + + CountDownLatch latch = new CountDownLatch(1); + OperatorAndAggregator o1 = dag.addOperator("o1", new OperatorAndAggregator(latch)); + + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); + + dag.addStream("TestTuples", inputOperator.outport, o1.inport1); + + lpc.prepareDAG(dag, null, "AutoMetricTest"); + + LogicalPlan.OperatorMeta o1meta = dag.getOperatorMeta("o1"); + Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator()); + + lpc.prepareDAG(dag, null, "AutoMetricTest"); + StramLocalCluster lc = new StramLocalCluster(dag); + lc.runAsync(); + latch.await(); + Assert.assertEquals("progress", 1, o1.result.get("progress")); + lc.shutdown(); + } + private static class MockAggregator implements AutoMetric.Aggregator, Serializable { long cachedSum = -1; @@ -361,6 +393,31 @@ public class AutoMetricTest } } + public static class OperatorAndAggregator extends OperatorWithMetrics implements AutoMetric.Aggregator + { + Map<String, Object> result = Maps.newHashMap(); + + private final transient CountDownLatch latch; + + private OperatorAndAggregator() + { + latch = null; + } + + OperatorAndAggregator(@NotNull CountDownLatch latch) + 
{ + this.latch = Preconditions.checkNotNull(latch); + } + + @Override + public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics) + { + result.put("progress", physicalMetrics.iterator().next().getMetrics().get("progress")); + latch.countDown(); + return result; + } + } + @Test public void testMetricsAnnotatedMethod() throws Exception {
