Repository: ambari Updated Branches: refs/heads/branch-2.4 fadf2d762 -> 121dc7fa8
AMBARI-17615 : AMS metrics GET API does not work for same metric with multiple aggregation functions (avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/121dc7fa Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/121dc7fa Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/121dc7fa Branch: refs/heads/branch-2.4 Commit: 121dc7fa8884a26511bd3f2e075283ae7a600be0 Parents: fadf2d7 Author: Aravindan Vijayan <[email protected]> Authored: Tue Jul 12 12:04:25 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Tue Jul 12 12:04:25 2016 -0700 ---------------------------------------------------------------------- .../ambari-metrics-timelineservice/pom.xml | 6 + .../timeline/HBaseTimelineMetricStore.java | 13 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 118 ++++++++++--------- .../timeline/HBaseTimelineMetricStoreTest.java | 38 ++++-- .../timeline/ITPhoenixHBaseAccessor.java | 15 ++- .../timeline/PhoenixHBaseAccessorTest.java | 8 +- 6 files changed, 121 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/121dc7fa/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index a0e4adf..a75d2e9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -532,6 +532,12 @@ </dependency> <dependency> + <groupId>com.google.collections</groupId> + <artifactId>google-collections</artifactId> + <version>1.0</version> + </dependency> + + <dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.8</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/121dc7fa/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 9510574..dd1ae3f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -185,7 +187,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func); } - Map<String, List<Function>> metricFunctions = + Multimap<String, List<Function>> metricFunctions = parseMetricNamesToAggregationFunctions(metricNames); ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) @@ -285,8 +287,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin return metricValues; } - static HashMap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { - HashMap<String, List<Function>> metricsFunctions = new HashMap<>(); + static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { + Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create(); for (String metricName : metricNames){ Function function = Function.DEFAULT_VALUE_FUNCTION; @@ -303,10 +305,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // fallback to VALUE, and fullMetricName } - List<Function> functionsList = metricsFunctions.get(cleanMetricName); - if (functionsList == null) { - functionsList = new ArrayList<>(1); - } + List<Function> functionsList = new ArrayList<>(); functionsList.add(function); metricsFunctions.put(cleanMetricName, functionsList); } http://git-wip-us.apache.org/repos/asf/ambari/blob/121dc7fa/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index bbd6d83..b86f97a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import com.google.common.collect.Multimap; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -748,7 +749,7 @@ public class PhoenixHBaseAccessor { @SuppressWarnings("unchecked") public TimelineMetrics getMetricRecords( - final Condition condition, Map<String, List<Function>> metricFunctions) + final Condition condition, Multimap<String, List<Function>> metricFunctions) throws SQLException, IOException { validateConditionIsNotEmpty(condition); @@ -847,34 +848,36 @@ public class PhoenixHBaseAccessor { * or aggregate data with default function applied. */ private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition, - Map<String, List<Function>> metricFunctions, + Multimap<String, List<Function>> metricFunctions, ResultSet rs) throws SQLException, IOException { String metricName = rs.getString("METRIC_NAME"); - List<Function> functions = findMetricFunctions(metricFunctions, metricName); + Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); - // Apply aggregation function if present - if ((functions != null && !functions.isEmpty())) { - if (functions.size() > 1) { - throw new IllegalArgumentException("Multiple aggregate functions not supported."); - } - for (Function f : functions) { - if (f.getReadFunction() == Function.ReadFunction.VALUE) { - getTimelineMetricsFromResultSet(metrics, f, condition, rs); - } else { - SingleValuedTimelineMetric metric = - TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f); - - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); + for (List<Function> functions : functionList) { + // Apply aggregation function if present + if ((functions != null && !functions.isEmpty())) { + if (functions.size() > 1) { + throw new IllegalArgumentException("Multiple aggregate functions not supported."); + } + for (Function f : functions) { + if (f.getReadFunction() == Function.ReadFunction.VALUE) { + getTimelineMetricsFromResultSet(metrics, f, condition, rs); } else { - metrics.getMetrics().add(metric.getTimelineMetric()); + SingleValuedTimelineMetric metric = + TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric.getTimelineMetric()); + } } } + } else { + // No aggregation requested + // Execution never goes here, function always contain at least 1 element + getTimelineMetricsFromResultSet(metrics, null, condition, rs); } - } else { - // No aggregation requested - // Execution never goes here, function always contain at least 1 element - getTimelineMetricsFromResultSet(metrics, null, condition, rs); } } @@ -936,7 +939,7 @@ public class PhoenixHBaseAccessor { * @throws SQLException */ public TimelineMetrics getAggregateMetricRecords(final Condition condition, - Map<String, List<Function>> metricFunctions) throws SQLException { + Multimap<String, List<Function>> metricFunctions) throws SQLException { validateConditionIsNotEmpty(condition); @@ -986,34 +989,37 @@ public class PhoenixHBaseAccessor { } private void appendAggregateMetricFromResultSet(TimelineMetrics metrics, - Condition condition, Map<String, List<Function>> metricFunctions, + Condition condition, Multimap<String, List<Function>> metricFunctions, ResultSet rs) throws SQLException { String metricName = rs.getString("METRIC_NAME"); - List<Function> functions = findMetricFunctions(metricFunctions, metricName); + Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); - for (Function aggregateFunction : functions) { - SingleValuedTimelineMetric metric; + for (List<Function> functions : functionList) { + for (Function aggregateFunction : functions) { + SingleValuedTimelineMetric metric; - if (condition.getPrecision() == Precision.MINUTES + if (condition.getPrecision() == Precision.MINUTES || condition.getPrecision() == Precision.HOURS || condition.getPrecision() == Precision.DAYS) { - metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false); - } else { - metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true); - } + metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false); + } else { + metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true); + } - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); - } else { - metrics.getMetrics().add(metric.getTimelineMetric()); + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric.getTimelineMetric()); + } } } + } private void getLatestAggregateMetricRecords(Condition condition, Connection conn, TimelineMetrics metrics, - Map<String, List<Function>> metricFunctions) throws SQLException { + Multimap<String, List<Function>> metricFunctions) throws SQLException { PreparedStatement stmt = null; SplitByMetricNamesCondition splitCondition = @@ -1027,22 +1033,24 @@ public class PhoenixHBaseAccessor { try { rs = stmt.executeQuery(); while (rs.next()) { - List<Function> functions = findMetricFunctions(metricFunctions, metricName); - if (functions != null) { - for (Function f : functions) { - SingleValuedTimelineMetric metric = - getAggregateTimelineMetricFromResultSet(rs, f, true); - - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); - } else { - metrics.getMetrics().add(metric.getTimelineMetric()); + Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); + for (List<Function> functions : functionList) { + if (functions != null) { + for (Function f : functions) { + SingleValuedTimelineMetric metric = + getAggregateTimelineMetricFromResultSet(rs, f, true); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric.getTimelineMetric()); + } } + } else { + SingleValuedTimelineMetric metric = + getAggregateTimelineMetricFromResultSet(rs, new Function(), true); + metrics.getMetrics().add(metric.getTimelineMetric()); } - } else { - SingleValuedTimelineMetric metric = - getAggregateTimelineMetricFromResultSet(rs, new Function(), true); - metrics.getMetrics().add(metric.getTimelineMetric()); } } } finally { @@ -1108,16 +1116,16 @@ public class PhoenixHBaseAccessor { } } - private List<Function> findMetricFunctions(Map<String, List<Function>> metricFunctions, + private Collection<List<Function>> findMetricFunctions(Multimap<String, List<Function>> metricFunctions, String metricName) { if (metricFunctions.containsKey(metricName)) { return metricFunctions.get(metricName); } - for (Map.Entry<String, List<Function>> nameToFunctions : metricFunctions.entrySet()) { - String metricRegEx = nameToFunctions.getKey().replace("%", ".*"); + for (String metricNameEntry : metricFunctions.keySet()) { + String metricRegEx = metricNameEntry.replace("%", ".*"); if (metricName.matches(metricRegEx)) { - return nameToFunctions.getValue(); + return metricFunctions.get(metricNameEntry); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/121dc7fa/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java index 29e2664..aae1d4b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java @@ -17,21 +17,21 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import com.google.common.collect.Multimap; import junit.framework.Assert; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.junit.Test; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; public class HBaseTimelineMetricStoreTest { @@ -44,25 +44,49 @@ public class HBaseTimelineMetricStoreTest { //giwen List<String> metricNames = Arrays.asList( MEM_METRIC + "._avg", + MEM_METRIC + "._sum", MEM_METRIC + "._rate._avg", BYTES_IN_METRIC, BYTES_NOT_AFUNCTION_METRIC); //when - HashMap<String, List<Function>> mfm = + Multimap<String, List<Function>> multimap = HBaseTimelineMetricStore.parseMetricNamesToAggregationFunctions(metricNames); //then - assertThat(mfm).hasSize(3) - .containsKeys(MEM_METRIC, BYTES_IN_METRIC, BYTES_NOT_AFUNCTION_METRIC); + Assert.assertEquals(multimap.keySet().size(), 3); + Assert.assertTrue(multimap.containsKey(MEM_METRIC)); + Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC)); + Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC)); + List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC); + HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>(); + mfm.put(MEM_METRIC, metricEntry.get(0)); + + assertThat(mfm.get(MEM_METRIC)).containsOnly( + new Function(AVG, null)); + + mfm = new HashMap<String, List<Function>>(); + mfm.put(MEM_METRIC, metricEntry.get(1)); + assertThat(mfm.get(MEM_METRIC)).containsOnly( + new Function(SUM, null)); + + mfm = new HashMap<String, List<Function>>(); + mfm.put(MEM_METRIC, metricEntry.get(2)); assertThat(mfm.get(MEM_METRIC)).containsOnly( - new Function(AVG, null), new Function(AVG, RATE)); + metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC); + mfm = new HashMap<String, List<Function>>(); + mfm.put(BYTES_IN_METRIC, metricEntry.get(0)); + assertThat(mfm.get(BYTES_IN_METRIC)) .contains(Function.DEFAULT_VALUE_FUNCTION); + metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC); + mfm = new HashMap<String, List<Function>>(); + mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0)); + assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC)) .contains(Function.DEFAULT_VALUE_FUNCTION); http://git-wip-us.apache.org/repos/asf/ambari/blob/121dc7fa/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index ae1aa5a..7c6e7f4 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -259,9 +261,10 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { Condition condition = new DefaultCondition( Collections.singletonList("disk_free"), null, null, null, null, null, Precision.SECONDS, null, true); - TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, - Collections.singletonMap("disk_free", - Collections.singletonList(new Function(Function.ReadFunction.SUM, null)))); + + Multimap<String, List<Function>> mmap = ArrayListMultimap.create(); + mmap.put("disk_free", Collections.singletonList(new Function(Function.ReadFunction.SUM, null))); + TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, mmap); //THEN assertEquals(1, timelineMetrics.getMetrics().size()); @@ -380,8 +383,10 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { hBaseAdmin.close(); } - private Map<String, List<Function>> singletonValueFunctionMap(String metricName) { - return Collections.singletonMap(metricName, Collections.singletonList(new Function())); + private Multimap<String, List<Function>> singletonValueFunctionMap(String metricName) { + Multimap<String, List<Function>> mmap = ArrayListMultimap.create(); + mmap.put(metricName, Collections.singletonList(new Function())); + return mmap; } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/121dc7fa/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index a86fa11..0ea668a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -79,7 +81,7 @@ public class PhoenixHBaseAccessorTest { List<String> metricNames = new LinkedList<>(); List<String> hostnames = new LinkedList<>(); - Map<String, List<Function>> metricFunctions = new HashMap<>(); + Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create(); PowerMock.mockStatic(PhoenixTransactSQL.class); PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class); @@ -128,7 +130,7 @@ public class PhoenixHBaseAccessorTest { List<String> metricNames = new LinkedList<>(); List<String> hostnames = new LinkedList<>(); - Map<String, List<Function>> metricFunctions = new HashMap<>(); + Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create(); PowerMock.mockStatic(PhoenixTransactSQL.class); PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class); @@ -178,7 +180,7 @@ public class PhoenixHBaseAccessorTest { List<String> metricNames = new LinkedList<>(); List<String> hostnames = new LinkedList<>(); - Map<String, List<Function>> metricFunctions = new HashMap<>(); + Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create(); PowerMock.mockStatic(PhoenixTransactSQL.class); PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
