[FLINK-6013][metrics] Add Datadog HTTP metrics reporter This closes #3736.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54ceec16 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54ceec16 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54ceec16 Branch: refs/heads/master Commit: 54ceec16c11655da4181c0816a3b12d1c4bab465 Parents: 50baec6 Author: Bowen Li <[email protected]> Authored: Tue Apr 18 10:27:17 2017 -0700 Committer: zentol <[email protected]> Committed: Tue May 9 22:56:49 2017 +0200 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 24 +++ flink-dist/pom.xml | 7 + flink-dist/src/main/assemblies/opt.xml | 7 + flink-metrics/flink-metrics-datadog/pom.xml | 108 ++++++++++ .../apache/flink/metrics/datadog/DCounter.java | 44 ++++ .../apache/flink/metrics/datadog/DGauge.java | 45 ++++ .../apache/flink/metrics/datadog/DMeter.java | 42 ++++ .../apache/flink/metrics/datadog/DMetric.java | 84 ++++++++ .../apache/flink/metrics/datadog/DSeries.java | 45 ++++ .../metrics/datadog/DatadogHttpClient.java | 97 +++++++++ .../metrics/datadog/DatadogHttpReporter.java | 210 +++++++++++++++++++ .../flink/metrics/datadog/MetricType.java | 30 +++ .../metrics/datadog/DatadogHttpClientTest.java | 199 ++++++++++++++++++ .../src/test/resources/log4j-test.properties | 27 +++ flink-metrics/pom.xml | 1 + 15 files changed, 970 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 290a452..2bc65a6 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125 {% endhighlight %} +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Note any variables in Flink metrics, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`, +will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`. + +Parameters: + +- `apikey` - the Datadog API key +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only + +Example configuration: + +{% highlight yaml %} + +metrics.reporters: dghttp +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter +metrics.reporter.dghttp.apikey: xxx +metrics.reporter.dghttp.tags: myflinkapp,prod + +{% endhighlight %} + ## System metrics By default Flink gathers several metrics that provide deep insights on the current state. http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 9773991..6d8debf 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -202,6 +202,13 @@ under the License. <version>${project.version}</version> <scope>provided</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-datadog</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> <!-- end optional Flink metrics reporters --> <!-- start optional Flink libraries --> http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/src/main/assemblies/opt.xml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index 95218d7..0386b92 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -105,6 +105,13 @@ </file> <file> + <source>../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar</source> + <outputDirectory>opt/</outputDirectory> + <destName>flink-metrics-datadog-${project.version}.jar</destName> + <fileMode>0644</fileMode> + </file> + + <file> <source>../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar</source> <outputDirectory>opt/</outputDirectory> <destName>flink-shaded-hadoop2-${project.version}.jar</destName> http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/pom.xml b/flink-metrics/flink-metrics-datadog/pom.xml new file mode 100644 index 0000000..0d473fc --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/pom.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-datadog</artifactId> + <name>flink-metrics-datadog</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.7.0</version> + </dependency> + + <dependency> + <groupId>com.squareup.okio</groupId> + <artifactId>okio</artifactId> + <version>1.12.0</version> + </dependency> + + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <relocations combine.children="append"> + <relocation> + <pattern>okhttp3</pattern> + <shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern> + </relocation> + <relocation> + <pattern>okio</pattern> + <shadedPattern>org.apache.flink.shaded.okio</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java new file mode 100644 index 0000000..58abbd6 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; + +import java.util.List; + +/** + * Mapping of counter between Flink and Datadog + * */ +public class DCounter extends DMetric { + private final Counter counter; + + public DCounter(Counter c, String metricName, String host, List<String> tags) { + super(MetricType.counter, metricName, host, tags); + counter = c; + } + + /** + * Visibility of this method must not be changed + * since we deliberately not map it to json object in a Datadog-defined format + * */ + @Override + public Number getMetricValue() { + return counter.getCount(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java new file mode 100644 index 0000000..8deb117 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + + +import org.apache.flink.metrics.Gauge; + +import java.util.List; + +/** + * Mapping of gauge between Flink and Datadog + * */ +public class DGauge extends DMetric { + private final Gauge<Number> gauge; + + public DGauge(Gauge<Number> g, String metricName, String host, List<String> tags) { + super(MetricType.gauge, metricName, host, tags); + gauge = g; + } + + /** + * Visibility of this method must not be changed + * since we deliberately not map it to json object in a Datadog-defined format + * */ + @Override + public Number getMetricValue() { + return gauge.getValue(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java new file mode 100644 index 0000000..181a00c --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Meter; + +import java.util.List; + +/** + * Mapping of meter between Flink and Datadog + * + * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter + * */ +public class DMeter extends DMetric { + private final Meter meter; + + public DMeter(Meter m, String metricName, String host, List<String> tags) { + super(MetricType.gauge, metricName, host, tags); + meter = m; + } + + @Override + public Number getMetricValue() { + return meter.getRate(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java new file mode 100644 index 0000000..3f9d6ff --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class DMetric { + private static final long MILLIS_TO_SEC = 1000L; + + /** + * Names of metric/type/tags field and their getters must not be changed + * since they are mapped to json objects in a Datadog-defined format + * */ + private final String metric; // Metric name + private final MetricType type; + private final String host; + private final List<String> tags; + + public DMetric(MetricType metricType, String metric, String host, List<String> tags) { + this.type = metricType; + this.metric = metric; + this.host = host; + this.tags = tags; + } + + public MetricType getType() { + return type; + } + + public String getMetric() { + return metric; + } + + public String getHost() { + return host; + } + + public List<String> getTags() { + return tags; + } + + public List<List<Number>> getPoints() { + // One single data point + List<Number> point = new ArrayList<>(); + point.add(getUnixEpochTimestamp()); + point.add(getMetricValue()); + + List<List<Number>> points = new ArrayList<>(); + points.add(point); + + return points; + } + + @JsonIgnore + public abstract Number getMetricValue(); + + public static long getUnixEpochTimestamp() { + return (System.currentTimeMillis() / MILLIS_TO_SEC); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java new file mode 100644 index 0000000..fb0bb09 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import java.util.ArrayList; +import java.util.List; + +/** + * Json serialization between Flink and Datadog + **/ +public class DSeries { + /** + * Names of series field and its getters must not be changed + * since they are mapped to json objects in a Datadog-defined format + * */ + private List<DMetric> series; + + public DSeries() { + series = new ArrayList<>(); + } + + public void addMetric(DMetric metric) { + series.add(metric); + } + + public List<DMetric> getSeries() { + return series; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java new file mode 100644 index 0000000..dfbcee1 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s"; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s"; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 3; + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; + private final String apiKey; + + public DatadogHttpClient(String dgApiKey) { + if (dgApiKey == null || dgApiKey.isEmpty()) { + throw new IllegalArgumentException("Invalid API key:" + dgApiKey); + } + + apiKey = dgApiKey; + client = new OkHttpClient.Builder() + .connectTimeout(TIMEOUT, TimeUnit.SECONDS) + .writeTimeout(TIMEOUT, TimeUnit.SECONDS) + .readTimeout(TIMEOUT, TimeUnit.SECONDS) + .build(); + + seriesUrl = String.format(SERIES_URL_FORMAT, apiKey); + validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey); + validateApiKey(); + } + + private void validateApiKey() { + Request r = new Request.Builder().url(validateUrl).get().build(); + + try { + Response response = client.newCall(r).execute(); + if (!response.isSuccessful()) { + throw new IllegalArgumentException( + String.format("API key: %s is invalid", apiKey)); + } + } catch(IOException e) { + throw new IllegalStateException("Failed contacting Datadog to validate API key", e); + } + } + + public void send(DatadogHttpReporter.DatadogHttpRequest request) throws Exception { + String postBody = serialize(request.getSeries()); + + Request r = new Request.Builder() + .url(seriesUrl) + .post(RequestBody.create(MEDIA_TYPE, postBody)) + .build(); + + client.newCall(r).execute().close(); + } + + public static String serialize(Object obj) throws JsonProcessingException { + return MAPPER.writeValueAsString(obj); + } + + public void close() { + client.dispatcher().executorService().shutdown(); + client.connectionPool().evictAll(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java new file mode 100644 index 0000000..fcb5c4b --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + private static final String HOST_VARIABLE = "<host>"; + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>(); + private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>(); + private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List<String> configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName); + + List<String> tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + String host = getHostFromMetricGroup(group); + + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, host, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, host, tags)); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, host, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric instanceof Histogram) { + // No Histogram is registered + } else { + LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + + @Override + public void open(MetricConfig config) { + client = new DatadogHttpClient(config.getString(API_KEY, null)); + LOGGER.info("Configured DatadogHttpReporter"); + + configTags = getTagsFromConfig(config.getString(TAGS, "")); + } + + @Override + public void close() { + client.close(); + LOGGER.info("Shut down DatadogHttpReporter"); + } + + @Override + public void report() { + DatadogHttpRequest request = new DatadogHttpRequest(); + + for (Map.Entry<Gauge, DGauge> entry : gauges.entrySet()) { + DGauge g = entry.getValue(); + try { + // Will throw exception if the Gauge is not of Number type + // Flink uses Gauge to store many types other than Number + g.getMetricValue(); + request.addGauge(g); + } catch (Exception e) { + // Remove that Gauge if it's not of Number type + gauges.remove(entry.getKey()); + } + } + + for (DCounter c : counters.values()) { + request.addCounter(c); + } + + for (DMeter m : meters.values()) { + request.addMeter(m); + } + + try { + client.send(request); + } catch (Exception e) { + LOGGER.warn("Failed reporting metrics to Datadog.", e); + } + } + + /** + * Get config tags from config 'metrics.reporter.dghttp.tags' + * */ + private List<String> getTagsFromConfig(String str) { + return Arrays.asList(str.split(",")); + } + + /** + * Get tags from MetricGroup#getAllVariables(), excluding 'host' + * */ + private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) { + List<String> tags = new ArrayList<>(); + + for (Map.Entry<String, String> entry: metricGroup.getAllVariables().entrySet()) { + if(!entry.getKey().equals(HOST_VARIABLE)) { + tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue()); + } + } + + return tags; + } + + /** + * Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise + * */ + private String getHostFromMetricGroup(MetricGroup metricGroup) { + return metricGroup.getAllVariables().get(HOST_VARIABLE); + } + + /** + * Given "<xxx>", return "xxx" + * */ + private String getVariableName(String str) { + return str.substring(1, str.length() - 1); + } + + /** + * Compact metrics in batch, serialize them, and send to Datadog via HTTP + * */ + static class DatadogHttpRequest { + private final DSeries series; + + public DatadogHttpRequest() { + series = new DSeries(); + } + + public void addGauge(DGauge gauge) { + series.addMetric(gauge); + } + + public void addCounter(DCounter counter) { + series.addMetric(counter); + } + + public void addMeter(DMeter meter) { + series.addMetric(meter); + } + + public DSeries getSeries() { + return series; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java new file mode 100644 index 0000000..97f9b29 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +/** + * Metric types supported by Datadog + * */ +public enum MetricType { + /** + * Names of 'gauge' and 'counter' must not be changed + * since they are mapped to json objects in a Datadog-defined format + * */ + gauge, counter +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java new file mode 100644 index 0000000..bda5d47 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@RunWith(Enclosed.class) +public class DatadogHttpClientTest { + public static class TestApiKey { + @Test(expected = IllegalArgumentException.class) + public void testClientWithEmptyKey() { + new DatadogHttpClient(""); + } + + @Test(expected = IllegalArgumentException.class) + public void testClientWithNullKey() { + new DatadogHttpClient(null); + } + } + + @RunWith(PowerMockRunner.class) + @PrepareForTest(DMetric.class) + public static class TestSerialization { + private static List<String> tags = Arrays.asList("tag1", "tag2"); + + private static final long MOCKED_SYSTEM_MILLIS = 123L; + + @Before + public void mockSystemMillis() { + PowerMockito.mockStatic(DMetric.class); + PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS); + } + + @Test + public void serializeGauge() throws JsonProcessingException { + + DGauge g = new DGauge(new Gauge<Number>() { + @Override + public Number getValue() { + return 1; + } + }, "testCounter", "localhost", tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(g)); + } + + @Test + public void serializeGaugeWithoutHost() throws JsonProcessingException { + + DGauge g = new DGauge(new Gauge<Number>() { + @Override + public Number getValue() { + return 1; + } + }, "testCounter", null, tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(g)); + } + + @Test + public void serializeCounter() throws JsonProcessingException { + DCounter c = new DCounter(new Counter() { + @Override + public void inc() {} + + @Override + public void inc(long n) {} + + @Override + public void dec() {} + + @Override + public void dec(long n) {} + + @Override + public long getCount() { + return 1; + } + }, "testCounter", "localhost", tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(c)); + } + + @Test + public void serializeCounterWithoutHost() throws JsonProcessingException { + DCounter c = new DCounter(new Counter() { + @Override + public void inc() {} + + @Override + public void inc(long n) {} + + @Override + public void dec() {} + + @Override + public void dec(long n) {} + + @Override + public long getCount() { + return 1; + } + }, "testCounter", null, tags); + + assertEquals( + "{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}", + DatadogHttpClient.serialize(c)); + } + + @Test + public void serializeMeter() throws JsonProcessingException { + + DMeter m = new DMeter(new Meter() { + @Override + public void markEvent() {} + + @Override + public void markEvent(long n) {} + + @Override + public double getRate() { + return 1; + } + + @Override + public long getCount() { + return 0; + } + }, "testMeter","localhost", tags); + + assertEquals( + "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}", + DatadogHttpClient.serialize(m)); + } + + @Test + public void serializeMeterWithoutHost() throws JsonProcessingException { + + DMeter m = new DMeter(new Meter() { + @Override + public void markEvent() {} + + @Override + public void markEvent(long n) {} + + @Override + public double getRate() { + return 1; + } + + @Override + public long getCount() { + return 0; + } + }, "testMeter", null, tags); + + assertEquals( + "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}", + DatadogHttpClient.serialize(m)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2226f68 --- /dev/null +++ b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml index 317dde8..e1b66c2 100644 --- a/flink-metrics/pom.xml +++ b/flink-metrics/pom.xml @@ -40,6 +40,7 @@ under the License. <module>flink-metrics-graphite</module> <module>flink-metrics-jmx</module> <module>flink-metrics-statsd</module> + <module>flink-metrics-datadog</module> </modules> <!-- override these root dependencies as 'provided', so they don't end up
