http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java deleted file mode 100644 index d3da500..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ /dev/null @@ -1,513 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; -import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; -import 
org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.apache.hadoop.yarn.webapp.BadRequestException; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - -@Singleton -@Path("/ws/v1/timeline") -public class TimelineWebServices { - private static final Log LOG = LogFactory.getLog(TimelineWebServices.class); - - private TimelineMetricStore timelineMetricStore; - - @Inject - public TimelineWebServices(TimelineMetricStore timelineMetricStore) { - this.timelineMetricStore = timelineMetricStore; - } - - @XmlRootElement(name = "about") - @XmlAccessorType(XmlAccessType.NONE) - @Public - @Unstable - public static class AboutInfo { - - private String about; - - public AboutInfo() { - - } - - public AboutInfo(String about) { - this.about = about; - } - - @XmlElement(name = "About") - public String getAbout() { - return about; - } - - public void setAbout(String about) { - this.about = about; - } - - } - - /** - * Return the description of the timeline web services. - */ - @GET - @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public AboutInfo about( - @Context HttpServletRequest req, - @Context HttpServletResponse res) { - init(res); - return new AboutInfo("AMS API"); - } - - /** - * Store the given metrics into the timeline store, and return errors that - * happened during storing. 
- */ - @Path("/metrics") - @POST - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postMetrics( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - TimelineMetrics metrics) { - - init(res); - if (metrics == null) { - return new TimelinePutResponse(); - } - - try { - - // TODO: Check ACLs for MetricEntity using the TimelineACLManager. - // TODO: Save owner of the MetricEntity. - - if (LOG.isDebugEnabled()) { - LOG.debug("Storing metrics: " + - TimelineUtils.dumpTimelineRecordtoJSON(metrics, true)); - } - - return timelineMetricStore.putMetrics(metrics); - - } catch (Exception e) { - LOG.error("Error saving metrics.", e); - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - /** - * Store the given metrics into the timeline store, and return errors that - * happened during storing. - */ - @Path("/metrics/aggregated") - @POST - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postAggregatedMetrics( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - AggregationResult metrics) { - - init(res); - if (metrics == null) { - return new TimelinePutResponse(); - } - - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Storing aggregated metrics: " + - TimelineUtils.dumpTimelineRecordtoJSON(metrics, true)); - } - - return timelineMetricStore.putHostAggregatedMetrics(metrics); - } catch (Exception e) { - LOG.error("Error saving metrics.", e); - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - @Path("/containermetrics") - @POST - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postContainerMetrics( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - List<ContainerMetric> metrics) { - init(res); - if (metrics == null || metrics.isEmpty()) { - return new TimelinePutResponse(); - } 
- - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Storing container metrics: " + TimelineUtils - .dumpTimelineRecordtoJSON(metrics, true)); - } - - return timelineMetricStore.putContainerMetrics(metrics); - - } catch (Exception e) { - LOG.error("Error saving metrics.", e); - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - /** - * Query for a set of different metrics satisfying the filter criteria. - * All query params are optional. The default limit will apply if none - * specified. - * - * @param metricNames Comma separated list of metrics to retrieve. - * @param appId Application Id for the requested metrics. - * @param instanceId Application instance id. - * @param hostname Hostname where the metrics originated. - * @param startTime Start time for the metric records retrieved. - * @param precision Precision [ seconds, minutes, hours ] - * @param limit limit on total number of {@link TimelineMetric} records - * retrieved. - * @return {@link @TimelineMetrics} - */ - @GET - @Path("/metrics") - @Produces({ MediaType.APPLICATION_JSON }) - public TimelineMetrics getTimelineMetrics( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("metricNames") String metricNames, - @QueryParam("appId") String appId, - @QueryParam("instanceId") String instanceId, - @QueryParam("hostname") String hostname, - @QueryParam("startTime") String startTime, - @QueryParam("endTime") String endTime, - @QueryParam("precision") String precision, - @QueryParam("limit") String limit, - @QueryParam("grouped") String grouped, - @QueryParam("topN") String topN, - @QueryParam("topNFunction") String topNFunction, - @QueryParam("isBottomN") String isBottomN, - @QueryParam("seriesAggregateFunction") String seriesAggregateFunction - ) { - init(res); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for metrics => metricNames: " + metricNames + ", " + - "appId: " + appId + ", instanceId: " + instanceId + ", " + - 
"hostname: " + hostname + ", startTime: " + startTime + ", " + - "endTime: " + endTime + ", " + - "precision: " + precision + "seriesAggregateFunction: " + seriesAggregateFunction); - } - - return timelineMetricStore.getTimelineMetrics( - parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId), - parseLongStr(startTime), parseLongStr(endTime), - Precision.getPrecision(precision), parseIntStr(limit), - parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN), - seriesAggregateFunction); - - } catch (NumberFormatException ne) { - throw new BadRequestException("startTime and limit should be numeric " + - "values"); - } catch (Precision.PrecisionFormatException pfe) { - throw new BadRequestException("precision should be seconds, minutes " + - "or hours"); - } catch (PrecisionLimitExceededException iae) { - throw new PrecisionLimitExceededException(iae.getMessage()); - } catch (IllegalArgumentException iae) { - throw new BadRequestException(iae.getMessage()); - } catch (SQLException | IOException e) { - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } - } - - @GET - @Path("/metrics/metadata") - @Produces({ MediaType.APPLICATION_JSON }) - public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("appId") String appId, - @QueryParam("metricName") String metricPattern, - @QueryParam("includeAll") String includeBlacklistedMetrics - ) { - init(res); - - try { - return timelineMetricStore.getTimelineMetricMetadata( - parseStr(appId), - parseStr(metricPattern), - parseBoolean(includeBlacklistedMetrics)); - } catch (Exception e) { - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - @GET - @Path("/metrics/hosts") - @Produces({ MediaType.APPLICATION_JSON }) - public Map<String, Set<String>> getHostedAppsMetadata( - @Context HttpServletRequest req, - @Context 
HttpServletResponse res - ) { - init(res); - - try { - return timelineMetricStore.getHostAppsMetadata(); - } catch (Exception e) { - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - @GET - @Path("/metrics/instances") - @Produces({ MediaType.APPLICATION_JSON }) - public Map<String, Map<String, Set<String>>> getClusterHostsMetadata( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("appId") String appId, - @QueryParam("instanceId") String instanceId - ) { - init(res); - - try { - return timelineMetricStore.getInstanceHostsMetadata(instanceId, appId); - } catch (Exception e) { - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - @GET - @Path("/metrics/uuid") - @Produces({ MediaType.APPLICATION_JSON }) - public byte[] getUuid( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("metricName") String metricName, - @QueryParam("appId") String appId, - @QueryParam("instanceId") String instanceId, - @QueryParam("hostname") String hostname - ) { - init(res); - - try { - return timelineMetricStore.getUuid(metricName, appId, instanceId, hostname); - } catch (Exception e) { - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - /** - * This is a discovery endpoint that advertises known live collector - * instances. Note: It will always answer with current instance as live. 
- * This can be utilized as a liveliness pinger endpoint since the instance - * names are cached and thereby no synchronous calls result from this API - * - * @return List<String> hostnames</String> - */ - @GET - @Path("/metrics/livenodes") - @Produces({ MediaType.APPLICATION_JSON }) - public List<String> getLiveCollectorNodes( - @Context HttpServletRequest req, - @Context HttpServletResponse res - ) { - init(res); - - return timelineMetricStore.getLiveInstances(); - } - - private void init(HttpServletResponse response) { - response.setContentType(null); - } - - private static SortedSet<String> parseArrayStr(String str, String delimiter) { - if (str == null) { - return null; - } - SortedSet<String> strSet = new TreeSet<String>(); - String[] strs = str.split(delimiter); - for (String aStr : strs) { - strSet.add(aStr.trim()); - } - return strSet; - } - - private static NameValuePair parsePairStr(String str, String delimiter) { - if (str == null) { - return null; - } - String[] strs = str.split(delimiter, 2); - try { - return new NameValuePair(strs[0].trim(), - GenericObjectMapper.OBJECT_READER.readValue(strs[1].trim())); - } catch (Exception e) { - // didn't work as an Object, keep it as a String - return new NameValuePair(strs[0].trim(), strs[1].trim()); - } - } - - private static Collection<NameValuePair> parsePairsStr( - String str, String aDelimiter, String pDelimiter) { - if (str == null) { - return null; - } - String[] strs = str.split(aDelimiter); - Set<NameValuePair> pairs = new HashSet<NameValuePair>(); - for (String aStr : strs) { - pairs.add(parsePairStr(aStr, pDelimiter)); - } - return pairs; - } - - private static EnumSet<Field> parseFieldsStr(String str, String delimiter) { - if (str == null) { - return null; - } - String[] strs = str.split(delimiter); - List<Field> fieldList = new ArrayList<Field>(); - for (String s : strs) { - s = s.trim().toUpperCase(); - if (s.equals("EVENTS")) { - fieldList.add(Field.EVENTS); - } else if (s.equals("LASTEVENTONLY")) 
{ - fieldList.add(Field.LAST_EVENT_ONLY); - } else if (s.equals("RELATEDENTITIES")) { - fieldList.add(Field.RELATED_ENTITIES); - } else if (s.equals("PRIMARYFILTERS")) { - fieldList.add(Field.PRIMARY_FILTERS); - } else if (s.equals("OTHERINFO")) { - fieldList.add(Field.OTHER_INFO); - } else { - throw new IllegalArgumentException("Requested nonexistent field " + s); - } - } - if (fieldList.size() == 0) { - return null; - } - Field f1 = fieldList.remove(fieldList.size() - 1); - if (fieldList.size() == 0) { - return EnumSet.of(f1); - } else { - return EnumSet.of(f1, fieldList.toArray(new Field[fieldList.size()])); - } - } - - private static Long parseLongStr(String str) { - return str == null ? null : Long.parseLong(str.trim()); - } - - private static Integer parseIntStr(String str) { - return str == null ? null : Integer.parseInt(str.trim()); - } - - private static boolean parseBoolean(String booleanStr) { - return booleanStr == null || Boolean.parseBoolean(booleanStr); - } - - private static TopNConfig parseTopNConfig(String topN, String topNFunction, - String bottomN) { - if (topN == null || topN.isEmpty()) { - return null; - } - Integer topNValue = parseIntStr(topN); - - if (topNValue == 0) { - LOG.info("Invalid Input for TopN query. Ignoring TopN Request."); - return null; - } - - Boolean isBottomN = (bottomN != null && Boolean.parseBoolean(bottomN)); - return new TopNConfig(topNValue, topNFunction, isBottomN); - } - - /** - * Parses delimited string to list of strings. It skips strings that are - * effectively empty (i.e. only whitespace). 
- * - */ - private static List<String> parseListStr(String str, String delimiter) { - if (str == null || str.trim().isEmpty()){ - return null; - } - - String[] split = str.trim().split(delimiter); - List<String> list = new ArrayList<String>(split.length); - for (String s : split) { - if (!s.trim().isEmpty()){ - list.add(s); - } - } - - return list; - } - - private static String parseStr(String str) { - String trimmedInstance = (str == null) ? null : str.trim(); - if (trimmedInstance != null) { - if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) { - trimmedInstance = null; - } - } - return trimmedInstance; - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py b/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py index b6b4e0b..5faeffc 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/python/main.py @@ -39,13 +39,13 @@ SERVER_START_CMD = \ "-cp {0} {1} " + \ "-Djava.net.preferIPv4Stack=true " \ "-Dproc_timelineserver " + \ - "org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer" + "org.apache.ambari.metrics.AMSApplicationServer" SERVER_START_CMD_DEBUG = \ "-cp {0} {1} " + \ "-Djava.net.preferIPv4Stack=true " \ "-Dproc_timelineserver " + \ " -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend={2} " + \ - "org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer" + "org.apache.ambari.metrics.AMSApplicationServer" AMC_DIE_MSG = "Ambari Metrics Collector java process died with exitcode {0}. Check {1} for more information." 
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java new file mode 100644 index 0000000..89a5759 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestAppMetrics.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.loadsimulator.data; + +import org.apache.ambari.metrics.core.loadsimulator.util.Json; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TestAppMetrics { + private static final String SAMPLE_SINGLE_METRIC_HOST_JSON = "{\n" + + " \"metrics\" : [ {\n" + + " \"instanceid\" : \"\",\n" + + " \"hostname\" : \"localhost\",\n" + + " \"metrics\" : {\n" + + " \"0\" : \"5.35\",\n" + + " \"5000\" : \"5.35\",\n" + + " \"10000\" : \"5.35\",\n" + + " \"15000\" : \"5.35\"\n" + + " },\n" + + " \"starttime\" : \"1411663170112\",\n" + + " \"appid\" : \"HOST\",\n" + + " \"metricname\" : \"disk_free\"\n" + + " } ]\n" + + "}"; + + private static final String SAMPLE_TWO_METRIC_HOST_JSON = "{\n" + + " \"metrics\" : [ {\n" + + " \"instanceid\" : \"\",\n" + + " \"hostname\" : \"localhost\",\n" + + " \"metrics\" : {\n" + + " \"0\" : \"5.35\",\n" + + " \"5000\" : \"5.35\",\n" + + " \"10000\" : \"5.35\",\n" + + " \"15000\" : \"5.35\"\n" + + " },\n" + + " \"starttime\" : \"0\",\n" + + " \"appid\" : \"HOST\",\n" + + " \"metricname\" : \"disk_free\"\n" + + " }, {\n" + + " \"instanceid\" : \"\",\n" + + " \"hostname\" : \"localhost\",\n" + + " \"metrics\" : {\n" + + " \"0\" : \"94.0\",\n" + + " \"5000\" : \"94.0\",\n" + + " \"10000\" : \"94.0\",\n" + + " \"15000\" : \"94.0\"\n" + + " },\n" + + " \"starttime\" : \"0\",\n" + + " \"appid\" : \"HOST\",\n" + + " \"metricname\" : \"mem_cached\"\n" + + " } ]\n" + + "}"; + + private long[] timestamps; + + @Before + public void setUp() throws Exception { + timestamps = new long[4]; + timestamps[0] = 0; + timestamps[1] = timestamps[0] + 5000; + timestamps[2] = timestamps[1] + 5000; + timestamps[3] = timestamps[2] + 5000; + + } + + @Test + public void testHostDiskMetricsSerialization() throws IOException { + long timestamp = 1411663170112L; + AppMetrics appMetrics = new AppMetrics(new ApplicationInstance("localhost", 
AppID.HOST, ""), timestamp); + + Metric diskFree = appMetrics.createMetric("disk_free"); + double value = 5.35; + + diskFree.putMetric(timestamps[0], Double.toString(value)); + diskFree.putMetric(timestamps[1], Double.toString(value)); + diskFree.putMetric(timestamps[2], Double.toString(value)); + diskFree.putMetric(timestamps[3], Double.toString(value)); + + appMetrics.addMetric(diskFree); + + String expected = SAMPLE_SINGLE_METRIC_HOST_JSON; + String s = new Json(true).serialize(appMetrics); + + assertEquals("Serialized Host Metrics", expected, s); + } + + + @Test + public void testSingleHostManyMetricsSerialization() throws IOException { + AppMetrics appMetrics = new AppMetrics(new ApplicationInstance("localhost", AppID.HOST, ""), timestamps[0]); + + Metric diskFree = appMetrics.createMetric("disk_free"); + double value = 5.35; + diskFree.putMetric(timestamps[0], Double.toString(value)); + diskFree.putMetric(timestamps[1], Double.toString(value)); + diskFree.putMetric(timestamps[2], Double.toString(value)); + diskFree.putMetric(timestamps[3], Double.toString(value)); + + appMetrics.addMetric(diskFree); + + Metric memCache = appMetrics.createMetric("mem_cached"); + double memVal = 94; + memCache.putMetric(timestamps[0], Double.toString(memVal)); + memCache.putMetric(timestamps[1], Double.toString(memVal)); + memCache.putMetric(timestamps[2], Double.toString(memVal)); + memCache.putMetric(timestamps[3], Double.toString(memVal)); + + appMetrics.addMetric(memCache); + + String expected = SAMPLE_TWO_METRIC_HOST_JSON; + String s = new Json(true).serialize(appMetrics); + + assertEquals("Serialized Host Metrics", expected, s); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java new file mode 100644 index 0000000..79e4b8f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/data/TestMetric.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.loadsimulator.data; + +import org.apache.ambari.metrics.core.loadsimulator.util.Json; +import org.junit.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; +import static org.junit.Assert.assertEquals; + +public class TestMetric { + private static final String SAMPLE_METRIC_IN_JSON = "{\n" + + " \"instanceid\" : \"\",\n" + + " \"hostname\" : \"localhost\",\n" + + " \"metrics\" : {\n" + + " \"0\" : \"5.35\",\n" + + " \"5000\" : \"5.35\",\n" + + " \"10000\" : \"5.35\",\n" + + " \"15000\" : \"5.35\"\n" + + " },\n" + + " \"starttime\" : \"0\",\n" + + " \"appid\" : \"HOST\",\n" + + " \"metricname\" : \"disk_free\"\n" + + "}"; + + @Test + public void testSerializeToJson() throws IOException { + Metric diskOnHostMetric = new Metric(new ApplicationInstance("localhost", AppID.HOST, ""), "disk_free", 0); + + long timestamp = 0; + double value = 5.35; + + diskOnHostMetric.putMetric(timestamp, Double.toString(value)); + diskOnHostMetric.putMetric(timestamp + 5000, Double.toString(value)); + diskOnHostMetric.putMetric(timestamp + 10000, Double.toString(value)); + diskOnHostMetric.putMetric(timestamp + 15000, Double.toString(value)); + + String expected = SAMPLE_METRIC_IN_JSON; + String s = new Json(true).serialize(diskOnHostMetric); + + assertEquals("Json should match", expected, s); + } + + @Test + public void testDeserializeObjectFromString() throws IOException { + String source = SAMPLE_METRIC_IN_JSON; + + Metric m = new Json().deserialize(source, Metric.class); + + assertEquals("localhost", m.getHostname()); + assertEquals("HOST", m.getAppid()); + assertEquals("", m.getInstanceid()); + assertEquals("disk_free", m.getMetricname()); + assertEquals("0", m.getStarttime()); + + assertThat(m.getMetrics()).isNotEmpty().hasSize(4).contains( + entry("0", "5.35"), + entry("5000", "5.35"), + entry("10000", "5.35"), + entry("15000", "5.35")); 
+ } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java new file mode 100644 index 0000000..8d2d38e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AMSJMeterLoadTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;

import org.apache.commons.lang3.StringUtils;
import org.apache.ambari.metrics.core.loadsimulator.MetricsLoadSimulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Command-line entry point for the AMS load simulator.
 *
 * <p>Depending on the requested test type it either schedules
 * {@link JmeterTestPlanTask} instances that issue GET requests ("U"), or
 * delegates to {@link MetricsLoadSimulator} to push metrics (any other type).
 */
public class AMSJMeterLoadTest {

  private final static Logger LOG = LoggerFactory.getLogger(AMSJMeterLoadTest.class);
  private static final String PROPERTIES_FILE = "loadsimulator/ams-jmeter.properties";
  private ScheduledExecutorService scheduledExecutorService = null;
  private List<AppGetMetric> appGetMetrics;
  private Properties amsJmeterProperties = null;

  /**
   * Loads the simulator properties and immediately starts the requested test.
   *
   * @param args parsed command-line options; the test type is read from
   *             {@code type} (or its short form {@code t}) and the optional
   *             properties file from {@code amsJmeterPropertiesFile} (or {@code p})
   * @throws IllegalStateException if the properties file cannot be loaded
   */
  public AMSJMeterLoadTest(Map<String, String> args) {

    // The usage message advertises -t / -p, so accept the short keys as well
    // as the long ones the original code looked up.
    String testType = firstNonNull(args.get("type"), args.get("t"));
    String userDefinedPropertiesFile = firstNonNull(args.get("amsJmeterPropertiesFile"), args.get("p"));

    String propertiesFile = (null == userDefinedPropertiesFile || userDefinedPropertiesFile.isEmpty())
      ? PROPERTIES_FILE : userDefinedPropertiesFile;
    this.amsJmeterProperties = readProperties(propertiesFile);
    if (amsJmeterProperties == null) {
      // readProperties has already logged the cause; fail with a clear message
      // instead of a NullPointerException further down.
      throw new IllegalStateException("Unable to load AMS JMeter properties from " + propertiesFile);
    }

    if ("U".equals(testType)) { //GET metrics simulator
      int numInstances = Integer.parseInt(amsJmeterProperties.getProperty("num-ui-instances"));
      this.scheduledExecutorService = Executors.newScheduledThreadPool(numInstances);
      this.appGetMetrics = initializeGetMetricsPayload(amsJmeterProperties);
      this.runTest(numInstances);
    } else { //PUT Metrics simulator
      Map<String, String> mapArgs = new HashMap<String, String>();
      mapArgs.put("hostName", argOrProperty(args, "host-prefix"));
      mapArgs.put("minHostIndex", argOrProperty(args, "min-host-index"));
      mapArgs.put("numberOfHosts", argOrProperty(args, "num-hosts"));
      mapArgs.put("metricsHostName", argOrProperty(args, "ams-host-port"));
      mapArgs.put("collectInterval", argOrProperty(args, "collection-interval"));
      mapArgs.put("sendInterval", argOrProperty(args, "send-interval"));
      mapArgs.put("master", argOrProperty(args, "create-master"));
      System.out.println("AMS Load Simulation Parameters : " + mapArgs);
      MetricsLoadSimulator.startTest(mapArgs);
    }
  }

  /** Returns {@code first} unless it is null, in which case {@code second}. */
  private static String firstNonNull(String first, String second) {
    return (first != null) ? first : second;
  }

  /** Returns the command-line value for {@code key}, falling back to the properties file. */
  private String argOrProperty(Map<String, String> args, String key) {
    String fromArgs = args.get(key);
    return (fromArgs != null) ? fromArgs : amsJmeterProperties.getProperty(key);
  }

  /**
   * Loads a properties file, looking first on the system classpath and then
   * on the file system.
   *
   * @param propertiesFile classpath resource name or file system path
   * @return the loaded properties, or {@code null} if the file could not be read
   */
  public static Properties readProperties(String propertiesFile) {
    Properties properties = new Properties();
    try (InputStream inputStream = openClasspathOrFile(propertiesFile)) {
      properties.load(inputStream);
      return properties;
    } catch (IOException ioEx) {
      LOG.error("Error reading properties file for jmeter : " + propertiesFile, ioEx);
      return null;
    }
  }

  /** Opens {@code name} as a system classpath resource, or as a plain file if no such resource exists. */
  private static InputStream openClasspathOrFile(String name) throws IOException {
    InputStream inputStream = ClassLoader.getSystemResourceAsStream(name);
    return (inputStream != null) ? inputStream : new FileInputStream(name);
  }

  /**
   * Reads the GET request definitions for one app from
   * {@code ui_metrics_def/<app>.dat} on the system classpath.
   *
   * <p>Lines are accumulated as metric names until a line starting with
   * {@code |} is seen; that line closes the current request and selects
   * whether it needs timestamps ({@code startTime}) and a host ({@code hostname}).
   *
   * @param app application id used to build the resource name
   * @return the parsed requests; empty if the resource is missing or unreadable
   */
  private static List<GetMetricRequestInfo> readMetricsFromFile(String app) {
    List<GetMetricRequestInfo> metricList = new ArrayList<>();
    String fileName = "ui_metrics_def/" + app + ".dat";

    InputStream input = ClassLoader.getSystemResourceAsStream(fileName);
    if (input == null) {
      LOG.error("Cannot find file " + fileName + " for appID " + app);
      return metricList;
    }

    try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
      List<String> metrics = new ArrayList<>();
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.startsWith("|")) {
          boolean needsTimestamps = line.contains("startTime");
          boolean needsHost = line.contains("hostname");
          // GetMetricRequestInfo joins the names into a string in its
          // constructor, so the list can be reused for the next request.
          metricList.add(new GetMetricRequestInfo(metrics, needsTimestamps, needsHost));
          metrics.clear();
        } else {
          metrics.add(line);
        }
      }
      return metricList;
    } catch (IOException e) {
      LOG.error("Cannot read file " + fileName + " for appID " + app, e);
      return new ArrayList<>();
    }
  }

  /**
   * Builds the per-app GET payloads.
   *
   * <p>Apps come from the comma separated {@code apps-to-test} property, or
   * from every {@link JmeterTestPlanTask.ClientApp} when that property is
   * unset. The interval is {@code get-interval}, overridden per app by
   * {@code <app>-get-interval}.
   */
  private static List<AppGetMetric> initializeGetMetricsPayload(Properties amsJmeterProperties) {

    List<AppGetMetric> appGetMetrics = new ArrayList<AppGetMetric>();
    String appsToTest = amsJmeterProperties.getProperty("apps-to-test");
    String[] apps;

    if (appsToTest != null && !appsToTest.isEmpty()) {
      apps = StringUtils.split(appsToTest, ",");
    } else {
      JmeterTestPlanTask.ClientApp[] clientApps = JmeterTestPlanTask.ClientApp.values();
      apps = new String[clientApps.length];
      for (int i = 0; i < clientApps.length; i++) {
        apps[i] = clientApps[i].getId();
      }
    }

    for (String app : apps) {
      int interval = Integer.parseInt(amsJmeterProperties.getProperty("get-interval"));
      String intervalString = amsJmeterProperties.getProperty(app + "-get-interval");
      if (intervalString != null && !intervalString.isEmpty()) {
        interval = Integer.parseInt(intervalString);
      }
      appGetMetrics.add(new AppGetMetric(readMetricsFromFile(app), interval, app));
    }

    return appGetMetrics;
  }

  /**
   * Schedules {@code numInstances} {@link JmeterTestPlanTask}s at a fixed
   * rate of {@code app-refresh-rate} milliseconds.
   *
   * @param numInstances number of tasks to schedule
   */
  public void runTest(int numInstances) {

    int appRefreshRate = Integer.parseInt(amsJmeterProperties.getProperty("app-refresh-rate"));
    for (int i = 0; i < numInstances; i++) {
      scheduledExecutorService.scheduleAtFixedRate(
        new JmeterTestPlanTask(appGetMetrics, amsJmeterProperties), 0, appRefreshRate, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Sample Usage:
   * java -cp "lib/*":ambari-metrics-timelineservice-2.1.1.0.jar
   * org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest.AMSJMeterLoadTest
   * -t U -p ambari-metrics-timelineservice/src/main/resources/jmeter/ams-jmeter.properties
   */
  public static void main(String[] args) {
    Map<String, String> mapArgs = parseArgs(args);
    new AMSJMeterLoadTest(mapArgs);
  }

  /**
   * Turns {@code -name value} pairs into a map keyed by the name without its
   * leading dashes.
   *
   * @throws IllegalArgumentException if no arguments are given or an option
   *                                  is missing its value
   */
  private static Map<String, String> parseArgs(String[] args) {
    if (args.length == 0 || args.length % 2 != 0) {
      printUsage();
      throw new IllegalArgumentException("Unexpected argument, See usage message.");
    }

    Map<String, String> mapProps = new HashMap<String, String>();
    for (int i = 0; i < args.length; i += 2) {
      mapProps.put(stripLeadingDashes(args[i]), args[i + 1]);
    }
    return mapProps;
  }

  /** Removes every leading {@code -} from an option name; inner dashes are kept. */
  private static String stripLeadingDashes(String option) {
    int start = 0;
    while (start < option.length() && option.charAt(start) == '-') {
      start++;
    }
    return option.substring(start);
  }

  /** Prints the supported command-line options to standard error. */
  public static void printUsage() {
    System.err.println("Usage: java AMSJMeterLoadTest [OPTIONS]");
    System.err.println("Options: ");
    System.err.println("[-t type (S=>Sink/U=>UI)] [-ams-host-port localhost:6188] [-min-host-index 2] [-host-prefix TestHost.] [-num-hosts 2] " +
      "[-create-master true] [-collection-interval 10000 ] [-send-interval 60000 ] [-p amsJmeterPropertiesFile (Optional)]");
  }

}

 http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java new file mode 100644 index 0000000..bc6428e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/AppGetMetric.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;

import java.util.List;

/**
 * Groups the GET requests issued on behalf of one application together with
 * the interval configured for that application.
 */
public class AppGetMetric {

  private String app;
  private int interval;
  private List<GetMetricRequestInfo> requests;

  /**
   * @param requests the GET requests to issue for the app
   * @param interval the interval configured for the app
   * @param app      the application id
   */
  public AppGetMetric(List<GetMetricRequestInfo> requests, int interval, String app) {
    // Assign fields directly: calling the public setters here would dispatch
    // to subclass overrides before the subclass is initialised.
    this.requests = requests;
    this.interval = interval;
    this.app = app;
  }

  /** @return the GET requests to issue for the app */
  public List<GetMetricRequestInfo> getMetricRequests() {
    return requests;
  }

  /** @param requests the GET requests to issue for the app */
  public void setMetricRequests(List<GetMetricRequestInfo> requests) {
    this.requests = requests;
  }

  /** @return the interval configured for the app */
  public int getInterval() {
    return interval;
  }

  /** @param interval the interval configured for the app */
  public void setInterval(int interval) {
    this.interval = interval;
  }

  /** @return the application id */
  public String getApp() {
    return app;
  }

  /** @param app the application id */
  public void setApp(String app) {
    this.app = app;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java new file mode 100644 index 0000000..60ed3eb --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/GetMetricRequestInfo.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;

import org.apache.commons.lang.StringUtils;

import java.util.List;


/**
 * One GET metrics request: the comma separated metric names plus flags
 * telling the caller whether time range and host parameters must be added.
 */
public class GetMetricRequestInfo {

  private String metricStringPayload;
  private boolean needsTimestamps;
  private boolean needsHost;

  /**
   * @param metrics         metric names; joined with {@code ","} immediately,
   *                        so later changes to the list do not affect this object
   * @param needsTimestamps whether the request needs start/end time parameters
   * @param needsHost       whether the request needs a hostname parameter
   */
  public GetMetricRequestInfo(List<String> metrics, boolean needsTimestamps, boolean needsHost) {
    // Assign fields directly: calling the public setters here would dispatch
    // to subclass overrides before the subclass is initialised.
    this.metricStringPayload = StringUtils.join(metrics, ",");
    this.needsTimestamps = needsTimestamps;
    this.needsHost = needsHost;
  }

  /** @return the comma separated metric names */
  public String getMetricStringPayload() {
    return metricStringPayload;
  }

  /** @param metricStringPayload the comma separated metric names */
  public void setMetricStringPayload(String metricStringPayload) {
    this.metricStringPayload = metricStringPayload;
  }

  /** @return whether the request needs start/end time parameters */
  public boolean needsTimestamps() {
    return needsTimestamps;
  }

  /** @param needsTimestamps whether the request needs start/end time parameters */
  public void setNeedsTimestamps(boolean needsTimestamps) {
    this.needsTimestamps = needsTimestamps;
  }

  /** @return whether the request needs a hostname parameter */
  public boolean needsHost() {
    return needsHost;
  }

  /** @param needsHost whether the request needs a hostname parameter */
  public void setNeedsHost(boolean needsHost) {
    this.needsHost = needsHost;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java new file mode 100644 index 0000000..0590c73 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/jmetertest/jmetertest/JmeterTestPlanTask.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.ambari.metrics.core.loadsimulator.jmetertest.jmetertest;

import org.apache.commons.io.IOUtils;
import org.apache.jmeter.control.LoopController;
import org.apache.jmeter.engine.StandardJMeterEngine;
import org.apache.jmeter.protocol.http.sampler.HTTPSampler;
import org.apache.jmeter.protocol.http.util.HTTPConstants;
import org.apache.jmeter.reporters.ResultCollector;
import org.apache.jmeter.reporters.Summariser;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jmeter.testelement.TestPlan;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.ThreadGroup;
import org.apache.jmeter.timers.ConstantTimer;
import org.apache.jmeter.util.JMeterUtils;
import org.apache.jorphan.collections.HashTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

/**
 * Runnable that, on every execution, stops the JMeter test it started
 * previously (if any), picks one app at random and starts a new JMeter test
 * plan issuing that app's GET metrics requests against AMS.
 *
 * <p>NOTE(review): the engine is held in a static field shared by all task
 * instances, while each instance has its own test plan tree; nothing in this
 * class synchronises access to it - confirm this is acceptable when several
 * instances are scheduled on a multi-threaded executor.
 */
public class JmeterTestPlanTask implements Runnable {

  private static StandardJMeterEngine jmeterEngine = null;
  private final static Logger LOG = LoggerFactory.getLogger(JmeterTestPlanTask.class);
  private final List<AppGetMetric> appGetMetrics;
  private final Properties amsJmeterProperties;
  private final HashTree amsTestPlanTree;
  private final TestPlan amsTestPlan;
  private static final String JMETER_HOME = "loadsimulator";
  private static final String JMETER_PROPERTIES_FILE = JMETER_HOME + "/jmeter.properties";
  private static final String SAVESERVICE_PROPERTIES_FILE = JMETER_HOME + "/saveservice.properties";

  /** Applications whose GET metrics requests can be simulated, with the id used in requests and property names. */
  public enum ClientApp {
    HOST("HOST"),
    NAMENODE("NAMENODE"),
    HBASE("HBASE"),
    NIMBUS("NIMBUS"),
    KAFKA_BROKER("KAFKA_BROKER"),
    FLUME_HANDLER("FLUME_HANDLER"),
    AMS_HBASE("AMS-HBASE"),
    NODEMANAGER("NODEMANAGER"),
    RESOURCEMANAGER("RESOURCEMANAGER"),
    DATANODE("DATANODE");

    private final String id;

    private ClientApp(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }
  }

  /**
   * @param appGetMetrics       per-app GET payloads to choose from on each run
   * @param amsJmeterProperties simulator configuration (AMS endpoint, hosts, loop counts)
   */
  public JmeterTestPlanTask(List<AppGetMetric> appGetMetrics, Properties amsJmeterProperties) {
    this.appGetMetrics = appGetMetrics;
    this.amsJmeterProperties = amsJmeterProperties;
    amsTestPlanTree = new HashTree();
    amsTestPlan = new TestPlan("AMS JMeter Load Test plan");
    System.out.println("Starting AMS Jmeter load testing");
  }

  /**
   * Stops the previously started test, then builds and runs a new test plan
   * for a randomly chosen app. Failures while building or starting the plan
   * are logged and the partially built plan is torn down.
   */
  @Override
  public void run() {
    stopPreviousTest();

    //Start the new test plan for the new app.
    try {
      //Initialize Jmeter essentials
      jmeterEngine = new StandardJMeterEngine();
      JMeterContextService.getContext().setEngine(jmeterEngine);

      installJmeterProperties();

      String tmpDir = System.getProperty("java.io.tmpdir");
      copySaveServiceProperties(tmpDir);

      //Initialize Test plan
      amsTestPlan.setProperty(TestElement.TEST_CLASS, TestPlan.class.getName());
      amsTestPlanTree.add("AMS Test plan", amsTestPlan);

      //Choose a random APP to perform the GET metrics requests for.
      int currentAppIndex = new Random().nextInt(appGetMetrics.size());

      //Create ThreadGroup for the App
      createThreadGroupHashTree(currentAppIndex, amsJmeterProperties, amsTestPlanTree, amsTestPlan);

      //Summariser output to report test progress on stdout.
      Summariser summariser = null;
      String summariserName = JMeterUtils.getPropDefault("summariser.name", "summary");
      if (summariserName.length() > 0) {
        summariser = new Summariser(summariserName);
      }

      //Store execution results into a .jtl file
      String jmeterLogFile = tmpDir + "/amsJmeterTestResults.jtl";
      ResultCollector resultCollector = new ResultCollector(summariser);
      resultCollector.setFilename(jmeterLogFile);
      amsTestPlanTree.add(amsTestPlanTree.getArray()[0], resultCollector);
      jmeterEngine.configure(amsTestPlanTree);
      jmeterEngine.run();

      LOG.info("AMS Jmeter Test started up successfully");

    } catch (Exception ex) {
      // Log first so the cause is not lost if the teardown itself fails.
      LOG.error("Error occurred while running AMS load test : " + ex.getMessage(), ex);
      shutdownEngine();
    }
  }

  /** Stops the thread groups of the current plan and shuts the engine down; no-op before the first run. */
  private void stopPreviousTest() {
    if (jmeterEngine == null) {
      return;
    }
    // After a failed run the tree has been cleared, so the plan may no longer
    // be a key in it; guard against a null array so that this method cannot
    // throw outside run()'s try block.
    Object[] threadGroups = amsTestPlanTree.getArray(amsTestPlan);
    if (threadGroups != null) {
      for (Object threadGroupObj : threadGroups) {
        if (threadGroupObj instanceof ThreadGroup) {
          ((ThreadGroup) threadGroupObj).stop();
        }
      }
    }
    shutdownEngine();
  }

  /** Clears the plan tree, stops the engine if one exists and ends the JMeter test context. */
  private void shutdownEngine() {
    amsTestPlanTree.clear();
    if (jmeterEngine != null) {
      jmeterEngine.askThreadsToStop();
      jmeterEngine.stopTest();
    }
    JMeterContextService.endTest();
  }

  /**
   * Workaround to supply JMeterUtils with jmeter.properties from the JAR:
   * the properties are loaded by {@link AMSJMeterLoadTest#readProperties}
   * and written into JMeterUtils' private static {@code appProperties} field.
   */
  private void installJmeterProperties() throws NoSuchFieldException, IllegalAccessException {
    JMeterUtils.setJMeterHome("");
    Field f = JMeterUtils.class.getDeclaredField("appProperties");
    f.setAccessible(true);
    f.set(null, AMSJMeterLoadTest.readProperties(JMETER_PROPERTIES_FILE));
  }

  /** Copies saveservice.properties (classpath first, then file system) into {@code tmpDir} for JMeter to consume. */
  private void copySaveServiceProperties(String tmpDir) throws java.io.IOException {
    String target = tmpDir + "/saveservice.properties";
    InputStream resource = ClassLoader.getSystemResourceAsStream(SAVESERVICE_PROPERTIES_FILE);
    try (InputStream inputStream = (resource != null) ? resource : new FileInputStream(SAVESERVICE_PROPERTIES_FILE);
         OutputStream outputStream = new FileOutputStream(target)) {
      IOUtils.copy(inputStream, outputStream);
    }
    JMeterUtils.setProperty("saveservice_properties", target);
  }

  /** Creates a constant timer with the given delay. */
  private ConstantTimer createConstantTimer(int delay) {
    ConstantTimer timer = new ConstantTimer();
    timer.setDelay(String.valueOf(delay));
    return timer;
  }

  /**
   * Builds the query parameters for one request: {@code appId},
   * {@code metricNames} and, when the request asks for them, a one hour
   * {@code startTime}/{@code endTime} window ending now and a {@code hostname}.
   */
  private Map<String, String> getAppSpecificParameters(String app, GetMetricRequestInfo request, Properties amsJmeterProperties) {

    Map<String, String> parametersMap = new HashMap<String, String>();
    String hostPrefix = amsJmeterProperties.getProperty("host-prefix");
    String hostSuffix = amsJmeterProperties.getProperty("host-suffix");
    int minHostIndex = Integer.parseInt(amsJmeterProperties.getProperty("min-host-index"));
    int numHosts = Integer.parseInt(amsJmeterProperties.getProperty("num-hosts"));

    parametersMap.put("appId", app);

    if (request.needsTimestamps()) {
      long currentTime = System.currentTimeMillis();
      long oneHourBack = currentTime - 3600 * 1000;
      parametersMap.put("startTime", String.valueOf(oneHourBack));
      parametersMap.put("endTime", String.valueOf(currentTime));
    }

    if (request.needsHost()) {
      if (ClientApp.AMS_HBASE.getId().equals(app)) {
        parametersMap.put("hostname", amsJmeterProperties.getProperty("ams-host"));
      } else if (ClientApp.HOST.getId().equals(app) || ClientApp.NODEMANAGER.getId().equals(app)) {
        // These apps pick a random host from the simulated host range.
        int randomHost = minHostIndex + new Random().nextInt(numHosts);
        parametersMap.put("hostname", hostPrefix + randomHost + hostSuffix);
      } else {
        parametersMap.put("hostname", hostPrefix + amsJmeterProperties.getProperty(app + "-host") + hostSuffix);
      }
    }
    parametersMap.put("metricNames", request.getMetricStringPayload());
    return parametersMap;
  }

  /**
   * Adds one single-threaded thread group with a GET sampler to the plan for
   * every request of the app at {@code appIndex}. A constant timer with the
   * app's interval is added when more than one loop is configured.
   *
   * @throws IllegalArgumentException if {@code ams-host-port} is not of the form {@code host:port}
   */
  private void createThreadGroupHashTree(int appIndex, Properties amsJmeterProperties, HashTree amsTestPlanTree, TestPlan amsTestPlan) {

    AppGetMetric appGetMetric = appGetMetrics.get(appIndex);
    String app = appGetMetric.getApp();
    int interval = appGetMetric.getInterval();

    //Read and validate AMS information.
    String amsHostPortValue = amsJmeterProperties.getProperty("ams-host-port");
    String[] amsHostPort = amsHostPortValue.split(":");
    if (amsHostPort.length < 2) {
      throw new IllegalArgumentException("ams-host-port must be of the form host:port but was : " + amsHostPortValue);
    }
    String amsHost = amsHostPort[0];
    String amsPath = amsJmeterProperties.getProperty("ams-path");
    int amsPort = Integer.parseInt(amsHostPort[1]);
    int numLoops = Integer.parseInt(amsJmeterProperties.getProperty("num-get-calls-per-app"));

    LoopController loopController = createLoopController(app + " GET loop controller", numLoops, false);
    for (GetMetricRequestInfo request : appGetMetric.getMetricRequests()) {

      ThreadGroup threadGroup = createThreadGroup(app + " GET threadGroup", 1, 0, loopController);

      HashTree threadGroupHashTree = amsTestPlanTree.add(amsTestPlan, threadGroup);
      Map<String, String> parametersMap = getAppSpecificParameters(app, request, amsJmeterProperties);

      HTTPSampler sampler = createGetSampler("GET " + app + " metrics", amsHost, amsPort, amsPath, null, parametersMap);

      if (numLoops > 1) {
        threadGroupHashTree.add(createConstantTimer(interval));
      }

      threadGroupHashTree.add(sampler);
    }
  }

  /** Creates an HTTP GET sampler for the given endpoint with one argument per entry of {@code parameters}. */
  private HTTPSampler createGetSampler(String name, String domain, int port, String path, String encoding, Map<String, String> parameters) {

    HTTPSampler sampler = new HTTPSampler();
    sampler.setDomain(domain);
    sampler.setPort(port);
    sampler.setPath(path);
    sampler.setMethod(HTTPConstants.GET);

    if (encoding != null) {
      sampler.setContentEncoding(encoding);
    }

    for (Map.Entry<String, String> entry : parameters.entrySet()) {
      sampler.addArgument(entry.getKey(), entry.getValue());
    }
    sampler.setName(name);
    return sampler;
  }

  /** Creates an initialised loop controller running {@code numLoops} iterations. */
  private LoopController createLoopController(String name, int numLoops, boolean continueForever) {
    LoopController loopController = new LoopController();
    loopController.setLoops(numLoops);
    loopController.setProperty(TestElement.TEST_CLASS, LoopController.class.getName());
    loopController.initialize();
    loopController.setContinueForever(continueForever);
    loopController.setName(name);
    return loopController;
  }

  /** Creates a thread group driven by {@code loopController}. */
  private ThreadGroup createThreadGroup(String name, int numThreads, int rampUp, LoopController loopController) {
    ThreadGroup threadGroup = new ThreadGroup();
    threadGroup.setName(name);
    threadGroup.setNumThreads(numThreads);
    threadGroup.setRampUp(rampUp);
    threadGroup.setSamplerController(loopController);
    threadGroup.setProperty(TestElement.TEST_CLASS, ThreadGroup.class.getName());
    return threadGroup;
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java new file mode 100644 index 0000000..9c8e641 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestRestMetricsSender.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.ambari.metrics.core.loadsimulator.net;

import org.junit.Test;

import java.io.IOException;

import static org.easymock.EasyMock.*;
import static org.junit.Assert.assertEquals;

/**
 * Verifies how {@link RestMetricsSender} drives its {@link UrlService}:
 * the payload is sent and the service is disconnected afterwards, both when
 * sending succeeds and when it throws.
 */
public class TestRestMetricsSender {

  /** A successful send returns the service response and disconnects. */
  @Test
  public void testPushMetrics() throws Exception {
    final UrlService svcMock = createStrictMock(UrlService.class);
    final String payload = "test";
    final String expectedResponse = "mockResponse";

    expect(svcMock.send(anyString())).andReturn(expectedResponse);
    svcMock.disconnect();
    expectLastCall();

    replay(svcMock);

    RestMetricsSender sender = senderBackedBy(svcMock);
    String response = sender.pushMetrics(payload);

    verify(svcMock);
    assertEquals("pushMetrics should return the service response", expectedResponse, response);
  }

  /** A failing send must not escape pushMetrics and must still disconnect. */
  @Test
  public void testPushMetricsFailed() throws Exception {
    final UrlService svcMock = createStrictMock(UrlService.class);
    final String payload = "test";
    RestMetricsSender sender = senderBackedBy(svcMock);

    expect(svcMock.send(anyString())).andThrow(new IOException());
    svcMock.disconnect();
    expectLastCall();

    replay(svcMock);

    sender.pushMetrics(payload);

    verify(svcMock);
  }

  /** Creates a sender whose connection factory hands back the given mock. */
  private static RestMetricsSender senderBackedBy(final UrlService service) {
    return new RestMetricsSender("expectedHostName") {
      @Override
      protected UrlService getConnectedUrlService() throws IOException {
        return service;
      }
    };
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java new file mode 100644 index 0000000..29ebda4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/net/TestStdOutMetricsSender.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.loadsimulator.net; + + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +public class TestStdOutMetricsSender { + + @Test + public void testPushMetrics() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(baos); + StdOutMetricsSender sender = new StdOutMetricsSender("expectedHostName", out); + sender.pushMetrics("test"); + + System.out.println(baos.toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java new file mode 100644 index 0000000..a1801b0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestRandomMetricsProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.loadsimulator.util; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class TestRandomMetricsProvider { + + @Test + public void testReturnSingle() { + double from = 5.25; + double to = 5.40; + RandomMetricsProvider provider = new RandomMetricsProvider(from, to); + double metric = provider.next(); + + assertTrue("Generated metric should be in range", from < metric && metric < to); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java new file mode 100644 index 0000000..9011e75 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/loadsimulator/util/TestTimeStampProvider.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.loadsimulator.util; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; + +public class TestTimeStampProvider { + + @Test + public void testReturnSingle() { + long startTime = 1411663170112L; + int timeStep = 5000; + TimeStampProvider tm = new TimeStampProvider(startTime, timeStep, 0); + + long tStamp = tm.next(); + + assertEquals("First generated timestamp should match starttime", startTime, tStamp); + } + + @Test + public void testReturnTstampsForSendInterval() throws Exception { + long startTime = 0; + int collectInterval = 5; + int sendInterval = 30; + TimeStampProvider tsp = new TimeStampProvider(startTime, collectInterval, sendInterval); + + long[] timestamps = tsp.timestampsForNextInterval(); + + assertThat(timestamps) + .hasSize(6) + .containsOnly(0, 5, 10, 15, 20, 25); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java new file mode 100644 index 0000000..0553d4c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractMiniHBaseClusterTest.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline; + +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; +import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import 
org.junit.BeforeClass; +import org.junit.Test; + +public abstract class AbstractMiniHBaseClusterTest extends BaseTest { + + protected static final long BATCH_SIZE = 3; + protected Connection conn; + protected PhoenixHBaseAccessor hdb; + protected TimelineMetricMetadataManager metadataManager; + + public final Log LOG; + + public AbstractMiniHBaseClusterTest() { + LOG = LogFactory.getLog(this.getClass()); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = getDefaultProps(); + props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, "false"); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); + props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); + // Make a small batch size to test multiple calls to reserve sequences + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @AfterClass + public static void doTeardown() throws Exception { + dropNonSystemTables(); + tearDownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + Logger.getLogger("org.apache.ambari.metrics.core.timeline").setLevel(Level.DEBUG); + hdb = createTestableHBaseAccessor(); + // inits connection, starts mini cluster + conn = getConnection(getUrl()); + + hdb.initMetricSchema(); + metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb); + hdb.setMetadataInstance(metadataManager); + } + + private void deleteTableIgnoringExceptions(Statement stmt, String tableName) { + try { + stmt.execute("delete from " + tableName); + } catch (Exception e) { + LOG.warn("Exception on delete table " + tableName, e); + } + } + + @After + public void tearDown() { + Connection conn = null; + Statement stmt = null; + try { + conn = getConnection(getUrl()); + stmt = conn.createStatement(); + + deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE"); 
+ deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_MINUTE"); + deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_HOURLY"); + deleteTableIgnoringExceptions(stmt, "METRIC_AGGREGATE_DAILY"); + deleteTableIgnoringExceptions(stmt, "METRIC_RECORD"); + deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_MINUTE"); + deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_HOURLY"); + deleteTableIgnoringExceptions(stmt, "METRIC_RECORD_DAILY"); + deleteTableIgnoringExceptions(stmt, "METRICS_METADATA"); + deleteTableIgnoringExceptions(stmt, "HOSTED_APPS_METADATA"); + + conn.commit(); + } catch (Exception e) { + LOG.warn("Error on deleting HBase schema.", e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // Ignore + } + } + } + } + + public static Map<String, String> getDefaultProps() { + Map<String, String> props = new HashMap<String, String>(); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + Boolean.FALSE.toString()); + props.put("java.security.krb5.realm", ""); + props.put("java.security.krb5.kdc", ""); + return props; + } + + protected Connection getConnection(String url) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + return conn; + } + + /** + * A canary test. Will show if the infrastructure is set-up correctly. 
+ */ + @Test + public void testClusterOK() throws Exception { + Connection conn = getConnection(getUrl()); + conn.setAutoCommit(true); + + String sampleDDL = "CREATE TABLE TEST_METRICS " + + "(TEST_COLUMN VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " + + "TTL=86400, COMPRESSION='NONE' "; + + Statement stmt = conn.createStatement(); + stmt.executeUpdate(sampleDDL); + conn.commit(); + + ResultSet rs = stmt.executeQuery( + "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS"); + + rs.next(); + long l = rs.getLong(1); + assertThat(l).isGreaterThanOrEqualTo(0); + + stmt.execute("DROP TABLE TEST_METRICS"); + conn.close(); + } + + protected PhoenixHBaseAccessor createTestableHBaseAccessor() { + Configuration metricsConf = new Configuration(); + metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE"); + // Unit tests insert values into the future + metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000); + + return + new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf), + new PhoenixConnectionProvider() { + @Override + public Admin getHBaseAdmin() throws IOException { + try { + return driver.getConnectionQueryServices(null, null).getAdmin(); + } catch (SQLException e) { + LOG.error(e); + } + return null; + } + + @Override + public Connection getConnection() { + Connection connection = null; + try { + connection = DriverManager.getConnection(getUrl()); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + } + return connection; + } + + }); + } + + protected void insertMetricRecords(Connection conn, TimelineMetrics metrics) + throws SQLException, IOException { + + List<TimelineMetric> timelineMetrics = metrics.getMetrics(); + if (timelineMetrics == null || timelineMetrics.isEmpty()) { + LOG.debug("Empty metrics insert request."); + return; + } + + PreparedStatement metricRecordStmt = null; + + try { + 
metricRecordStmt = conn.prepareStatement(String.format( + UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME)); + + for (TimelineMetric metric : timelineMetrics) { + metricRecordStmt.clearParameters(); + + if (LOG.isTraceEnabled()) { + LOG.trace("host: " + metric.getHostName() + ", " + + "metricName = " + metric.getMetricName() + ", " + + "values: " + metric.getMetricValues()); + } + double[] aggregates = AggregatorUtils.calculateAggregates( + metric.getMetricValues()); + + byte[] uuid = metadataManager.getUuid(metric); + if (uuid == null) { + LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString()); + continue; + } + metricRecordStmt.setBytes(1, uuid); + metricRecordStmt.setLong(2, metric.getStartTime()); + metricRecordStmt.setDouble(3, aggregates[0]); + metricRecordStmt.setDouble(4, aggregates[1]); + metricRecordStmt.setDouble(5, aggregates[2]); + metricRecordStmt.setInt(6, (int) aggregates[3]); + String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); + metricRecordStmt.setString(7, json); + + try { + int row = metricRecordStmt.executeUpdate(); + LOG.info("Inserted " + row + " rows."); + } catch (SQLException sql) { + LOG.error(sql); + } + } + + conn.commit(); + + } finally { + if (metricRecordStmt != null) { + try { + metricRecordStmt.close(); + } catch (SQLException e) { + // Ignore + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java new file mode 100644 index 
0000000..d3fc50f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/AbstractPhoenixConnectionlessTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; +import org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +
/**
 * Base class for tests that run against Phoenix's connectionless JDBC url.
 *
 * <p>A {@link PhoenixTestDriver} is registered with {@link DriverManager}
 * before every test and closed and deregistered again afterwards.
 */
public abstract class AbstractPhoenixConnectionlessTest extends BaseTest {

  /** Driver registered for the current test; {@code null} between tests. */
  protected static PhoenixTestDriver driver;

  /** Url the current test connects to; assigned in {@link #setup()}. */
  private String connUrl;

  /** @return the connectionless Phoenix JDBC url */
  protected static String getUrl() {
    return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
  }

  /**
   * @param tenantId tenant to connect as
   * @return {@link #getUrl()} with the tenant id appended as a url property
   */
  protected static String getUrl(String tenantId) {
    return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
  }

  /**
   * Registers the test driver for {@code url} and opens one connection
   * through it. Expects that no driver is currently registered.
   */
  private static void startServer(String url) throws Exception {
    assertNull(driver);
    // only load the test driver if we are testing locally - for integration tests, we want to
    // test on a wider scale
    if (PhoenixEmbeddedDriver.isTestUrl(url)) {
      driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
      assertTrue(DriverManager.getDriver(url) == driver);
      try (Connection bootstrap = driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
        // The connection is not used afterwards; close it here instead of
        // discarding the reference and leaking it.
      }
    }
  }

  /**
   * Creates and registers the shared test driver on first use.
   *
   * @param props properties handed to the new driver; ignored when a driver
   *              already exists
   * @return the shared driver instance
   */
  protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
    if (driver == null) {
      driver = new PhoenixTestDriver(props);
      DriverManager.registerDriver(driver);
    }
    return driver;
  }

  @Before
  public void setup() throws Exception {
    connUrl = getUrl();
    startServer(connUrl);
  }

  /** Checks that a simple {@code CREATE TABLE} can be executed and committed. */
  @Test
  public void testStorageSystemInitialized() throws Exception {
    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";

    // try-with-resources closes the statement first and the connection second,
    // and still closes the connection when closing the statement throws (the
    // previous hand-written finally block skipped it in that case).
    try (Connection conn = DriverManager.getConnection(connUrl);
         PreparedStatement stmt = conn.prepareStatement(sampleDDL)) {
      stmt.execute();
      conn.commit();
    }
  }

  /**
   * Closes the driver and always clears and deregisters it, even when closing
   * throws, so that the next test starts without a registered driver.
   */
  @After
  public void tearDown() throws Exception {
    if (driver != null) {
      try {
        driver.close();
      } finally {
        PhoenixTestDriver phoenixTestDriver = driver;
        driver = null;
        DriverManager.deregisterDriver(phoenixTestDriver);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java new file mode 100644 index 0000000..03e39f7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/FunctionTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.ambari.metrics.core.timeline.aggregators.Function; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.fromMetricName; +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.AVG; +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.RATE; +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.DIFF; +import static org.assertj.core.api.Assertions.assertThat; + +public class FunctionTest { + + @Test + public void testCreation() throws Exception { + Function f = fromMetricName("Metric._avg"); + assertThat(f).isEqualTo(new Function(AVG, null)); + + f = fromMetricName("Metric._rate._avg"); + assertThat(f).isEqualTo(new Function(AVG, RATE)); + + f = fromMetricName("bytes_in"); + assertThat(f).isEqualTo(Function.DEFAULT_VALUE_FUNCTION); + + // Rate support without aggregates + f = fromMetricName("Metric._rate"); + assertThat(f).isEqualTo(new Function(null, RATE)); + + // Diff support + f = fromMetricName("Metric._diff._avg"); + assertThat(f).isEqualTo(new Function(AVG, DIFF)); + + // Diff support without aggregates + f = fromMetricName("Metric._diff"); + assertThat(f).isEqualTo(new Function(null, DIFF)); + + } + + @Ignore // If unknown function: behavior is best effort query without function + @Test(expected = Function.FunctionFormatException.class) + public void testNotAFunction() throws 
Exception { + fromMetricName("bytes._not._afunction"); + } +}
