http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java new file mode 100644 index 0000000..57a6f34 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype; + +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface; +import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.List; +import java.util.TreeMap; +/** Smoke test that pushes a synthetic metric through EmaTechnique and a MetricsCollectorInterface. NOTE(review): despite the class name, no Tukeys-specific code is exercised in the visible body. */ +public class TestTukeys { + /** Gate for the whole class: the assumption fails, and the tests are not run, when the R_HOME environment variable is unset. */ + @BeforeClass + public static void init() throws URISyntaxException { + Assume.assumeTrue(System.getenv("R_HOME") != null); + } + /** Emits 120 synthetic points, runs ema.test on them and publishes the result. NOTE(review): nothing is asserted; the only assertion is in the commented-out tail. */ + @Test + public void testPointInTimeDetectionSystem() throws UnknownHostException, URISyntaxException { + /* Point RFunctionInvoker at the "R-scripts" directory resolved from the system class loader. */ + URL url = ClassLoader.getSystemResource("R-scripts"); + String fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + /* NOTE(review): host and port are hard-coded; presumably this collector must be reachable for emitMetrics/publish below to succeed - confirm. */ + MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188"); + + EmaTechnique ema = new EmaTechnique(0.5, 3); + long now = System.currentTimeMillis(); + + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("mm9"); + metric1.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); + metric1.setStartTime(now); + metric1.setAppId("aa9"); + metric1.setInstanceId(null); + metric1.setType("Integer"); + + // Train: timestamp (ms) -> value map that becomes the metric's data. + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + + // 2hr data: 120 points, one per minute going back from now, each in [20000, 20001). 
+ for (int i = 0; i < 120; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 60 * 1000, metric); + } + metric1.setMetricValues(metricValues); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.addOrMergeTimelineMetric(metric1); + + metricsCollectorInterface.emitMetrics(timelineMetrics); + + List<MetricAnomaly> anomalyList = ema.test(metric1); + metricsCollectorInterface.publish(anomalyList); +// +// PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(ema, metricsCollectorInterface, 3, 5*60*1000, 15*60*1000); +// pointInTimeADSystem.runOnce(); +// +// List<MetricAnomaly> anomalyList2 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList3 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList4 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList5 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList6 = ema.test(metric1); +// +// Assert.assertTrue(anomalyList6.size() < anomalyList.size()); + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/AbstractMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/AbstractMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/AbstractMetricSeries.java new file mode 100644 index 0000000..635a929 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/AbstractMetricSeries.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +public interface AbstractMetricSeries { + + public double nextValue(); + public double[] getSeries(int n); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/DualBandMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/DualBandMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/DualBandMetricSeries.java new file mode 100644 index 0000000..a9e3f30 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/DualBandMetricSeries.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Random; + +public class DualBandMetricSeries implements AbstractMetricSeries { + + double lowBandValue = 0.0; + double lowBandDeviationPercentage = 0.0; + int lowBandPeriodSize = 10; + double highBandValue = 1.0; + double highBandDeviationPercentage = 0.0; + int highBandPeriodSize = 10; + + Random random = new Random(); + double lowBandValueLowerLimit, lowBandValueHigherLimit; + double highBandLowerLimit, highBandUpperLimit; + int l = 0, h = 0; + + public DualBandMetricSeries(double lowBandValue, + double lowBandDeviationPercentage, + int lowBandPeriodSize, + double highBandValue, + double highBandDeviationPercentage, + int highBandPeriodSize) { + this.lowBandValue = lowBandValue; + this.lowBandDeviationPercentage = lowBandDeviationPercentage; + this.lowBandPeriodSize = lowBandPeriodSize; + this.highBandValue = highBandValue; + this.highBandDeviationPercentage = highBandDeviationPercentage; + this.highBandPeriodSize = highBandPeriodSize; + init(); + } + + private void init() { + lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue; + lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue; + highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue; + highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue; + } + + @Override + public double nextValue() { + + double value = 0.0; + + if (l < lowBandPeriodSize) { + value = lowBandValueLowerLimit + (lowBandValueHigherLimit - lowBandValueLowerLimit) * random.nextDouble(); + l++; + } else if (h < highBandPeriodSize) { + value = highBandLowerLimit + (highBandUpperLimit - highBandLowerLimit) * random.nextDouble(); + h++; + } + + if (l == lowBandPeriodSize && h == highBandPeriodSize) { + l = 0; + h = 0; + } + + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < 
+ n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorFactory.java new file mode 100644 index 0000000..a50b433 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorFactory.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Map; + +public class MetricSeriesGeneratorFactory { + + /** + * Returns a uniformly distributed data series with some deviation % and outliers. 
+ * + * @param n size of the data series + * @param value The value around which the uniform data series is centered. + * @param deviationPercentage The allowed deviation on either side of the uniform value, as a fraction of it. For example, if value = 10, and deviation % is 0.1, the series values lie between 9 and 11. + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationLowerPercentage min distance of an outlier from the uniform value, as a fraction of it. If value = 10 and outlierDeviationLowerPercentage = 0.3 (30%), the closest outlier is 7 or 13. + * @param outlierDeviationHigherPercentage max distance of an outlier from the uniform value, as a fraction of it. If value = 10 and outlierDeviationHigherPercentage = 0.6 (60%), the farthest outlier is 4 or 16. + * @param outliersAboveValue Whether outliers should be greater (true) or smaller (false) than the value. + * @return uniform series + */ + public static double[] createUniformSeries(int n, + double value, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + + UniformMetricSeries metricSeries = new UniformMetricSeries(value, + deviationPercentage, + outlierProbability, + outlierDeviationLowerPercentage, + outlierDeviationHigherPercentage, + outliersAboveValue); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a normally distributed series, optionally with outliers. + * Outlier distance from the mean is given in multiples of the standard deviation. + * + * @param n size of the data series + * @param mean mean of the distribution + * @param sd standard deviation of the distribution + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationSDTimesLower Lower limit of the outlier distance from the mean, in multiples of sd. + * @param outlierDeviationSDTimesHigher Higher limit of the outlier distance from the mean, in multiples of sd. + * @param outlierOnRightEnd Outlier should be on the right end or the left end. 
+ * @return normal series + */ + public static double[] createNormalSeries(int n, + double mean, + double sd, + double outlierProbability, + double outlierDeviationSDTimesLower, + double outlierDeviationSDTimesHigher, + boolean outlierOnRightEnd) { + + + NormalMetricSeries metricSeries = new NormalMetricSeries(mean, + sd, + outlierProbability, + outlierDeviationSDTimesLower, + outlierDeviationSDTimesHigher, + outlierOnRightEnd); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a monotonically increasing / decreasing series + * + * @param n size of the data series + * @param startValue Start value of the monotonic sequence + * @param slope change per point: {@code m > 0} for increasing and {@code m < 0} for decreasing. + * @param deviationPercentage The allowed deviation on either side of the current 'y' value, as a fraction of it. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 9 and 11. + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationLowerPercentage min distance of an outlier from the current 'y' value, as a fraction of it. If value = 10 and outlierDeviationLowerPercentage = 0.3 (30%), the closest outlier is 7 or 13. + * @param outlierDeviationHigherPercentage max distance of an outlier from the current 'y' value, as a fraction of it. If value = 10 and outlierDeviationHigherPercentage = 0.6 (60%), the farthest outlier is 4 or 16. + * @param outliersAboveValue Whether outliers should be greater (true) or smaller (false) than the 'y' value. 
+ * @return monotonic series + */ + public static double[] createMonotonicSeries(int n, + double startValue, + double slope, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + + MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue, + slope, + deviationPercentage, + outlierProbability, + outlierDeviationLowerPercentage, + outlierDeviationHigherPercentage, + outliersAboveValue); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a dual band series that alternates between a low band and a high band. + * + * @param n size of the data series + * @param lowBandValue low band centre value + * @param lowBandDeviationPercentage low band deviation, as a fraction of the low band value + * @param lowBandPeriodSize number of consecutive points emitted in the low band + * @param highBandValue high band centre value + * @param highBandDeviationPercentage high band deviation, as a fraction of the high band value + * @param highBandPeriodSize number of consecutive points emitted in the high band + * @return dual band series + */ + public static double[] getDualBandSeries(int n, + double lowBandValue, + double lowBandDeviationPercentage, + int lowBandPeriodSize, + double highBandValue, + double highBandDeviationPercentage, + int highBandPeriodSize) { + + DualBandMetricSeries metricSeries = new DualBandMetricSeries(lowBandValue, + lowBandDeviationPercentage, + lowBandPeriodSize, + highBandValue, + highBandDeviationPercentage, + highBandPeriodSize); + + return metricSeries.getSeries(n); + } + + /** + * Returns a step function series. + * + * @param n size of the data series + * @param startValue start steady value + * @param steadyValueDeviationPercentage required deviation in the steady state value + * @param steadyPeriodSlope direction of monotonicity: {@code m > 0} for increasing, {@code m < 0} for decreasing, {@code m = 0} for no increase or decrease. + * @param steadyPeriodMinSize min size for step period + * @param steadyPeriodMaxSize max size for step period. 
+ * @param stepChangePercentage Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value. + * @param upwardStep true for an upward step, false for a downward step. + * @return step function series + */ + public static double[] getStepFunctionSeries(int n, + double startValue, + double steadyValueDeviationPercentage, + double steadyPeriodSlope, + int steadyPeriodMinSize, + int steadyPeriodMaxSize, + double stepChangePercentage, + boolean upwardStep) { + + StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue, + steadyValueDeviationPercentage, + steadyPeriodSlope, + steadyPeriodMinSize, + steadyPeriodMaxSize, + stepChangePercentage, + upwardStep); + + return metricSeries.getSeries(n); + } + + /** + * Returns a steady series that contains a short period of turbulence and then goes back to steady. + * + * @param n size of the data series + * @param steadyStateValue steady state centre value + * @param steadyStateDeviationPercentage steady state deviation in percentage + * @param turbulentPeriodDeviationLowerPercentage turbulent state lower limit in terms of percentage from centre value. + * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value. + * @param turbulentPeriodLength turbulent period length (number of points) + * @param turbulentStatePosition Where the turbulent state should be: 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series. 
+ * @return steady series with a turbulent period + */ + public static double[] getSteadySeriesWithTurbulentPeriod(int n, + double steadyStateValue, + double steadyStateDeviationPercentage, + double turbulentPeriodDeviationLowerPercentage, + double turbulentPeriodDeviationHigherPercentage, + int turbulentPeriodLength, + int turbulentStatePosition + ) { + + + SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n, + steadyStateValue, + steadyStateDeviationPercentage, + turbulentPeriodDeviationLowerPercentage, + turbulentPeriodDeviationHigherPercentage, + turbulentPeriodLength, + turbulentStatePosition); + + return metricSeries.getSeries(n); + } + + + public static double[] generateSeries(String type, int n, Map<String, String> configs) { + + double[] series; + switch (type) { + + case "normal": + series = createNormalSeries(n, + Double.parseDouble(configs.getOrDefault("mean", "0")), + Double.parseDouble(configs.getOrDefault("sd", "1")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")), + Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true"))); + break; + + case "uniform": + series = createUniformSeries(n, + Double.parseDouble(configs.getOrDefault("value", "10")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "monotonic": + series = createMonotonicSeries(n, + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("slope", "0")), 
Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "dualband": + series = getDualBandSeries(n, + Double.parseDouble(configs.getOrDefault("lowBandValue", "10")), + Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")), + Double.parseDouble(configs.getOrDefault("highBandValue", "10")), + Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0"))); + break; + + case "step": + series = getStepFunctionSeries(n, + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")), + Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true"))); + break; + + case "turbulence": + series = getSteadySeriesWithTurbulentPeriod(n, + Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")), + Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")), + Integer.parseInt(configs.getOrDefault("turbulentStatePosition", 
"0"))); + break; + + default: + series = createNormalSeries(n, + 0, + 1, + 0, + 0, + 0, + true); + } + return series; + } + + public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) { + + AbstractMetricSeries series; + switch (type) { + + case "normal": + series = new NormalMetricSeries(Double.parseDouble(configs.getOrDefault("mean", "0")), + Double.parseDouble(configs.getOrDefault("sd", "1")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")), + Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true"))); + break; + + case "uniform": + series = new UniformMetricSeries( + Double.parseDouble(configs.getOrDefault("value", "10")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "monotonic": + series = new MonotonicMetricSeries( + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("slope", "0")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "dualband": + series = new DualBandMetricSeries( + Double.parseDouble(configs.getOrDefault("lowBandValue", "10")), + 
Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")), + Double.parseDouble(configs.getOrDefault("highBandValue", "10")), + Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0"))); + break; + + case "step": + series = new StepFunctionMetricSeries( + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")), + Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true"))); + break; + + case "turbulence": + series = new SteadyWithTurbulenceMetricSeries( + Integer.parseInt(configs.getOrDefault("approxSeriesLength", "100")), + Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")), + Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")), + Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0"))); + break; + + default: + series = new NormalMetricSeries(0, + 1, + 0, + 0, + 0, + true); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorTest.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorTest.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorTest.java new file mode 100644 index 0000000..03537e4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MetricSeriesGeneratorTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import org.junit.Assert; +import org.junit.Test; + +public class MetricSeriesGeneratorTest { + + @Test + public void testUniformSeries() { + + UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 0, true); + Assert.assertTrue(metricSeries.nextValue() <= 6 && metricSeries.nextValue() >= 4); + + double[] uniformSeries = MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, true); + Assert.assertTrue(uniformSeries.length == 50); + + for (int i = 0; i < uniformSeries.length; i++) { + double value = uniformSeries[i]; + + if (value > 10 * 1.2) { + Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6); + } else { + Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2); + } + } + } + + @Test + public void testNormalSeries() { + NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, true); + Assert.assertTrue(metricSeries.nextValue() <= 3 && metricSeries.nextValue() >= -3); + } + + @Test + public void testMonotonicSeries() { + + MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 0, 0, 0, true); + Assert.assertTrue(metricSeries.nextValue() == 0); + Assert.assertTrue(metricSeries.nextValue() == 0.5); + + double[] incSeries = MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, true); + Assert.assertTrue(incSeries.length == 20); + for (int i = 0; i < incSeries.length; i++) { + Assert.assertTrue(incSeries[i] == i * 0.5); + } + } + + @Test + public void testDualBandSeries() { + double[] dualBandSeries = MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4); + Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6); + Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6); + Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5); + Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5); + 
Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6); + } + + @Test + public void testStepSeries() { + double[] stepSeries = MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, true); + + Assert.assertTrue(stepSeries[0] == 10); + Assert.assertTrue(stepSeries[4] == 10); + + Assert.assertTrue(stepSeries[5] == 10*1.5); + Assert.assertTrue(stepSeries[9] == 10*1.5); + + Assert.assertTrue(stepSeries[10] == 10*1.5*1.5); + Assert.assertTrue(stepSeries[14] == 10*1.5*1.5); + } + + @Test + public void testSteadySeriesWithTurbulence() { + double[] steadySeriesWithTurbulence = MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 5, 1); + + int count = 0; + for (int i = 0; i < steadySeriesWithTurbulence.length; i++) { + if (steadySeriesWithTurbulence[i] == 10) { + count++; + } + } + Assert.assertTrue(count == 5); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MonotonicMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MonotonicMetricSeries.java new file mode 100644 index 0000000..8bd1a9b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/MonotonicMetricSeries.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Random; + +public class MonotonicMetricSeries implements AbstractMetricSeries { + + double startValue = 0.0; + double slope = 0.5; + double deviationPercentage = 0.0; + double outlierProbability = 0.0; + double outlierDeviationLowerPercentage = 0.0; + double outlierDeviationHigherPercentage = 0.0; + boolean outliersAboveValue = true; + + Random random = new Random(); + double nonOutlierProbability; + + // y = mx + c + double y; + double m; + double x; + double c; + + public MonotonicMetricSeries(double startValue, + double slope, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + this.startValue = startValue; + this.slope = slope; + this.deviationPercentage = deviationPercentage; + this.outlierProbability = outlierProbability; + this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage; + this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage; + this.outliersAboveValue = outliersAboveValue; + init(); + } + + private void init() { + y = startValue; + m = slope; + x = 1; + c = y - (m * x); + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + 
+ y = m * x + c; + if (probability <= nonOutlierProbability) { + double valueDeviationLowerLimit = y - deviationPercentage * y; + double valueDeviationHigherLimit = y + deviationPercentage * y; + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + } else { + if (outliersAboveValue) { + double outlierLowerLimit = y + outlierDeviationLowerPercentage * y; + double outlierUpperLimit = y + outlierDeviationHigherPercentage * y; + value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble(); + } else { + double outlierLowerLimit = y - outlierDeviationLowerPercentage * y; + double outlierUpperLimit = y - outlierDeviationHigherPercentage * y; + value = outlierUpperLimit + (outlierLowerLimit - outlierUpperLimit) * random.nextDouble(); + } + } + x++; + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/NormalMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/NormalMetricSeries.java new file mode 100644 index 0000000..fdedb6e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/NormalMetricSeries.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Random; + +public class NormalMetricSeries implements AbstractMetricSeries { + + double mean = 0.0; + double sd = 1.0; + double outlierProbability = 0.0; + double outlierDeviationSDTimesLower = 0.0; + double outlierDeviationSDTimesHigher = 0.0; + boolean outlierOnRightEnd = true; + + Random random = new Random(); + double nonOutlierProbability; + + + public NormalMetricSeries(double mean, + double sd, + double outlierProbability, + double outlierDeviationSDTimesLower, + double outlierDeviationSDTimesHigher, + boolean outlierOnRightEnd) { + this.mean = mean; + this.sd = sd; + this.outlierProbability = outlierProbability; + this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower; + this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher; + this.outlierOnRightEnd = outlierOnRightEnd; + init(); + } + + private void init() { + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + if (probability <= nonOutlierProbability) { + value = random.nextGaussian() * sd + mean; + } else { + if (outlierOnRightEnd) { + value = mean + (outlierDeviationSDTimesLower + 
(outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd; + } else { + value = mean - (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd; + } + } + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java new file mode 100644 index 0000000..403e599 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Random; + +public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries { + + double steadyStateValue = 0.0; + double steadyStateDeviationPercentage = 0.0; + double turbulentPeriodDeviationLowerPercentage = 0.3; + double turbulentPeriodDeviationHigherPercentage = 0.5; + int turbulentPeriodLength = 5; + int turbulentStatePosition = 1; + int approximateSeriesLength = 10; + + Random random = new Random(); + double valueDeviationLowerLimit; + double valueDeviationHigherLimit; + double tPeriodLowerLimit; + double tPeriodUpperLimit; + int tPeriodStartIndex = 0; + int index = 0; + + public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength, + double steadyStateValue, + double steadyStateDeviationPercentage, + double turbulentPeriodDeviationLowerPercentage, + double turbulentPeriodDeviationHigherPercentage, + int turbulentPeriodLength, + int turbulentStatePosition) { + this.approximateSeriesLength = approximateSeriesLength; + this.steadyStateValue = steadyStateValue; + this.steadyStateDeviationPercentage = steadyStateDeviationPercentage; + this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage; + this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage; + this.turbulentPeriodLength = turbulentPeriodLength; + this.turbulentStatePosition = turbulentStatePosition; + init(); + } + + private void init() { + + if (turbulentStatePosition == 1) { + tPeriodStartIndex = (int) (0.25 * 
approximateSeriesLength + (0.25 * approximateSeriesLength * random.nextDouble())); + } else if (turbulentStatePosition == 2) { + tPeriodStartIndex = approximateSeriesLength - turbulentPeriodLength; + } + + valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue; + valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue; + + tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue; + tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue; + } + + @Override + public double nextValue() { + + double value; + + if (index >= tPeriodStartIndex && index <= (tPeriodStartIndex + turbulentPeriodLength)) { + value = tPeriodLowerLimit + (tPeriodUpperLimit - tPeriodLowerLimit) * random.nextDouble(); + } else { + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + } + index++; + return value; + } + + @Override + public double[] getSeries(int n) { + + double[] series = new double[n]; + int turbulentPeriodStartIndex = 0; + + if (turbulentStatePosition == 1) { + turbulentPeriodStartIndex = (int) (0.25 * n + (0.25 * n * random.nextDouble())); + } else if (turbulentStatePosition == 2) { + turbulentPeriodStartIndex = n - turbulentPeriodLength; + } + + double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue; + double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue; + + double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue; + double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue; + + for (int i = 0; i < n; i++) { + if (i >= turbulentPeriodStartIndex && i < (turbulentPeriodStartIndex + turbulentPeriodLength)) { + series[i] = turbulentPeriodLowerLimit + 
(turbulentPeriodUpperLimit - turbulentPeriodLowerLimit) * random.nextDouble(); + } else { + series[i] = valueDevLowerLimit + (valueDevHigherLimit - valueDevLowerLimit) * random.nextDouble(); + } + } + + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/StepFunctionMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/StepFunctionMetricSeries.java new file mode 100644 index 0000000..c91eac9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/StepFunctionMetricSeries.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Random; + +public class StepFunctionMetricSeries implements AbstractMetricSeries { + + double startValue = 0.0; + double steadyValueDeviationPercentage = 0.0; + double steadyPeriodSlope = 0.5; + int steadyPeriodMinSize = 10; + int steadyPeriodMaxSize = 20; + double stepChangePercentage = 0.0; + boolean upwardStep = true; + + Random random = new Random(); + + // y = mx + c + double y; + double m; + double x; + double c; + int currentStepSize; + int currentIndex; + + public StepFunctionMetricSeries(double startValue, + double steadyValueDeviationPercentage, + double steadyPeriodSlope, + int steadyPeriodMinSize, + int steadyPeriodMaxSize, + double stepChangePercentage, + boolean upwardStep) { + this.startValue = startValue; + this.steadyValueDeviationPercentage = steadyValueDeviationPercentage; + this.steadyPeriodSlope = steadyPeriodSlope; + this.steadyPeriodMinSize = steadyPeriodMinSize; + this.steadyPeriodMaxSize = steadyPeriodMaxSize; + this.stepChangePercentage = stepChangePercentage; + this.upwardStep = upwardStep; + init(); + } + + private void init() { + y = startValue; + m = steadyPeriodSlope; + x = 1; + c = y - (m * x); + + currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble()); + currentIndex = 0; + } + + @Override + public double nextValue() { + + double value = 0.0; + + if (currentIndex < currentStepSize) { + y = m * x + c; + double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y; + double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y; + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + x++; + currentIndex++; + } + + if (currentIndex == currentStepSize) { + currentIndex = 0; + currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble()); + if (upwardStep) { + 
y = y + stepChangePercentage * y; + } else { + y = y - stepChangePercentage * y; + } + x = 1; + c = y - (m * x); + } + + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/UniformMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/UniformMetricSeries.java new file mode 100644 index 0000000..6122f82 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/seriesgenerator/UniformMetricSeries.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.seriesgenerator; + +import java.util.Random; + +public class UniformMetricSeries implements AbstractMetricSeries { + + double value = 0.0; + double deviationPercentage = 0.0; + double outlierProbability = 0.0; + double outlierDeviationLowerPercentage = 0.0; + double outlierDeviationHigherPercentage = 0.0; + boolean outliersAboveValue= true; + + Random random = new Random(); + double valueDeviationLowerLimit; + double valueDeviationHigherLimit; + double outlierLeftLowerLimit; + double outlierLeftHigherLimit; + double outlierRightLowerLimit; + double outlierRightUpperLimit; + double nonOutlierProbability; + + + public UniformMetricSeries(double value, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + this.value = value; + this.deviationPercentage = deviationPercentage; + this.outlierProbability = outlierProbability; + this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage; + this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage; + this.outliersAboveValue = outliersAboveValue; + init(); + } + + private void init() { + valueDeviationLowerLimit = value - deviationPercentage * value; + valueDeviationHigherLimit = value + deviationPercentage * value; + + outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value; + outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value; + outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value; + outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value; + + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + if (probability <= nonOutlierProbability) { + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * 
random.nextDouble(); + } else { + if (!outliersAboveValue) { + value = outlierLeftLowerLimit + (outlierLeftHigherLimit - outlierLeftLowerLimit) * random.nextDouble(); + } else { + value = outlierRightLowerLimit + (outlierRightUpperLimit - outlierRightLowerLimit) * random.nextDouble(); + } + } + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml deleted file mode 100644 index e6e12f2..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml +++ /dev/null @@ -1,205 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>ambari-metrics</artifactId> - <groupId>org.apache.ambari</groupId> - <version>2.0.0.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>ambari-metrics-anomaly-detector</artifactId> - <version>2.0.0.0-SNAPSHOT</version> - <properties> - <scala.version>2.10.4</scala.version> - <scala.binary.version>2.11</scala.binary.version> - </properties> - - <repositories> - <repository> - <id>scala-tools.org</id> - <name>Scala-Tools Maven2 Repository</name> - <url>http://scala-tools.org/repo-releases</url> - </repository> - </repositories> - - <pluginRepositories> - <pluginRepository> - <id>scala-tools.org</id> - <name>Scala-Tools Maven2 Repository</name> - <url>http://scala-tools.org/repo-releases</url> - </pluginRepository> - </pluginRepositories> - - <build> - <plugins> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>compile</goal> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <scalaVersion>${scala.version}</scalaVersion> - <args> - <arg>-target:jvm-1.5</arg> - </args> - </configuration> - </plugin> - </plugins> - </build> - <name>Ambari Metrics Anomaly Detector</name> - <packaging>jar</packaging> - - <dependencies> - - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>2.5</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.7.2</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - 
<artifactId>slf4j-log4j12</artifactId> - <version>1.7.2</version> - </dependency> - - <dependency> - <groupId>com.github.lucarosellini.rJava</groupId> - <artifactId>JRI</artifactId> - <version>0.9-7</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.11</artifactId> - <version>2.1.1</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.10.1.0</version> - <exclusions> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - </exclusion> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jmx</artifactId> - </exclusion> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jms</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.1.0</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>connect-json</artifactId> - <version>0.10.1.0</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka_2.10</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-spark</artifactId> - <version>4.10.0-HBase-1.1</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_2.10</artifactId> - <version>1.3.0</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - <version>4.10</version> - 
</dependency> - <dependency> - <groupId>org.apache.ambari</groupId> - <artifactId>ambari-metrics-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>4.2.5</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>2.1.1</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_${scala.binary.version}</artifactId> - <version>2.1.1</version> - <scope>provided</scope> - </dependency> - </dependencies> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java deleted file mode 100644 index eb19857..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/**
 * Plain holder for a named series: a timestamp array and a value array.
 * The arrays passed to the constructor are stored as-is, without copying.
 */
public class DataSeries {

  public String seriesName;
  public double[] ts;
  public double[] values;

  /**
   * @param seriesName name of the series
   * @param ts timestamps of the series
   * @param values values of the series
   */
  public DataSeries(String seriesName, double[] ts, double[] values) {
    this.seriesName = seriesName;
    this.ts = ts;
    this.values = values;
  }

  /** Returns the series name immediately followed by the timestamp and value arrays. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder();
    text.append(seriesName);
    text.append(Arrays.toString(ts));
    text.append(Arrays.toString(values));
    return text.toString();
  }
}
/**
 * Holds a list of equally meaningful result columns (one {@code double[]} per column) and
 * prints them row by row.
 */
public class ResultSet {

  public List<double[]> resultset = new ArrayList<>();

  /**
   * @param resultset result columns; stored as-is, without copying
   */
  public ResultSet(List<double[]> resultset) {
    this.resultset = resultset;
  }

  /**
   * Prints a header line followed by one line per row to standard output. Row {@code i}
   * contains element {@code i} of every column, each followed by a single space.
   *
   * <p>The number of rows is the length of the first column. A column that is {@code null}
   * or too short for a row contributes {@code NaN} for that row instead of failing, and a
   * {@code null} or empty list prints only the header.
   */
  public void print() {
    System.out.println("Result : ");
    if (resultset == null || resultset.isEmpty()) {
      return;
    }

    double[] firstColumn = resultset.get(0);
    int rowCount = (firstColumn == null) ? 0 : firstColumn.length;

    for (int row = 0; row < rowCount; row++) {
      StringBuilder line = new StringBuilder();
      for (double[] column : resultset) {
        // Ragged or missing columns keep their place in the row so the output stays aligned.
        double cell = (column != null && row < column.length) ? column[row] : Double.NaN;
        line.append(cell).append(' ');
      }
      System.out.println(line);
    }
  }
}
/**
 * Basic descriptive statistics over {@code double} arrays. None of the methods modify
 * their input.
 */
public class StatisticUtils {

  /**
   * Returns the arithmetic mean of the values.
   *
   * @param values the sample
   * @return the mean, or {@code NaN} for an empty array
   */
  public static double mean(double[] values) {
    double sum = 0;
    for (double d : values) {
      sum += d;
    }
    return sum / values.length;
  }

  /**
   * Returns the sum of squared deviations from the mean.
   *
   * <p>Despite its name this is NOT divided by the sample size; {@link #sdev} performs the
   * normalization. The behaviour is kept as-is because {@link #sdev} relies on it.
   *
   * @param values the sample
   * @return the un-normalized sum of squared deviations, or {@code NaN} for an empty array
   */
  public static double variance(double[] values) {
    double avg = mean(values);
    double sumOfSquares = 0;
    for (double d : values) {
      double deviation = d - avg;
      sumOfSquares += deviation * deviation;
    }
    return sumOfSquares;
  }

  /**
   * Returns the standard deviation of the values.
   *
   * @param values the sample
   * @param useBesselsCorrection {@code true} to divide by {@code n - 1} (sample standard
   *        deviation), {@code false} to divide by {@code n} (population standard deviation)
   * @return the standard deviation; {@code NaN} when the divisor is zero
   */
  public static double sdev(double[] values, boolean useBesselsCorrection) {
    double sumOfSquares = variance(values);
    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
    return Math.sqrt(sumOfSquares / n);
  }

  /**
   * Returns the median of the values, computed on a sorted copy.
   *
   * @param values the sample
   * @return the middle value for an odd count, the mean of the two middle values for an
   *         even count, or {@code NaN} for an empty array (consistent with {@link #mean})
   */
  public static double median(double[] values) {
    int n = values.length;
    if (n == 0) {
      // Without this guard the even branch would index into an empty array.
      return Double.NaN;
    }

    double[] sorted = Arrays.copyOf(values, n);
    Arrays.sort(sorted);

    if (n % 2 != 0) {
      return sorted[(n - 1) / 2];
    } else {
      return (sorted[(n - 1) / 2] + sorted[n / 2]) / 2;
    }
  }
}
- */ - -package org.apache.ambari.metrics.alertservice.prototype.core; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.Base64; - -public class AmbariServerInterface implements Serializable{ - - private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class); - - private String ambariServerHost; - private String clusterName; - - public AmbariServerInterface(String ambariServerHost, String clusterName) { - this.ambariServerHost = ambariServerHost; - this.clusterName = clusterName; - } - - public int getPointInTimeSensitivity() { - - String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*"); - - URL obj = null; - BufferedReader in = null; - - try { - obj = new URL(url); - HttpURLConnection con = (HttpURLConnection) obj.openConnection(); - con.setRequestMethod("GET"); - - String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8)); - con.setRequestProperty("Authorization", "Basic "+encoded); - - int responseCode = con.getResponseCode(); - LOG.info("Sending 'GET' request to URL : " + url); - LOG.info("Response Code : " + responseCode); - - in = new BufferedReader( - new InputStreamReader(con.getInputStream())); - - StringBuilder responseJsonSb = new StringBuilder(); - String line; - while ((line = in.readLine()) != null) { - responseJsonSb.append(line); - } - - JSONObject jsonObject = new JSONObject(responseJsonSb.toString()); - JSONArray array = jsonObject.getJSONArray("items"); - for(int i = 0 ; i < array.length() ; i++){ - JSONObject alertDefn = 
array.getJSONObject(i).getJSONObject("AlertDefinition"); - if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) { - JSONObject sourceNode = alertDefn.getJSONObject("source"); - JSONArray params = sourceNode.getJSONArray("parameters"); - for(int j = 0 ; j < params.length() ; j++){ - JSONObject param = params.getJSONObject(j); - if (param.get("name").equals("sensitivity")) { - return param.getInt("value"); - } - } - break; - } - } - - } catch (Exception e) { - LOG.error(e); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - LOG.warn(e); - } - } - } - - return -1; - } - - private String constructUri(String protocol, String host, String port, String path) { - StringBuilder sb = new StringBuilder(protocol); - sb.append("://"); - sb.append(host); - sb.append(":"); - sb.append(port); - sb.append(path); - return sb.toString(); - } - -// public static void main(String[] args) { -// AmbariServerInterface ambariServerInterface = new AmbariServerInterface(); -// ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1"); -// } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java deleted file mode 100644 index 2287ee3..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype.core; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; - -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -public class MetricKafkaProducer { - - Producer producer; - private static String topicName = "ambari-metrics-topic"; - - public MetricKafkaProducer(String kafkaServers) { - Properties configProperties = new Properties(); - configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667" - configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); - configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer"); - producer = new 
KafkaProducer(configProperties); - } - - public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException { - - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics); - ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName,jsonNode); - Future<RecordMetadata> kafkaFuture = producer.send(rec); - - System.out.println(kafkaFuture.isDone()); - System.out.println(kafkaFuture.get().topic()); - } -}
